1. 选择 curator-framework 的 jar 包:1.0.1 版本已经十分稳定,对应的 ZooKeeper 版本是 3.3.x;仍在开发中的 1.1.x 版本对应 ZooKeeper 3.4.x。
2. 配置 ZooKeeper 的连接地址(hosts)和永久 znode 路径
/**
 * ZooKeeper constants: the connection address and the path of the
 * persistent znode.
 */
public class ZKUtil {

    /**
     * ZooKeeper connection address, looked up via {@code PropUtil.getValue}
     * using {@code Config.CONF_SPIDER_NAME} and the key {@code "zk"}.
     */
    public static final String ZOOKEEPER_HOSTS =
            PropUtil.getValue(Config.CONF_SPIDER_NAME, "zk");

    /** Path of the persistent ZooKeeper node. */
    public static final String PATH = "/spider";
}
3. 创建永久父节点和临时子节点(znode)
import java.net.InetAddress;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Registers the local host in ZooKeeper by creating an ephemeral znode named
 * after the local IP address underneath {@link ZKUtil#PATH}.
 *
 * <p>The Curator client is kept in a field instead of a constructor local so
 * that the owner can release it with {@link #close()}. An ephemeral znode is
 * tied to the session of the client that created it, so the client has to
 * stay open for as long as the node should remain visible.
 */
public class SparkHtmlEntry {
    public static final Logger log = LoggerFactory.getLogger(SparkHtmlEntry.class);

    /** Client whose session owns the ephemeral znode created by the constructor. */
    private final CuratorFramework client;

    /**
     * Starts a Curator client against {@link ZKUtil#ZOOKEEPER_HOSTS} and creates
     * the ephemeral znode {@code ZKUtil.PATH + "/" + <local ip>}, creating
     * missing parent nodes on the way.
     *
     * <p>Failures (address lookup or znode creation) are logged and not
     * rethrown, so construction itself never fails because of them.
     */
    public SparkHtmlEntry() {
        // Retry policy: 1000 ms base sleep time, at most 3 retries.
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.newClient(ZKUtil.ZOOKEEPER_HOSTS, retryPolicy);
        // Open the connection.
        client.start();
        try {
            // Use the local IP address as the znode name.
            String ip = InetAddress.getLocalHost().getHostAddress();
            client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL)
                    .withACL(Ids.OPEN_ACL_UNSAFE)
                    .forPath(ZKUtil.PATH + "/" + ip);
        } catch (Exception e) {
            // Log once through the logger, at a level that matches a failure.
            log.error("SparkHtmlEntry() --> ", e);
        }
    }

    /**
     * Closes the underlying Curator client. Because the znode is ephemeral,
     * ending the client's session also removes the znode.
     */
    public void close() {
        client.close();
    }
}
4. 客户端监视器(Watcher):监听子节点的变化
import java.util.List;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.paic.spider.util.ZKUtil;
public class ConfigWatcher implements Watcher{
public static final Logger log = LoggerFactory.getLogger(ConfigWatcher.class);
private CuratorFramework client;
private List<String> oldChildrens;
public ConfigWatcher() {
//重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
String hosts = ZKUtil.ZOOKEEPER_HOSTS;
client = CuratorFrameworkFactory.newClient(hosts, retryPolicy);
//建立连接
client.start();
try {
oldChildrens = client.getChildren().usingWatcher(this).forPath(ZKUtil.PATH);
} catch (Exception e) {
// TODO Auto-generated catch block
log.info("ConfigWatcher() --> ",e);
e.printStackTrace();
}
}
//具体业务实现
public void displayConfig() {
try {
List<String> currentChildrenList = client.getChildren()
.usingWatcher(this).forPath(ZKUtil.PATH);
for (String child : currentChildrenList) {
if (!oldChildrens.contains(child)) {
System.out.println("新增加的爬虫节点为:" + child);
}
}
for (String child : oldChildrens) {
if (!currentChildrenList.contains(child)) {
System.out.println("挂掉的爬虫节点为:" + child);
//邮件告警
}
}
//子节点集合更新
this.oldChildrens = currentChildrenList;
} catch (Exception e) {
log.info("displayConfig --> ",e);
e.printStackTrace();
}
}
//监控节点变化
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeChildrenChanged) {
try {
//调用具体业务代码
displayConfig();
} catch (Exception e) {
// TODO Auto-generated catch block
log.info("process --> ",e);
e.printStackTrace();
}
}
}
}
5. 通过 zookeeper-client 命令行查看节点(爬虫进程退出后,对应的临时节点会消失)
[root@SZB-L0032016 bin]# ./zookeeper-client -server localhost:2181
[zk: localhost:2181(CONNECTED) 0] ls /
[hbase, zookeeper, drill, spider]
[zk: localhost:2181(CONNECTED) 3] ls /spider
[10.13.35.162]
[zk: localhost:2181(CONNECTED) 4] ls /spider
[]