Zookeeper入门编程之对zookeeper节点的增删改查
2017-10-10 09:46
513 查看
Zookeeper入门编程之对zookeeper节点的增删改查
最近做了一个项目,关于一个访问API的开放平台,其中有很重要的一个模块是从mysql数据库到zookeeper节点的数据同步,这一块是我独立负责的,这两天刚刚开发和测试完,其中遇到了一些问题,也有了不少的收获,现在做一个总结。
对于zk,百度百科上有详细的介绍,不知道的童鞋可以自行了解,这里我们只需要明白两点即可:
1:zk是可以单实例或者集群化部署的
2:如果zk以集群化部署,相应会产生多个zk节点,那么只要有超过一半(必须超过)的zk节点宕机,则整个zk集群都将无法正常工作。
对zk操作的pom.xml文件中maven添加的依赖地址如下:
<!-- zk 连接客户端 start -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.8.0</version>
</dependency>
<!-- zk 连接客户端 end -->
要想对zk的节点即zktree进行操作,第一步是与zk建立连接,代码如下:
/**
* @描述:创建一个zookeeper连接
*/
private static void CreateZkclientConnection() {
// 定义zk服务器的ip和port,多个节点的话用","分隔
final String connectString = "197.3.153.159:2181,197.3.153.160:2181,197.3.153.161:2181";
// retryPolicy是连接zk过程中重连策略,两个参数分别代表:两次重连的等待时间和最大重试次数
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 100);
// 创建CuratorFramework实例,创建完成即代表连接zk成功
CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
// 调用start方法打开连接
client.start();
}
与zk建立连接之后就可以对zk节点进行操作了,首先是创建节点,代码如下:
/**
* @描述:创建一个zookeeper节点
* @param path 路径
* @param json 节点名
* @throws Exception
*/
public void createNode(String path, String json) throws Exception {
final String connectString = "197.3.153.159:2181,197.3.153.160:2181,197.3.153.161:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 100);
CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
client.start();
try {
// 传入路径和节点名调用zkclient自身的create方法
client.create().creatingParentsIfNeeded().forPath(path, json.getBytes(GatewayConstant.UTF8));
} catch (Exception e) {
e.printStackTrace();
} finally {
// 对zk操作完成之后要注意关闭连接
if (client != null) {
client.close();
}
}
}
这里对create()方法做个说明:create()---执行创建操作,可以调用额外的方法(比如后台执行background)并在最后调用forPath()指定要操作的znode。
接下来是查询某个路径下的节点,代码如下:
/**
* @描述:查询zookeeper路径的节点
* @param path 要查询节点的路径
* @return attrJson 查询到的节点名
* @throws Exception
*/
public String queryNode(String path) throws Exception {
final String connectString = "197.3.153.159:2181,197.3.153.160:2181,197.3.153.161:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 100);
CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
client.start();
// 定义查询到的节点
String attrJson = null;
try {
// 调用zkclient的getData()方法
byte[] byteNode = client.getData().forPath(path);
// 转成UTF-8格式的字符串
attrJson = new String(byteNode, GatewayConstant.UTF8);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 对zk操作完成之后要注意关闭连接
if (client != null) {
client.close();
}
}
return attrJson;
}
这里对getData()方法做个说明:getData()---执行获取znode节点数据的操作,可以调用额外的方法(比如监控、后台处理或者获取状态watch)并在最后调用forPath()指定要操作的znode。
然后是修改某个路径下的节点值,代码如下:
/**
* @描述:修改zookeeper路径的节点值
* @param path 要修改节点值的路径
* @param json 要修改成的内容
* @throws Exception
*/
public void editNode(String path, String json) throws Exception {
final String connectString = "197.3.153.159:2181,197.3.153.160:2181,197.3.153.161:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 100);
CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
client.start();
try {
// 调用zkclient的setData()方法
client.setData().forPath(path, json.getBytes(GatewayConstant.UTF8));
} catch (Exception e) {
e.printStackTrace();
} finally {
// 对zk操作完成之后要注意关闭连接
if (client != null) {
client.close();
}
}
}
这里对setData()方法做个说明:setData()---执行设置znode节点数据的操作,可以调用额外的方法(比如版本、后台处理)并在最后调用forPath()指定要操作的znode。
然后是删除某个路径的节点,代码如下:
/**
* @描述:删除zookeeper路径的节点
* @param path 要删除节点的路径
* @throws Exception
*/
public void deteleNode(String path) throws Exception {
final String connectString = "197.3.153.159:2181,197.3.153.160:2181,197.3.153.161:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 100);
CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
client.start();
try {
// 调用zkclient的delete()方法
client.delete().forPath(path);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 对zk操作完成之后要注意关闭连接
if (client != null) {
client.close();
}
}
}
这里对delete()方法做个说明:delete()---执行删除znode节点的操作,可以调用额外的方法(比如版本、后台处理)并在最后调用forPath()指定要操作的znode。
上面的增删改查都是在路径和json都确定的情况下进行的,下面给出一个完整的类代码,这个类功能是一键将数据库中的某些数据同步到zk服务器的zktree上,现在暂定zktree上有5个需要同步的节点,分别为:
inbound_channel、accessVerification、authentication、router、outbound_channel,代码如下:
/**
* mysql--->zk一键同步类 适用于插件第一次初始化数据和运行过程中出现问题需要强制同步的场景
* Created by xuzheng on 2017/09.
* @since 3.0.0
*/
@Functions
@Service
public class OneClickSyncFunction {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private GatewayCuratorUtil gatewayCuratorUtil;
@Autowired
private SysParamsMapper sysParamsMapper;
@Autowired
ApiInfoMapper apiInfoMapper;
@Autowired
AuthenticationInfoMapper authenticationInfoMapper;
@Autowired
AccessAuthorityMapper accessAuthorityMapper;
@Autowired
AuthenticationConfigFunctions authenticationConfigFunction;
@Autowired
AccessVerificationConfigFunctions accessVerificationConfigFunction;
@Autowired
RouterConfigFunctions routerConfigFunctions;
@Autowired
OutboundChannelConfigFunctions outboundChannelConfigFunction;
/**
* 数据同步一键执行所有插件
* @throws Exception
*/
@Function
public void dataSync() throws Exception {
// 加载本地配置文件,获取大插件名
Map<String, String> config = LoadConfigurationDocument.LoadConfiguration();
String plugName = config.get("plugName");
// 用插件名和插件节点名获取节点路径
String nodePath = GatewayZkPathUtil.getNodepath(plugName);
String path = nodePath.substring(0, nodePath.length() - 1);
// 获取该路径下节点名的集合,如:[authentication]
List<String> nodeList = GatewayCuratorUtil.queryList(path);
String idAuthen = "authentication";
Boolean blAuthen = nodeList.contains(idAuthen);
// 如果集合中有身份验证节点名:authentication,调用身份验证同步方法
if (blAuthen) {
authenticationDataSync(plugName, idAuthen);
}
String idAccess = "accessVerification";
Boolean blAccess = nodeList.contains(idAccess);
// 如果集合中有权限校验节点名:accessVerification,调用权限校验同步方法
if (blAccess) {
accessVerificationDataSync(plugName, idAccess);
}
String idRouter = "router";
Boolean blRouter = nodeList.contains(idRouter);
// 如果集合中有router节点名:router,调用router同步方法
if (blRouter) {
routerDataSync(plugName, idRouter);
}
String idOutboundChannel = "outbound_channel";
Boolean blOutboundChannel = nodeList.contains(idOutboundChannel);
// 如果集合中有outbound_channel节点名:outbound_channel,调用outbound_channel同步方法
if (blOutboundChannel) {
outboundChannelDataSync(plugName);
}
}
/**
* 身份验证一键同步方法
* @param plugName 大插件名
* @param id 小插件名
*/
public void authenticationDataSync(String plugName, String id) throws Exception {
logger.info("+++++ 身份验证节点数据同步开始 +++++");
// 创建要同步节点数据实体对象
AuthenticationConfig authenticationConfig = new AuthenticationConfig();
// 定义api级别的map
Map<String, String> uriGradeMap = new HashMap<>();
// 定义app认证信息map
Map<String, String> authenticationMap = new HashMap<>();
// 从数据库中查询出需要同步的参数,包括
// JWT秘钥:keyStr
String keyStr = sysParamsMapper.queryJwtKeystr();
authenticationConfig.setKeyStr(keyStr);
// Token的有效时间:ttlMillis
String ttlMillis = sysParamsMapper.queryTokenTtlMillis();
authenticationConfig.setTtlMillis(Long.valueOf(ttlMillis));
// 查询API信息表,用查询出的元素组装uriGradeMap
List<ApiInfo> apiInfoList = apiInfoMapper.queryApiInfoList();
// apiInfoList不为空则从apiInfoList循环取出每个元素--ApiInfo的实体
if (apiInfoList != null) {
for (int i = 0; i < apiInfoList.size(); i++) {
ApiInfo apiInfo = apiInfoList.get(i);
// 再从apiInfo实体中取出需要组装的字段,包括:version,openURI,api_level
String version = apiInfo.getVersion();
String openUri = apiInfo.getOpenuri();
Integer apiLevel = apiInfo.getApiLevel();
// 组装放到uriGradeMap中
String versionAddUri = GatewayConstant.PATH_SEPARATOR+ GatewayConstant.VSEPARATOR
+ version+ GatewayConstant.PATH_SEPARATOR + openUri;
if (apiLevel == null) {
logger.error("同步身份验证节点时发现API信息列表参数有误:api级别不能为空"+ apiInfo.getApiId());
throw new TeslaApplicationException("api级别不能为空");
} else {
String apiLevelString = String.valueOf(apiLevel);
uriGradeMap.put(versionAddUri, apiLevelString);
}
}
} else {
logger.error("同步身份验证节点时发现API信息列表信息有误:API信息列表为空");
throw new TeslaApplicationException("API信息列表不能为空");
}
authenticationConfig.setUriGradeMap(uriGradeMap);
// 查询鉴权信息表,用查询出的元素组装authenticationMap
List<AuthenticationInfo> authenticationInfoList = authenticationInfoMapper.queryAuthenticationInfo();
// 从authenticationInfoList循环取出每个元素--ApiInfo的实体
for (int j = 0; j < authenticationInfoList.size(); j++) {
AuthenticationInfo authenticationInfo = authenticationInfoList.get(j);
// 再从authenticationInfo实体中取出需要组装的字段,包括:appkey,secret
String appkey = authenticationInfo.getAppkey();
String secret = authenticationInfo.getSecret();
// 放到authenticationMap中
authenticationMap.put(appkey, secret);
}
authenticationConfig.setAuthenticationMap(authenticationMap);
// 同步之前需要把子节点强行设置为true,插件名不变
authenticationConfig.setAvailable(true);
authenticationConfig.setId(id);
// 调用单节点同步方法执行同步操作
authenticationConfigFunction.editAuthentication(authenticationConfig,plugName, id);
logger.info("+++++ 身份验证节点数据同步结束 +++++");
}
/**
* 权限校验一键同步方法
* @param plugName 大插件名
* @param id 小插件名
*/
public void accessVerificationDataSync(String plugName, String id) throws Exception {
logger.info("+++++ 权限校验节点数据同步开始 +++++");
// 创建要同步节点数据实体对象
AccessVerificationConfig accessVerificationConfig = new AccessVerificationConfig();
// 定义app对api的访问权限关系map,以${appkey+path}为key,以""为value
Map<String, String> accessVerificationMap = new HashMap<>();
// 从数据库中查询出需要同步的参数,包括
// 从访问权限表查出所有的appId
List<String> appidList = accessAuthorityMapper.queryAppid();
// appidList不为空则循环取出每一个appId去查询该appid对应的appkey和关联的apiid
if (appidList != null) {
for (int k = 0; k < appidList.size(); k++) {
String appId = appidList.get(k);
// 从鉴权信息表查该appId对应的appkey(唯一)
String appkey = authenticationInfoMapper.queryAppkeyByAppid(appId);
// 从访问权限表查该appId关联的apiId(不唯一)
List<String> apiidList = accessAuthorityMapper.queryApiidByAppid(appId);
// 循环取出每一个apiid去查询api信息表
for (int l = 0; l < apiidList.size(); l++) {
String apiid = apiidList.get(l);
ApiInfo apiInfo = apiInfoMapper.queryApiInfoByApiid(apiid);
if (apiInfo != null) {
// 从apiInfo校验API是否废弃
int availability = apiInfo.getAvailability();
// 如果
// availability为1代表API有效,从apiInfo中取出version和OPenUri
if (availability == 1) {
String version = apiInfo.getVersion();
String openUri = apiInfo.getOpenuri();
// 拼接appkey+version+uri放到map的key中
String accessMapkey = appkey+ GatewayConstant.PATH_SEPARATOR
+ GatewayConstant.VSEPARATOR + version
+ GatewayConstant.PATH_SEPARATOR + openUri;
accessVerificationMap.put(accessMapkey, "");
}
}
}
}
} else {
logger.error("同步权限校验节点时发现APP访问权限表信息列表信息有误:appid都为空");
throw new TeslaApplicationException("appid不能都为空");
}
accessVerificationConfig.setAccessVerificationMap(accessVerificationMap);
// 同步之前需要把子节点强行设置为true,插件名不变
accessVerificationConfig.setAvailable(true);
accessVerificationConfig.setId(id);
// 调用单节点同步方法执行同步操作
accessVerificationConfigFunction.editAccessVerification(
accessVerificationConfig, plugName, id);
logger.info("+++++ 权限校验节点数据同步结束 +++++");
}
/**
* router一键同步方法
* @param plugName 大插件名
* @param id 小插件名
*/
public void routerDataSync(String plugName, String id) throws Exception {
logger.info("+++++ router节点数据同步开始 +++++");
// 创建要同步节点数据实体对象
RouterConfig routerConfig = new RouterConfig();
// 定义APP对uri转发配置map,以${pathToken}->/v+version+openuri为key,
// 以clientChannel+apiid为值
Map<String, String> routerMap = new HashMap<>();
// 从数据库中查询出需要同步的参数,包括
// 查询API信息表
List<ApiInfo> apiInfoList = apiInfoMapper.queryApiInfoList();
// apiInfoList不为空则从apiInfoList循环取出每个元素--ApiInfo的实体
if (apiInfoList != null) {
for (int i = 0; i < apiInfoList.size(); i++) {
ApiInfo apiInfo = apiInfoList.get(i);
// 再从apiInfo实体中取出availability校验API是否废弃
Integer availability = apiInfo.getAvailability();
// 如果 availability为1代表API有效,从apiInfo中取出version和OPenUri
if (availability != null && availability == 1) {
String version = apiInfo.getVersion();
String openUri = apiInfo.getOpenuri();
// 拼接routerMap的key和值
String routerMapKey = GatewayConstant.PATHTOKEN+ GatewayConstant.PATH_SEPARATOR
+ GatewayConstant.VSEPARATOR + version
+ GatewayConstant.PATH_SEPARATOR + openUri;
// 从apiInfo中取出apiid拼接value
String apiId = apiInfo.getApiId();
String routerMapValue = GatewayConstant.CLIENTCHANNEL+ apiId;
// 给routerMap赋值
routerMap.put(routerMapKey, routerMapValue);
}
}
} else {
logger.error("同步router节点时发现API信息列表信息有误:API信息列表为空");
throw new TeslaApplicationException("API信息列表不能为空");
}
routerConfig.setRouterMap(routerMap);
// 同步之前需要把子节点强行设置为true,插件名不变,type依旧为default
routerConfig.setAvailable(true);
routerConfig.setId(id);
routerConfig.setType("default");
// 调用单节点同步方法执行同步操作
routerConfigFunctions.editRouterConfig(routerConfig, plugName, id);
logger.info("+++++ router节点数据同步结束 +++++");
}
/**
* outbound_channel一键同步方法
* @param plugName 大插件名
*/
public void outboundChannelDataSync(String plugName) throws Exception {
logger.info("+++++ outbound_channel节点数据同步开始 +++++");
// 创建要同步节点数据实体对象
// channel
OutboundChannelParent outboundChannelParent = new OutboundChannelParent();
// connector
OutboundChannelConfig outboundChannelConfig = new OutboundChannelConfig();
// connectionConfig
ConnectionConfig connectionConfig = new ConnectionConfig();
// 查询API信息表
List<ApiInfo> apiInfoList = apiInfoMapper.queryApiInfoList();
// 若apiInfoList不为空则从apiInfoList循环取出每个元素--ApiInfo的实体
if (apiInfoList != null) {
for (int i = 0; i < apiInfoList.size(); i++) {
ApiInfo apiInfo = apiInfoList.get(i);
// 再从apiInfo实体中取出availability校验API是否废弃
Integer availability = apiInfo.getAvailability();
// 如果 availability为1代表API有效
if (availability != null && availability == 1) {
// 先给channel赋默认值
OutboundChannelParent defaultOutboundChannelParent = outboundChannelParent.defaultOutboundChannelParent();
// 从apiInfo中取出apiid
String apiId = apiInfo.getApiId();
// 拼接clientChannelApiid
// set给defaultOutboundChannelParent的id
String clientChannelApiid = defaultOutboundChannelParent.getId() + apiId;
defaultOutboundChannelParent.setId(clientChannelApiid);
// 给connector赋默认值
OutboundChannelConfig defaultOutboundChannelConfig = outboundChannelConfig.defaultOutboundChannelConfig();
// 给connector赋默认值中的connectionConfig赋默认值
ConnectionConfig defaultConnectionConfig = connectionConfig.defaultConnectionConfig();
// defaultConnectionConfig
// set给defaultOutboundChannelConfig
defaultOutboundChannelConfig.setConnectionConfig(defaultConnectionConfig);
// 从apiInfo中取出Inneruri,set给defaultOutboundChannelConfig
String innerUri = apiInfo.getInneruri();
defaultOutboundChannelConfig.setUri(innerUri);
// 拼接clientConnectorApiid
// set给defaultOutboundChannelConfig的id
String clientConnectorApiid = defaultOutboundChannelConfig.getId() + apiId;
defaultOutboundChannelConfig.setId(clientConnectorApiid);
// 将初始化好的defaultOutboundChannelParent、defaultOutboundChannelConfig调用创建渠道和连接器的方法
outboundChannelConfigFunction.createOutboundChannelParent(plugName, defaultOutboundChannelParent,defaultOutboundChannelConfig);
}
}
} else {
logger.error("同步outbound_channel节点时发现API信息列表信息有误:API信息列表为空");
throw new TeslaApplicationException("API信息列表不能为空");
}
logger.info("+++++ outbound_channel节点数据同步结束 +++++");
}
}
这个类的代码稍微有点长,关键是看里面对从mysql同步到zk的逻辑和方法,暂时就先写这么多,之后会把zktree的截图放上去的。
最近做了一个项目,关于一个访问API的开放平台,其中有很重要的一个模块是从mysql数据库到zookeeper节点的数据同步,这一块是我独立负责的,这两天刚刚开发和测试完,其中遇到了一些问题,也有了不少的收获,现在做一个总结。
对于zk,百度百科上有详细的介绍,不知道的童鞋可以自行了解,这里我们只需要明白两点即可:
1:zk是可以单实例或者集群化部署的
2:如果zk以集群化部署,相应会产生多个zk节点,那么只要有超过一半(必须超过)的zk节点宕机,则整个zk集群都将无法正常工作。
对zk操作的pom.xml文件中maven添加的依赖地址如下:
<!-- zk 连接客户端 start -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.8.0</version>
</dependency>
<!-- zk 连接客户端 end -->
要想对zk的节点即zktree进行操作,第一步是与zk建立连接,代码如下:
/**
* @描述:创建一个zookeeper连接
*/
private static void CreateZkclientConnection() {
// 定义zk服务器的ip和port,多个节点的话用","分隔
final String connectString = "197.3.153.159:2181,197.3.153.160:2181,197.3.153.161:2181";
// retryPolicy是连接zk过程中重连策略,两个参数分别代表:两次重连的等待时间和最大重试次数
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 100);
// 创建CuratorFramework实例,创建完成即代表连接zk成功
CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
// 调用start方法打开连接
client.start();
}
与zk建立连接之后就可以对zk节点进行操作了,首先是创建节点,代码如下:
/**
* @描述:创建一个zookeeper节点
* @param path 路径
* @param json 节点名
* @throws Exception
*/
public void createNode(String path, String json) throws Exception {
final String connectString = "197.3.153.159:2181,197.3.153.160:2181,197.3.153.161:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 100);
CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
client.start();
try {
// 传入路径和节点名调用zkclient自身的create方法
client.create().creatingParentsIfNeeded().forPath(path, json.getBytes(GatewayConstant.UTF8));
} catch (Exception e) {
e.printStackTrace();
} finally {
// 对zk操作完成之后要注意关闭连接
if (client != null) {
client.close();
}
}
}
这里对create()方法做个说明:create()---执行创建操作,可以调用额外的方法(比如后台执行background)并在最后调用forPath()指定要操作的znode。
接下来是查询某个路径下的节点,代码如下:
/**
* @描述:查询zookeeper路径的节点
* @param path 要查询节点的路径
* @return attrJson 查询到的节点名
* @throws Exception
*/
public String queryNode(String path) throws Exception {
final String connectString = "197.3.153.159:2181,197.3.153.160:2181,197.3.153.161:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 100);
CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
client.start();
// 定义查询到的节点
String attrJson = null;
try {
// 调用zkclient的getData()方法
byte[] byteNode = client.getData().forPath(path);
// 转成UTF-8格式的字符串
attrJson = new String(byteNode, GatewayConstant.UTF8);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 对zk操作完成之后要注意关闭连接
if (client != null) {
client.close();
}
}
return attrJson;
}
这里对getData()方法做个说明:getData()---执行获取znode节点数据的操作,可以调用额外的方法(比如监控、后台处理或者获取状态watch)并在最后调用forPath()指定要操作的znode。
然后是修改某个路径下的节点值,代码如下:
/**
* @描述:修改zookeeper路径的节点值
* @param path 要修改节点值的路径
* @param json 要修改成的内容
* @throws Exception
*/
public void editNode(String path, String json) throws Exception {
final String connectString = "197.3.153.159:2181,197.3.153.160:2181,197.3.153.161:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 100);
CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
client.start();
try {
// 调用zkclient的setData()方法
client.setData().forPath(path, json.getBytes(GatewayConstant.UTF8));
} catch (Exception e) {
e.printStackTrace();
} finally {
// 对zk操作完成之后要注意关闭连接
if (client != null) {
client.close();
}
}
}
这里对setData()方法做个说明:setData()---执行设置znode节点数据的操作,可以调用额外的方法(比如版本、后台处理)并在最后调用forPath()指定要操作的znode。
然后是删除某个路径的节点,代码如下:
/**
* @描述:删除zookeeper路径的节点
* @param path 要删除节点的路径
* @throws Exception
*/
public void deteleNode(String path) throws Exception {
final String connectString = "197.3.153.159:2181,197.3.153.160:2181,197.3.153.161:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 100);
CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
client.start();
try {
// 调用zkclient的delete()方法
client.delete().forPath(path);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 对zk操作完成之后要注意关闭连接
if (client != null) {
client.close();
}
}
}
这里对delete()方法做个说明:delete()---执行删除znode节点的操作,可以调用额外的方法(比如版本、后台处理)并在最后调用forPath()指定要操作的znode。
上面的增删改查都是在路径和json都确定的情况下进行的,下面给出一个完整的类代码,这个类功能是一键将数据库中的某些数据同步到zk服务器的zktree上,现在暂定zktree上有5个需要同步的节点,分别为:
inbound_channel、accessVerification、authentication、router、outbound_channel,代码如下:
/**
* mysql--->zk一键同步类 适用于插件第一次初始化数据和运行过程中出现问题需要强制同步的场景
* Created by xuzheng on 2017/09.
* @since 3.0.0
*/
@Functions
@Service
public class OneClickSyncFunction {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private GatewayCuratorUtil gatewayCuratorUtil;
@Autowired
private SysParamsMapper sysParamsMapper;
@Autowired
ApiInfoMapper apiInfoMapper;
@Autowired
AuthenticationInfoMapper authenticationInfoMapper;
@Autowired
AccessAuthorityMapper accessAuthorityMapper;
@Autowired
AuthenticationConfigFunctions authenticationConfigFunction;
@Autowired
AccessVerificationConfigFunctions accessVerificationConfigFunction;
@Autowired
RouterConfigFunctions routerConfigFunctions;
@Autowired
OutboundChannelConfigFunctions outboundChannelConfigFunction;
/**
* 数据同步一键执行所有插件
* @throws Exception
*/
@Function
public void dataSync() throws Exception {
// 加载本地配置文件,获取大插件名
Map<String, String> config = LoadConfigurationDocument.LoadConfiguration();
String plugName = config.get("plugName");
// 用插件名和插件节点名获取节点路径
String nodePath = GatewayZkPathUtil.getNodepath(plugName);
String path = nodePath.substring(0, nodePath.length() - 1);
// 获取该路径下节点名的集合,如:[authentication]
List<String> nodeList = GatewayCuratorUtil.queryList(path);
String idAuthen = "authentication";
Boolean blAuthen = nodeList.contains(idAuthen);
// 如果集合中有身份验证节点名:authentication,调用身份验证同步方法
if (blAuthen) {
authenticationDataSync(plugName, idAuthen);
}
String idAccess = "accessVerification";
Boolean blAccess = nodeList.contains(idAccess);
// 如果集合中有权限校验节点名:accessVerification,调用权限校验同步方法
if (blAccess) {
accessVerificationDataSync(plugName, idAccess);
}
String idRouter = "router";
Boolean blRouter = nodeList.contains(idRouter);
// 如果集合中有router节点名:router,调用router同步方法
if (blRouter) {
routerDataSync(plugName, idRouter);
}
String idOutboundChannel = "outbound_channel";
Boolean blOutboundChannel = nodeList.contains(idOutboundChannel);
// 如果集合中有outbound_channel节点名:outbound_channel,调用outbound_channel同步方法
if (blOutboundChannel) {
outboundChannelDataSync(plugName);
}
}
/**
* 身份验证一键同步方法
* @param plugName 大插件名
* @param id 小插件名
*/
public void authenticationDataSync(String plugName, String id) throws Exception {
logger.info("+++++ 身份验证节点数据同步开始 +++++");
// 创建要同步节点数据实体对象
AuthenticationConfig authenticationConfig = new AuthenticationConfig();
// 定义api级别的map
Map<String, String> uriGradeMap = new HashMap<>();
// 定义app认证信息map
Map<String, String> authenticationMap = new HashMap<>();
// 从数据库中查询出需要同步的参数,包括
// JWT秘钥:keyStr
String keyStr = sysParamsMapper.queryJwtKeystr();
authenticationConfig.setKeyStr(keyStr);
// Token的有效时间:ttlMillis
String ttlMillis = sysParamsMapper.queryTokenTtlMillis();
authenticationConfig.setTtlMillis(Long.valueOf(ttlMillis));
// 查询API信息表,用查询出的元素组装uriGradeMap
List<ApiInfo> apiInfoList = apiInfoMapper.queryApiInfoList();
// apiInfoList不为空则从apiInfoList循环取出每个元素--ApiInfo的实体
if (apiInfoList != null) {
for (int i = 0; i < apiInfoList.size(); i++) {
ApiInfo apiInfo = apiInfoList.get(i);
// 再从apiInfo实体中取出需要组装的字段,包括:version,openURI,api_level
String version = apiInfo.getVersion();
String openUri = apiInfo.getOpenuri();
Integer apiLevel = apiInfo.getApiLevel();
// 组装放到uriGradeMap中
String versionAddUri = GatewayConstant.PATH_SEPARATOR+ GatewayConstant.VSEPARATOR
+ version+ GatewayConstant.PATH_SEPARATOR + openUri;
if (apiLevel == null) {
logger.error("同步身份验证节点时发现API信息列表参数有误:api级别不能为空"+ apiInfo.getApiId());
throw new TeslaApplicationException("api级别不能为空");
} else {
String apiLevelString = String.valueOf(apiLevel);
uriGradeMap.put(versionAddUri, apiLevelString);
}
}
} else {
logger.error("同步身份验证节点时发现API信息列表信息有误:API信息列表为空");
throw new TeslaApplicationException("API信息列表不能为空");
}
authenticationConfig.setUriGradeMap(uriGradeMap);
// 查询鉴权信息表,用查询出的元素组装authenticationMap
List<AuthenticationInfo> authenticationInfoList = authenticationInfoMapper.queryAuthenticationInfo();
// 从authenticationInfoList循环取出每个元素--ApiInfo的实体
for (int j = 0; j < authenticationInfoList.size(); j++) {
AuthenticationInfo authenticationInfo = authenticationInfoList.get(j);
// 再从authenticationInfo实体中取出需要组装的字段,包括:appkey,secret
String appkey = authenticationInfo.getAppkey();
String secret = authenticationInfo.getSecret();
// 放到authenticationMap中
authenticationMap.put(appkey, secret);
}
authenticationConfig.setAuthenticationMap(authenticationMap);
// 同步之前需要把子节点强行设置为true,插件名不变
authenticationConfig.setAvailable(true);
authenticationConfig.setId(id);
// 调用单节点同步方法执行同步操作
authenticationConfigFunction.editAuthentication(authenticationConfig,plugName, id);
logger.info("+++++ 身份验证节点数据同步结束 +++++");
}
/**
* 权限校验一键同步方法
* @param plugName 大插件名
* @param id 小插件名
*/
public void accessVerificationDataSync(String plugName, String id) throws Exception {
logger.info("+++++ 权限校验节点数据同步开始 +++++");
// 创建要同步节点数据实体对象
AccessVerificationConfig accessVerificationConfig = new AccessVerificationConfig();
// 定义app对api的访问权限关系map,以${appkey+path}为key,以""为value
Map<String, String> accessVerificationMap = new HashMap<>();
// 从数据库中查询出需要同步的参数,包括
// 从访问权限表查出所有的appId
List<String> appidList = accessAuthorityMapper.queryAppid();
// appidList不为空则循环取出每一个appId去查询该appid对应的appkey和关联的apiid
if (appidList != null) {
for (int k = 0; k < appidList.size(); k++) {
String appId = appidList.get(k);
// 从鉴权信息表查该appId对应的appkey(唯一)
String appkey = authenticationInfoMapper.queryAppkeyByAppid(appId);
// 从访问权限表查该appId关联的apiId(不唯一)
List<String> apiidList = accessAuthorityMapper.queryApiidByAppid(appId);
// 循环取出每一个apiid去查询api信息表
for (int l = 0; l < apiidList.size(); l++) {
String apiid = apiidList.get(l);
ApiInfo apiInfo = apiInfoMapper.queryApiInfoByApiid(apiid);
if (apiInfo != null) {
// 从apiInfo校验API是否废弃
int availability = apiInfo.getAvailability();
// 如果
// availability为1代表API有效,从apiInfo中取出version和OPenUri
if (availability == 1) {
String version = apiInfo.getVersion();
String openUri = apiInfo.getOpenuri();
// 拼接appkey+version+uri放到map的key中
String accessMapkey = appkey+ GatewayConstant.PATH_SEPARATOR
+ GatewayConstant.VSEPARATOR + version
+ GatewayConstant.PATH_SEPARATOR + openUri;
accessVerificationMap.put(accessMapkey, "");
}
}
}
}
} else {
logger.error("同步权限校验节点时发现APP访问权限表信息列表信息有误:appid都为空");
throw new TeslaApplicationException("appid不能都为空");
}
accessVerificationConfig.setAccessVerificationMap(accessVerificationMap);
// 同步之前需要把子节点强行设置为true,插件名不变
accessVerificationConfig.setAvailable(true);
accessVerificationConfig.setId(id);
// 调用单节点同步方法执行同步操作
accessVerificationConfigFunction.editAccessVerification(
accessVerificationConfig, plugName, id);
logger.info("+++++ 权限校验节点数据同步结束 +++++");
}
/**
* router一键同步方法
* @param plugName 大插件名
* @param id 小插件名
*/
public void routerDataSync(String plugName, String id) throws Exception {
logger.info("+++++ router节点数据同步开始 +++++");
// 创建要同步节点数据实体对象
RouterConfig routerConfig = new RouterConfig();
// 定义APP对uri转发配置map,以${pathToken}->/v+version+openuri为key,
// 以clientChannel+apiid为值
Map<String, String> routerMap = new HashMap<>();
// 从数据库中查询出需要同步的参数,包括
// 查询API信息表
List<ApiInfo> apiInfoList = apiInfoMapper.queryApiInfoList();
// apiInfoList不为空则从apiInfoList循环取出每个元素--ApiInfo的实体
if (apiInfoList != null) {
for (int i = 0; i < apiInfoList.size(); i++) {
ApiInfo apiInfo = apiInfoList.get(i);
// 再从apiInfo实体中取出availability校验API是否废弃
Integer availability = apiInfo.getAvailability();
// 如果 availability为1代表API有效,从apiInfo中取出version和OPenUri
if (availability != null && availability == 1) {
String version = apiInfo.getVersion();
String openUri = apiInfo.getOpenuri();
// 拼接routerMap的key和值
String routerMapKey = GatewayConstant.PATHTOKEN+ GatewayConstant.PATH_SEPARATOR
+ GatewayConstant.VSEPARATOR + version
+ GatewayConstant.PATH_SEPARATOR + openUri;
// 从apiInfo中取出apiid拼接value
String apiId = apiInfo.getApiId();
String routerMapValue = GatewayConstant.CLIENTCHANNEL+ apiId;
// 给routerMap赋值
routerMap.put(routerMapKey, routerMapValue);
}
}
} else {
logger.error("同步router节点时发现API信息列表信息有误:API信息列表为空");
throw new TeslaApplicationException("API信息列表不能为空");
}
routerConfig.setRouterMap(routerMap);
// 同步之前需要把子节点强行设置为true,插件名不变,type依旧为default
routerConfig.setAvailable(true);
routerConfig.setId(id);
routerConfig.setType("default");
// 调用单节点同步方法执行同步操作
routerConfigFunctions.editRouterConfig(routerConfig, plugName, id);
logger.info("+++++ router节点数据同步结束 +++++");
}
/**
* outbound_channel一键同步方法
* @param plugName 大插件名
*/
public void outboundChannelDataSync(String plugName) throws Exception {
logger.info("+++++ outbound_channel节点数据同步开始 +++++");
// 创建要同步节点数据实体对象
// channel
OutboundChannelParent outboundChannelParent = new OutboundChannelParent();
// connector
OutboundChannelConfig outboundChannelConfig = new OutboundChannelConfig();
// connectionConfig
ConnectionConfig connectionConfig = new ConnectionConfig();
// 查询API信息表
List<ApiInfo> apiInfoList = apiInfoMapper.queryApiInfoList();
// 若apiInfoList不为空则从apiInfoList循环取出每个元素--ApiInfo的实体
if (apiInfoList != null) {
for (int i = 0; i < apiInfoList.size(); i++) {
ApiInfo apiInfo = apiInfoList.get(i);
// 再从apiInfo实体中取出availability校验API是否废弃
Integer availability = apiInfo.getAvailability();
// 如果 availability为1代表API有效
if (availability != null && availability == 1) {
// 先给channel赋默认值
OutboundChannelParent defaultOutboundChannelParent = outboundChannelParent.defaultOutboundChannelParent();
// 从apiInfo中取出apiid
String apiId = apiInfo.getApiId();
// 拼接clientChannelApiid
// set给defaultOutboundChannelParent的id
String clientChannelApiid = defaultOutboundChannelParent.getId() + apiId;
defaultOutboundChannelParent.setId(clientChannelApiid);
// 给connector赋默认值
OutboundChannelConfig defaultOutboundChannelConfig = outboundChannelConfig.defaultOutboundChannelConfig();
// 给connector赋默认值中的connectionConfig赋默认值
ConnectionConfig defaultConnectionConfig = connectionConfig.defaultConnectionConfig();
// defaultConnectionConfig
// set给defaultOutboundChannelConfig
defaultOutboundChannelConfig.setConnectionConfig(defaultConnectionConfig);
// 从apiInfo中取出Inneruri,set给defaultOutboundChannelConfig
String innerUri = apiInfo.getInneruri();
defaultOutboundChannelConfig.setUri(innerUri);
// 拼接clientConnectorApiid
// set给defaultOutboundChannelConfig的id
String clientConnectorApiid = defaultOutboundChannelConfig.getId() + apiId;
defaultOutboundChannelConfig.setId(clientConnectorApiid);
// 将初始化好的defaultOutboundChannelParent、defaultOutboundChannelConfig调用创建渠道和连接器的方法
outboundChannelConfigFunction.createOutboundChannelParent(plugName, defaultOutboundChannelParent,defaultOutboundChannelConfig);
}
}
} else {
logger.error("同步outbound_channel节点时发现API信息列表信息有误:API信息列表为空");
throw new TeslaApplicationException("API信息列表不能为空");
}
logger.info("+++++ outbound_channel节点数据同步结束 +++++");
}
}
这个类的代码稍微有点长,关键是看里面对从mysql同步到zk的逻辑和方法,暂时就先写这么多,之后会把zktree的截图放上去的。
相关文章推荐
- Zookeeper入门之使用curator连接zookeeper并且进行节点的增删改查及ACL
- Zookeeper入门之使用curator连接zookeeper并且进行节点的增删改查及ACL
- Zookeeper入门之使用curator连接zookeeper并且进行节点的增删改查及ACL
- ZooKeeper客户端 zkCli.sh 节点的增删改查
- javawebday07补充(jaxp增删查替换节点 2个工厂Document Transformer schema约束入门)
- 基本DELPHI中XML编程--简单节点增删改查
- ZooKeeper客户端 zkCli.sh 节点的增删改查
- 网页编程--DOM之节点的增删改查
- Zookeeper之Java入门应用以及临时节点-yellowcong
- zookeeper学习之zkclient节点增删改查<九>
- Zookeeper.NET Client(三)【Znode节点增删改查】
- zookeeper节点的增删改查小demo
- 双链表:实现基本的增删查改,正反向现实双链表的节点
- OpenGL ES编程入门资源集合
- MFC 数据库编程 增删改查的一个例子
- 《Java从入门到放弃》JavaSE入门篇:网络编程(入门版)
- 编程之美。求二叉树节点最大距离
- zookeeper windows 入门安装和测试
- VC-----ADO数据库编程入门
- oracle编程入门笔记2015-01-27--分析函数性能