您的位置:首页 > 其它

ZooKeeper=Util 重复注册监听节点

2015-08-16 17:39 471 查看
package com.easou.noveladmin.utils;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.commons.lang.StringUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.easou.noveladmin.model.ApiAppUpdateCodeCh;
import com.easou.noveladmin.model.ApiConstants;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

public class ApiUtil {
private static Logger logger = LoggerFactory.getLogger("zookeeperLogger");
private static CountDownLatch connectedLatch = new CountDownLatch(1);
private static Watcher watcher = new ConnectedWatcher(connectedLatch);

public static ZooKeeper zk = null;
//public static String rootPath="/appUpdate";
public static boolean isInit_appUpdate=false;
public static boolean isInit_appFound=false;
static{
}

static class ConnectedWatcher implements Watcher {

private CountDownLatch connectedLatch;

ConnectedWatcher(CountDownLatch connectedLatch) {
this.connectedLatch = connectedLatch;
}

@Override
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.SyncConnected) {
connectedLatch.countDown();
}
}
}

public static Watcher wh =null;

/* public static Watcher wh = new Watcher() {
public void process(org.apache.zookeeper.WatchedEvent event) {
try {
String path = event.getPath();
EventType type = event.getType();
if(type.getIntValue()==3){
logger.info("###################### "+"数据改变"+ "路径" + path + " 类型:" + event.getType());
}else if (type.getIntValue()==4) {
logger.info("###################### "+"子节点改变"+ "路径" + path + " 类型:" + event.getType());
if(StringUtils.isNotBlank(path)){
logger.info("******************添加子节点,重新监听路径"+path);
addWatcher(zk, path);
}
}
if (StringUtils.isNotBlank(event.getPath())) {
logger.info("###################### "+"主动监听:"+path);
}
if(StringUtils.isNotBlank(path)){
zk.getData(path, wh, null);
}else{
zk.getData(rootPath, wh, null);
}
} catch (Exception e) {
logger.error("获取Wach失败:",e.getMessage());
}
}
};*/

public static Watcher initWatch(final String defaultRootPath){
wh = new Watcher() {
public void process(org.apache.zookeeper.WatchedEvent event) {
try {
String path = event.getPath();
EventType type = event.getType();
if(type.getIntValue()==3){
logger.info("###################### "+"数据改变"+ "路径" + path + " 类型:" + event.getType());
}else if (type.getIntValue()==4) {
logger.info("###################### "+"子节点改变"+ "路径" + path + " 类型:" + event.getType());
if(StringUtils.isNotBlank(path)){
logger.info("******************添加子节点,重新监听路径"+path);
addWatcher(zk, path);
}
}
if (StringUtils.isNotBlank(event.getPath())) {
logger.info("###################### "+"主动监听:"+path);
}
if(StringUtils.isNotBlank(path)){
zk.getData(path, initWatch(path), null);
}else{
zk.getData(defaultRootPath, initWatch(defaultRootPath), null);
}
} catch (Exception e) {
logger.error("获取Wach失败:",e.getMessage());
}
}
};
return wh;
}

public static void addWatcher(ZooKeeper zke,final String path){
List<String> children =null;
try {
Watcher wh_c = wh;
children = zke.getChildren(path, wh_c);
if(children==null||children.size()<=0){
logger.info("主动监听:"+path+"wh_c:"+wh_c);
zk.getData(path, wh_c, null);
return;
}else{
for (String str : children) {
if(StringUtils.isNotBlank(str)){
String pwd=path+"/"+str;
addWatcher(zke, pwd);
}
}
}
zk.getData(path, wh_c, null);
logger.info("主动监听:父节点:"+path);
} catch (KeeperException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

public static String getValue(ZooKeeper zk,String key){
try {
long st = System.currentTimeMillis();
byte[] data = zk.getData(key, wh, null);
//byte[] data = zk.getData(key, false, null);
String val = new String(data, "UTF-8");
logger.info("获取val用时:"+(System.currentTimeMillis()-st));
return val;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}catch (UnsupportedEncodingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return "";
}

public static void setValue(ZooKeeper zk,String path,String value) throws Exception{
// 修改节点/root/childone下的数据,第三个参数为版本,如果是-1,那会无视被修改的数据版本,直接改掉
try {
long st = System.currentTimeMillis();
zk.setData(path,value.getBytes("UTF-8") , -1);
logger.info("设置val用时:"+(System.currentTimeMillis()-st));
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
throw new Exception(e);
}

}


/**
* 获取zookeeper连接,如果连接未建立完成,则程序阻塞new ZooKeeper(ZKPURL, TIMEOUT, wh)
* @param zkpUrl
* @param timeout
* @return
*/
public static ZooKeeper getZooKeeperConnection(String zkpUrl, int timeout,String rootPath) {
long st = System.currentTimeMillis();
ZooKeeper zooKeeper = null;
try {
zooKeeper = new ZooKeeper(zkpUrl, timeout, watcher);
if (States.CONNECTING == zooKeeper.getState()) {
if (logger.isDebugEnabled())
logger.debug("zookeeper连接状态:" + zooKeeper.getState());
try {
connectedLatch.await();
if (logger.isDebugEnabled())
logger.debug("zookeeper连接状态:" + zooKeeper.getState());
} catch (InterruptedException e) {
e.printStackTrace();
}
}

if(ApiConstants.API_APPFOUND_ROOTPATH.equals(rootPath)){
if(!isInit_appFound){
logger.info("没有初始化....");
initWatch(rootPath);
zk = new ZooKeeper(zkpUrl, 300000, wh);
addWatcher(zk, rootPath);
isInit_appFound=true;
}else{
logger.info("已经初始化....");
}
}else{
if(!isInit_appUpdate){
logger.info("没有初始化....");
initWatch(rootPath);
zk = new ZooKeeper(zkpUrl, 300000, wh);
addWatcher(zk, rootPath);
isInit_appUpdate=true;
}else{
logger.info("已经初始化....");
}
}


// if(!isInit){
// logger.info("没有初始化....");
// initWatch(rootPath);
// zk = new ZooKeeper(zkpUrl, 300000, wh);
// addWatcher(zk, rootPath);
// isInit=true;
// }else{
// logger.info("已经初始化....");
// }
logger.info("获取连接用时:"+(System.currentTimeMillis()-st));
} catch (IOException e) {
logger.error("获取连接失败:isInit:"+isInit_appFound+" zooKeeper:"+zooKeeper,e.getMessage());
e.printStackTrace();
}
return zooKeeper;
}

// public static void saveWach(String rootPath){
// if(!isInit){
// logger.info("没有初始化....");
// initWatch(rootPath);
// zk = new ZooKeeper(zkpUrl, 300000, wh);
// addWatcher(zk, rootPath);
// isInit=true;
// }else{
// logger.info("已经初始化....");
// }
// }


public static void delteAllNode(ZooKeeper zk,String path){
try {
List<String> children = zk.getChildren(path,true);
if(children != null && children.size()>0){
for (String url : children) {
delteAllNode(zk, path+"/"+url);
}
}
zk.delete(path, -1);
logger.info("删除节点:"+path);
} catch (KeeperException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}


public static void main(String[] args) {
//test_addAppUpdate();
String ZKPURL = "120.197.138.35:2181,120.197.138.35:2182,120.197.138.35:2183";
int timeout=3000;
ZooKeeper zk = ApiUtil.getZooKeeperConnection(ZKPURL, timeout,"");
try {
System.out.println(new String(zk.getData("/appUpdate/online/ios/noUpdate_key", wh, null)));
//setValue(zk, "/appUpdate/online/ios/desc", "");
System.out.println(new String(zk.getData("/appUpdate/online/ios/desc", wh, null)));

String str="[{'ch':'123','code':'1012'},{'ch':'abc','code':'1542'}]";

//ApiAppUpdateBean bean = JSON.parseObject(str, new ArrayList()<ApiAppUpdateCodeCh>.class);
Gson gson=new Gson();
List<ApiAppUpdateCodeCh> list=gson.fromJson(str,new TypeToken<List<ApiAppUpdateCodeCh>>() {}.getType());
System.out.println(list);
String json = gson.toJson(list);
System.out.println(json);
} catch (KeeperException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: