curator操作zookeeper

浏览: 2342

1.选择curator-framework的jar包,1.0.1的版本已经十分稳定,相对应的zk版本是3.3.x,还在开发中的版本是1.1.x,对应的版本是zk3.4.x。

2.zookeeper对应的hosts和永久znode路径

public class ZKUtil {
//Zookeeper 永久节点
public static final String PATH = "/spider";
//Zookeeper连接地址
public static final String ZOOKEEPER_HOSTS = PropUtil.getValue(Config.CONF_SPIDER_NAME,"zk");
}

3.创建永久和临时znode

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkHtmlEntry {
public static final Logger log = LoggerFactory.getLogger(SparkHtmlEntry.class);


public SparkHtmlEntry() {
//重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
String hosts = ZKUtil.ZOOKEEPER_HOSTS;
CuratorFramework client = CuratorFrameworkFactory.newClient(hosts, retryPolicy);

//建立连接
client.start();

try {
//获取本地ip地址
InetAddress localHost = InetAddress.getLocalHost();
String ip = localHost.getHostAddress();
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
.withACL(Ids.OPEN_ACL_UNSAFE).forPath(ZKUtil.PATH+"/"+ip);
} catch (Exception e) {
// TODO Auto-generated catch block
log.info("SparkHtmlEntry() --> ",e);
e.printStackTrace();
}
}
}

4.客户端监视器watcher

import java.util.List;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.paic.spider.util.ZKUtil;

public class ConfigWatcher implements Watcher{
public static final Logger log = LoggerFactory.getLogger(ConfigWatcher.class);

private CuratorFramework client;
private List<String> oldChildrens;

public ConfigWatcher() {
//重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
String hosts = ZKUtil.ZOOKEEPER_HOSTS;
client = CuratorFrameworkFactory.newClient(hosts, retryPolicy);

//建立连接
client.start();

try {
oldChildrens = client.getChildren().usingWatcher(this).forPath(ZKUtil.PATH);
} catch (Exception e) {
// TODO Auto-generated catch block
log.info("ConfigWatcher() --> ",e);
e.printStackTrace();
}
}

//具体业务实现
public void displayConfig() {
try {
List<String> currentChildrenList = client.getChildren()
.usingWatcher(this).forPath(ZKUtil.PATH);
for (String child : currentChildrenList) {
if (!oldChildrens.contains(child)) {
System.out.println("新增加的爬虫节点为:" + child);
}
}
for (String child : oldChildrens) {
if (!currentChildrenList.contains(child)) {
System.out.println("挂掉的爬虫节点为:" + child);

//邮件告警
}
}

//子节点集合更新
this.oldChildrens = currentChildrenList;
} catch (Exception e) {
log.info("displayConfig --> ",e);
e.printStackTrace();
}

}

//监控节点变化
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeChildrenChanged) {
try {
//调用具体业务代码
displayConfig();
} catch (Exception e) {
// TODO Auto-generated catch block
log.info("process --> ",e);
e.printStackTrace();
}
}

}

}

5.shell查看

[root@SZB-L0032016 bin]# ./zookeeper-client -server localhost:2181

[zk: localhost:2181(CONNECTED) 0] ls /
[hbase, zookeeper, drill, spider]
[zk: localhost:2181(CONNECTED) 3] ls /spider
[10.13.35.162]
[zk: localhost:2181(CONNECTED) 4] ls /spider
[]
推荐 1
本文由 平常心 创作,采用 知识共享署名-相同方式共享 3.0 中国大陆许可协议 进行许可。
转载、引用前需联系作者,并署名作者且注明文章出处。
本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责。本站是一个个人学习交流的平台,并不用于任何商业目的,如果有任何问题,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。

0 个评论

要回复文章请先登录注册