您的位置:首页 > 其它

(原) 2.3 Curator使用

2016-07-15 10:35 295 查看

本文为原创文章,转载请注明出处,谢谢

Curator使用

1、jar包引入,演示版本为2.6.0,非maven项目,可以下载jar包导入到项目中

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.6.0</version>
</dependency>

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.6.0</version>
</dependency>


2、RetryPolicy:重试机制

ExponentialBackoffRetry:每次重试会增加重试时间baseSleepTimeMs

ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)

ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)

baseSleepTimeMs:基本重试时间差

maxRetries:最大重试次数

maxSleepMs:最大重试时间

RetryNTimes

RetryNTimes(int n, int sleepMsBetweenRetries)

n:重试次数

sleepMsBetweenRetries:每次重试间隔时间

RetryUntilElapsed

RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)

maxElapsedTimeMs:最大重试时间

sleepMsBetweenRetries:每次重试间隔时间

BoundedExponentialBackoffRetry、RetryOneTime、SleepingRetry

3、创建Zookeeper连接

传统方式

  示例:CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("192.168.117.128:2181",5000,5000,retryPolicy);

  API:  

newClient(java.lang.String connectString, org.apache.curator.RetryPolicy retryPolicy)

newClient(java.lang.String connectString, int sessionTimeoutMs, int connectionTimeoutMs, org.apache.curator.RetryPolicy retryPolicy)


connectString:Zookeeper服务器地址

retryPolicy:自定义重试机制

sessionTimeoutMs:session超时时间

connectionTimeoutMs:连接超时时间

链式方式 

curatorFramework = CuratorFrameworkFactory.builder()
.connectString("192.168.117.128:2181")
//.authorization() 设置访问权限 设置方法同原生API
.sessionTimeoutMs(5000).connectionTimeoutMs(5000)
.retryPolicy(retryPolicy).build();


代码示例

    public void createSession() {
//RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);//基本重试间隔时间,重试次数(每次重试时间加长)
//RetryPolicy retryPolicy = new RetryNTimes(5,1000);//重试次数,重试间隔时间
RetryPolicy retryPolicy = new RetryUntilElapsed(5000,1000);//重试时间,重试间隔时间
//curatorFramework = CuratorFrameworkFactory.newClient("192.168.117.128:2181",5000,5000,retryPolicy);
curatorFramework = CuratorFrameworkFactory.builder() .connectString("192.168.117.128:2181") //.authorization() 设置访问权限 设置方法同原生API .sessionTimeoutMs(5000).connectionTimeoutMs(5000) .retryPolicy(retryPolicy).build();
curatorFramework.start();
}


4、创建节点

public void createNode() throws Exception {
createSession();
String path = curatorFramework.create()
.creatingParentsIfNeeded()//如果父节点没有自动创建
//.withACL()设置权限  权限创建同原生API
.withMode(CreateMode.PERSISTENT)//节点类型
.forPath("/note_curator/02", "02".getBytes());
System.out.println("path:"+path);
}


节点类型、权限设置详见2.1Zookeeper原生API使用

5、节点删除

public void del() throws Exception {
createSession();
curatorFramework.delete()
.guaranteed()//保证机制,出错后后台删除 直到删除成功
.deletingChildrenIfNeeded()//删除当前节点下的所有节点,再删除自身
.forPath("/note_curator");
}


6、获取子节点

public void getChildren() throws Exception {
createSession();
List<String> children = curatorFramework.getChildren().forPath("/note_curator");
System.out.println(children);

}


7、获取节点信息

public void getData() throws Exception {
createSession();
Stat stat = new Stat();
byte[] u = curatorFramework.getData().storingStatIn(stat).forPath("/note_curator");
System.out.println(new String(u));
System.out.println(stat);
}


8、设置节点信息

public void setData() throws Exception {
createSession();
curatorFramework.setData()
//.withVersion(1) 设置版本号 乐观锁概念
.forPath("/note_curator/01", "shengke0815".getBytes());
}


9、是否存在节点

public void exists() throws Exception {
createSession();
Stat s = curatorFramework.checkExists().forPath("/note_curator");
System.out.println(s);
}


10、设置节点信息回调

ExecutorService executorService = Executors.newFixedThreadPool(5);//线程池
@Test
public void setDataAsync() throws Exception {
createSession();
curatorFramework.setData().inBackground(new BackgroundCallback() {//设置节点信息时回调方法
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {

System.out.println(curatorFramework.getZookeeperClient());
System.out.println(curatorEvent.getResultCode());
System.out.println(curatorEvent.getPath());
System.out.println(curatorEvent.getContext());
}
},"shangxiawen",executorService).forPath("/note_curator","sksujer0815".getBytes());
Thread.sleep(Integer.MAX_VALUE);
}


API:

inBackground(org.apache.curator.framework.api.BackgroundCallback backgroundCallback, java.lang.Object o, java.util.concurrent.Executor executor);


backgroundCallback:自定义BackgroundCallback


o:上下文信息,回调方法中curatorEvent.getContext()可获取此信息

executor:线程池

11、监听节点改变事件

public void nodeListen() throws Exception {
createSession();
final NodeCache cache = new NodeCache(curatorFramework,"/note_curator");
cache.start();
cache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println(new String(cache.getCurrentData().getData()));
System.out.println(cache.getCurrentData().getPath());
}
});

Thread.sleep(Integer.MAX_VALUE);

}


12、监听子节点列表改变事件

public void nodeClildrenListen() throws Exception {
createSession();
final PathChildrenCache cache = new PathChildrenCache(curatorFramework,"/note_curator",true);
cache.start();
cache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
switch (pathChildrenCacheEvent.getType()){
case CHILD_ADDED:
System.out.println("add children");
System.out.println(new String(pathChildrenCacheEvent.getData().getData()));
System.out.println(new String(pathChildrenCacheEvent.getData().getPath()));
break;
case CHILD_REMOVED:
System.out.println("remove children");
System.out.println(new String(pathChildrenCacheEvent.getData().getData()));
System.out.println(new String(pathChildrenCacheEvent.getData().getPath()));
break;
case CHILD_UPDATED:
System.out.println("update children");
System.out.println(new String(pathChildrenCacheEvent.getData().getData()));
System.out.println(new String(pathChildrenCacheEvent.getData().getPath()));
break;
}
}
});

Thread.sleep(Integer.MAX_VALUE);

}


下一节:3.1 Zookeeper应用 - Master选举
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: