Java - zookeeper 服务注册发现
2016-04-22 18:37
2301 查看
网上实际一些的案例严重缺乏,真心不易出此稿~~
1、zookeeper 服务注册发现模型
流程:1)注册,2)发现:监听、负载均衡、故障检测、变更通知,3)调用。
From http://blog.cloudera.com/blog/2014/03/zookeeper-resilience-at-pinterest/
From http://www.techweb.com.cn/network/hardware/2015-12-25/2246973.shtml
From http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.2/bk_hadoop-ha/content/ha-hs2-service-discovery.html
2、单机代码
虚拟机准备:1)启动zookeeper,2)启动多个服务(如ElasticSearch, Tomcat之类的),3)一个服务(如tomcat、elasticsearch)对应多个instance(如节点1、节点2)
流程:
1)准备:CuratorFramework(可选择getClient或newClient方法),ServiceInstance;
2)注册:ServiceDiscovery (注册有两种方式:registerService/thisInstance);
3.1)负载均衡:ServiceProvider providerStrategy()
3.2)启动监听:ServiceProvider start(),一个provider对应一个服务;
4)通知:ServiceProvider getInstance()
5)调用: 有ip、port,想怎么用就怎么用吧~
3、单机负载均衡测试
多个服务是否按照负载均衡策略进行访问,需要利用REST http方式进行测试。单机测试如下:
ZkTest02.java
SpringRestTestClient.java (测试类)
先启动ZkTest02,然后连续运行SpringRestTestClient 几次。ZkTest02那边的结果(sys out):
4、Spring下负载均衡测试代码
另外,也可以用Spring controller来做测试,不过遇到一些引用包的问题,应该与spring相关,由于前面的方法已经满足PoC需求,暂时没有再试下去,有兴趣也可以参考以下代码:
上面的代码只是基于zookeeer znode操作,对服务的监测还是需要自己实现的,比如一个provider - war程序服务,可以做一个REST接口出来进行监听。
5、Q&A
Q: curator没有现成的jar包
A: 上http://search.maven.org/
Q: Connection Timeout
A: 检查zookeeper、服务的端口是否打开(如在centos,打开端口命令为/sbin/iptables -I INPUT -p tcp --dport 2181 -j ACCEPT, /etc/init.d/iptables save, service iptables restart)
Q: base path找不到
A: 启动zookeeper, 运行./zkCli.sh,使用zookeeer命令:ls / 看有没有base_path,没有就新建,如上面的例子,put /es_path/es1,ls /es_path/es1
Q: 在浏览器http访问测试时,报不支持post错误或访问为get方式
A: post方式不能用浏览器http直接访问测试,用代码发送post请求或用postman来测试;
Q: 除了Curator,还有其它实现手段吗
A: (1)zoologist(http://npm.taobao.org/package/zoologist,基于Curator REST接口以Node.js实现的包);(2)Spring Cloud Zookeeper(http://cloud.spring.io/spring-cloud-zookeeper/spring-cloud-zookeeper.html,基于ZK和curator,spring boot上的实现);(3)dubbo(http://dubbo.io/User+Guide-zh.htm,阿里框架,可以基于zookeeper和curator,并支持可视化交互monitor)。
Q: 有没有其它Sample
A: (1)GitHub - curator-playground-master, curator-disco-master, disco-java;(2)Link: http://www.jianshu.com/p/c0ba6d62dd2e
1、zookeeper 服务注册发现模型
流程:1)注册,2)发现:监听、负载均衡、故障检测、变更通知,3)调用。
From http://blog.cloudera.com/blog/2014/03/zookeeper-resilience-at-pinterest/
From http://www.techweb.com.cn/network/hardware/2015-12-25/2246973.shtml
From http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.2/bk_hadoop-ha/content/ha-hs2-service-discovery.html
2、单机代码
虚拟机准备:1)启动zookeeper,2)启动多个服务(如ElasticSearch, Tomcat之类的),3)一个服务(如tomcat、elasticsearch)对应多个instance(如节点1、节点2)
流程:
1)准备:CuratorFramework(可选择getClient或newClient方法),ServiceInstance;
2)注册:ServiceDiscovery (注册有两种方式:registerService/thisInstance);
3.1)负载均衡:ServiceProvider providerStrategy()
3.2)启动监听:ServiceProvider start(),一个provider对应一个服务;
4)通知:ServiceProvider getInstance()
5)调用: 有ip、port,想怎么用就怎么用吧~
import java.io.Closeable; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.retry.RetryNTimes; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; import org.apache.curator.x.discovery.ServiceInstance; import org.apache.curator.x.discovery.ServiceProvider; import org.apache.curator.x.discovery.UriSpec; import org.apache.curator.x.discovery.details.JsonInstanceSerializer; import org.apache.curator.x.discovery.strategies.RandomStrategy; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import com.google.common.collect.Lists; import com.google.common.collect.Maps; /* * http://www.techweb.com.cn/network/hardware/2015-12-25/2246973.shtml * http://blog.csdn.net/hxpjava1/article/details/8612228 * http://www.tuicool.com/articles/F3Avue * http://www.bkjia.com/Javabc/879495.html * http://curator.apache.org/apidocs/index.html * http://blog.csdn.net/john_f_lau/article/details/50660195 * * http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.curator%22 * share one CuratorFramework per ZooKeeper cluster * By default ServiceProvider uses Round-robin ProviderStrategy */ public class ZkTest01 { private static String basePath = "/es_path"; private static List<Closeable> closeableList = Lists.newArrayList(); // Register worker 1 private static ServiceDiscovery<Void> registerInZookeeper(CuratorFramework client, ServiceInstance<Void> instance) throws Exception { ServiceDiscovery<Void> serviceDiscovery; JsonInstanceSerializer<Void> serializer = new JsonInstanceSerializer<Void>( Void.class); //(3) serviceDiscovery = ServiceDiscoveryBuilder .builder(Void.class) .client(client) .serializer(serializer) .basePath(basePath) //.thisInstance(instance) //注册方式2 .build(); serviceDiscovery.start(); //(4) cd <ZK_ROOT>/bin, ./zkCli.sh return serviceDiscovery; } // Register worker 2 private static ServiceDiscovery<Void> registerInZookeeper2(CuratorFramework client, ServiceInstance<Void> instance) throws Exception { ServiceDiscovery<Void> serviceDiscovery; JsonInstanceSerializer<Void> serializer = new JsonInstanceSerializer<Void>( Void.class); //(3) serviceDiscovery = ServiceDiscoveryBuilder .builder(Void.class) .client(client) .serializer(serializer) .basePath(basePath) //.thisInstance(instance) //注册方式2 .build(); serviceDiscovery.start(); //(4) cd <ZK_ROOT>/bin, ./zkCli.sh return serviceDiscovery; } public static ServiceInstance<Void> getInstanceByName(ServiceDiscovery<Void> serviceDiscovery, String serviceName) throws Exception { //(5) 非注册方式,do not register in ZooKeeper, get instance from ServiceProvider ServiceProvider<Void> provider = serviceDiscovery.serviceProviderBuilder(). serviceName(serviceName). providerStrategy(new RandomStrategy<Void>()) .build(); provider.start(); closeableList.add(provider); return provider.getInstance(); } public static synchronized void close(){ for (Closeable closeable : closeableList) { CloseableUtils.closeQuietly(closeable); } } public static CuratorFramework getClient() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("127.0.0.1:2181") .sessionTimeoutMs(5000) .connectionTimeoutMs(10000) .retryPolicy(retryPolicy) .namespace("text").build(); client.start(); return client; } public static void main(String[] args) throws Exception { // (1) CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("centos1:2182", new ExponentialBackoffRetry(1000, 1)); curatorFramework.start(); // (2) ServiceInstance<Void> instance1 = ServiceInstance.<Void>builder() .name("es1") .port(8080) .address("centos1") //address不写的话,会取本地ip //.payload(new Void(UUID.ra ndomUUID().toString(),"centos1",9200)) .uriSpec(new UriSpec("{scheme}://{address}:{port}")) .build(); ServiceDiscovery<Void> serviceDiscovery = registerInZookeeper2(curatorFramework, instance1); Iterator<String> ItrServices = serviceDiscovery.queryForNames().iterator(); while(ItrServices.hasNext()) { System.out.println("ItrServices~~~" + ItrServices.next()); } serviceDiscovery.registerService(instance1); //注册方式1 Iterator ItrInstances = serviceDiscovery.queryForInstances("es1").iterator(); while(ItrInstances.hasNext()) { System.out.println("ItrInstances~~~" + ItrInstances.next()); } ServiceInstance<Void> instance_loopup = getInstanceByName(serviceDiscovery,"es1"); System.out.println("address~~~" + instance_loopup.buildUriSpec()); System.out.println("payload~~~" + instance_loopup.getPayload()); //System.out.println("listenAddr~~~" + instance_loopup.getPayload().getListenAddress()); close(); CloseableUtils.closeQuietly(curatorFramework); } /* //register ServiceDiscovery = ServiceDiscoveryBuilder.builder.thisInstance(serviceInstance) start() //lookup (by load balancer) ServiceProvider = serviceDiscovery.serviceProviderBuilder().providerStrategy start() //invoke service ------- CuratorFramework = CuratorFrameworkFactory.newClient / builder start() ServiceInstance = ServiceInstance.builder */ }
3、单机负载均衡测试
多个服务是否按照负载均衡策略进行访问,需要利用REST http方式进行测试。单机测试如下:
ZkTest02.java
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.net.InetSocketAddress; import java.util.concurrent.Executors; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; import org.apache.curator.x.discovery.ServiceInstance; import org.apache.curator.x.discovery.ServiceProvider; import org.apache.curator.x.discovery.UriSpec; import org.apache.curator.x.discovery.details.JsonInstanceSerializer; import org.apache.curator.x.discovery.strategies.RandomStrategy; import org.apache.curator.x.discovery.strategies.RoundRobinStrategy; import org.slf4j.LoggerFactory; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; import com.sun.net.httpserver.HttpServer; import com.sun.net.httpserver.spi.HttpServerProvider; public class ZkTest02 { private static String basePath = "/es_path"; private static ServiceDiscovery<Void> serviceDiscovery; private static ServiceProvider<Void> provider; public static void main(String[] args) throws IOException { httpserverService(); } public static CuratorFramework getClient() { CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("centos1:2182", new ExponentialBackoffRetry(1000, 1)); curatorFramework.start(); return curatorFramework; } public static ServiceDiscovery<Void> getServiceDiscovery(CuratorFramework client) throws Exception { JsonInstanceSerializer<Void> serializer = new JsonInstanceSerializer<Void>( Void.class); ServiceDiscovery<Void> serviceDiscovery = ServiceDiscoveryBuilder .builder(Void.class) .client(client) .serializer(serializer) .basePath(basePath) .build(); serviceDiscovery.start(); return serviceDiscovery; } public static ServiceInstance<Void> getInstance1() throws Exception { ServiceInstance<Void> instance1 = ServiceInstance.<Void>builder() .name("es1") .port(8080) .address("centos1") .uriSpec(new UriSpec("http://centos1:8080")) .build(); return instance1; } public static ServiceInstance<Void> getInstance2() throws Exception { ServiceInstance<Void> instance1 = ServiceInstance.<Void>builder() .name("es1") .port(9200) .address("centos1") .uriSpec(new UriSpec("http://centos1:9200")) .build(); return instance1; } public static ServiceProvider<Void> serviceProvider(ServiceDiscovery<Void> serviceDiscovery, String serviceName) throws Exception { ServiceProvider<Void> provider = serviceDiscovery.serviceProviderBuilder(). serviceName(serviceName). providerStrategy(new RandomStrategy<Void>()) .build(); provider.start(); return provider; } public static void process(String param) throws Exception { if (null == serviceDiscovery) { LoggerFactory.getLogger(""); serviceDiscovery = getServiceDiscovery(getClient()); serviceDiscovery.registerService(getInstance1()); serviceDiscovery.registerService(getInstance2()); } if (null == provider) { provider = serviceProvider(serviceDiscovery, param); } if (null != provider) { System.out.println("address~~~" + provider.getInstance().getAddress() + ", port~~~" + provider.getInstance().getPort()); // provider.close(); } } //启动服务,监听来自客户端的请求 public static void httpserverService() throws IOException { HttpServerProvider provider = HttpServerProvider.provider(); HttpServer httpserver =provider.createHttpServer(new InetSocketAddress(8080), 100);//监听端口6666,能同时接 受100个请求 httpserver.createContext("/sc/es", new MyHttpHandler()); //httpserver.setExecutor(null); //使用单线程 httpserver.setExecutor(Executors.newFixedThreadPool(5)); httpserver.start(); System.out.println("server started"); } //Http请求处理类 static class MyHttpHandler implements HttpHandler { public void handle(HttpExchange httpExchange) throws IOException { String responseMsg = "ok"; //响应信息 InputStream in = httpExchange.getRequestBody(); //获得输入流 BufferedReader reader = new BufferedReader(new InputStreamReader(in)); String temp = null; while((temp = reader.readLine()) != null) { System.out.println("client request:"+temp); try { process(temp); } catch (Exception e) { e.printStackTrace(); } } httpExchange.sendResponseHeaders(200, responseMsg.length()); //设置响应头属性及响应信息的长度 OutputStream out = httpExchange.getResponseBody(); //获得输出流 out.write(responseMsg.getBytes()); out.flush(); httpExchange.close(); } } }
SpringRestTestClient.java (测试类)
import java.net.URI; import org.springframework.web.client.RestClientException; import org.springframework.web.client.RestTemplate; public class SpringRestTestClient { public static final String REST_SERVICE_URI = "http://localhost:8080/sc"; // public static final String REST_SERVICE_URI = "http://localhost:6666/myApp"; private static void listAllUsers(){ RestTemplate restTemplate = new RestTemplate(); String param = "es1"; try { // URI uri = restTemplate.postForLocation(REST_SERVICE_URI+"/es", jsonParam, String.class); // if (null != uri){ // System.out.println("Location : "+uri.toASCIIString()); // } else { // System.out.println("uri is null~"); // } String result = restTemplate.postForObject(REST_SERVICE_URI+"/es", param, String.class); System.out.println("es-result~" + result); } catch (RestClientException e) { e.printStackTrace(); } } public static void main(String[] args) { listAllUsers(); } }
先启动ZkTest02,然后连续运行SpringRestTestClient 几次。ZkTest02那边的结果(sys out):
client request:es1 address~~~centos1, port~~~8080 client request:es1 address~~~centos1, port~~~9200 client request:es1 address~~~centos1, port~~~9200 client request:es1 address~~~centos1, port~~~8080 client request:es1 address~~~centos1, port~~~9200
4、Spring下负载均衡测试代码
另外,也可以用Spring controller来做测试,不过遇到一些引用包的问题,应该与spring相关,由于前面的方法已经满足PoC需求,暂时没有再试下去,有兴趣也可以参考以下代码:
package com.test; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; import org.apache.curator.x.discovery.ServiceInstance; import org.apache.curator.x.discovery.ServiceProvider; import org.apache.curator.x.discovery.UriSpec; import org.apache.curator.x.discovery.details.JsonInstanceSerializer; import org.apache.curator.x.discovery.strategies.RoundRobinStrategy; import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.util.UriComponentsBuilder; @RestController @RequestMapping("/zk") public class ZkController { private static String basePath = "/es_path"; private ServiceDiscovery<Void> serviceDiscovery; @RequestMapping(value = "/es", method = RequestMethod.POST) public String postUser(@RequestBody String param, UriComponentsBuilder ucBuilder) throws Exception { if (null == serviceDiscovery) { LoggerFactory.getLogger(""); serviceDiscovery = getServiceDiscovery(getClient()); serviceDiscovery.registerService(getInstance1()); serviceDiscovery.registerService(getInstance2()); } System.out.println("param~~~" + param); ServiceProvider<Void> provider = serviceProvider(serviceDiscovery, param); if (null != provider) { System.out.println("address~~~" + provider.getInstance().getAddress() + ", port~~~" + provider.getInstance().getPort()); provider.close(); } return param; } public CuratorFramework getClient() { CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("centos1:2182", new ExponentialBackoffRetry(1000, 1)); curatorFramework.start(); return curatorFramework; } public ServiceDiscovery<Void> getServiceDiscovery(CuratorFramework client) throws Exception { JsonInstanceSerializer<Void> serializer = new JsonInstanceSerializer<Void>( Void.class); ServiceDiscovery<Void> serviceDiscovery = ServiceDiscoveryBuilder .builder(Void.class) .client(client) .serializer(serializer) .basePath(basePath) .build(); serviceDiscovery.start(); return serviceDiscovery; } public ServiceInstance<Void> getInstance1() throws Exception { ServiceInstance<Void> instance1 = ServiceInstance.<Void>builder() .name("es1") .port(8080) .address("centos1") .uriSpec(new UriSpec("http://centos1:8080")) .build(); return instance1; } public ServiceInstance<Void> getInstance2() throws Exception { ServiceInstance<Void> instance1 = ServiceInstance.<Void>builder() .name("es1") .port(9200) .address("centos1") .uriSpec(new UriSpec("http://centos1:9200")) .build(); return instance1; } public ServiceProvider<Void> serviceProvider(ServiceDiscovery<Void> serviceDiscovery, String serviceName) throws Exception { ServiceProvider<Void> provider = serviceDiscovery.serviceProviderBuilder(). serviceName(serviceName). providerStrategy(new RoundRobinStrategy<Void>()) .build(); provider.start(); return provider; } }
上面的代码只是基于zookeeer znode操作,对服务的监测还是需要自己实现的,比如一个provider - war程序服务,可以做一个REST接口出来进行监听。
5、Q&A
Q: curator没有现成的jar包
A: 上http://search.maven.org/
Q: Connection Timeout
A: 检查zookeeper、服务的端口是否打开(如在centos,打开端口命令为/sbin/iptables -I INPUT -p tcp --dport 2181 -j ACCEPT, /etc/init.d/iptables save, service iptables restart)
Q: base path找不到
A: 启动zookeeper, 运行./zkCli.sh,使用zookeeer命令:ls / 看有没有base_path,没有就新建,如上面的例子,put /es_path/es1,ls /es_path/es1
Q: 在浏览器http访问测试时,报不支持post错误或访问为get方式
A: post方式不能用浏览器http直接访问测试,用代码发送post请求或用postman来测试;
Q: 除了Curator,还有其它实现手段吗
A: (1)zoologist(http://npm.taobao.org/package/zoologist,基于Curator REST接口以Node.js实现的包);(2)Spring Cloud Zookeeper(http://cloud.spring.io/spring-cloud-zookeeper/spring-cloud-zookeeper.html,基于ZK和curator,spring boot上的实现);(3)dubbo(http://dubbo.io/User+Guide-zh.htm,阿里框架,可以基于zookeeper和curator,并支持可视化交互monitor)。
Q: 有没有其它Sample
A: (1)GitHub - curator-playground-master, curator-disco-master, disco-java;(2)Link: http://www.jianshu.com/p/c0ba6d62dd2e
相关文章推荐
- Eclipse控制台实现清屏的方法
- java 命令1:
- Java程序设计基础
- java常用到的类
- springMVC中controller的几种返回类型
- 四 :spring mvc controller
- spring stopwatch
- (Java实现) HDOJ 2023 求平均成绩 面向对象设计思想
- 二 :springMVC:modelandview,model,controller,参数传递
- [改善Java代码]若有必要,使用变长数组
- HDU2054JAVA
- wget下载jdk
- 一、SpringMVC基础入门,创建一个HelloWorld程序
- 二分查找
- 解决MyEclipse10.7吃内存以及卡死的方法
- java中定义一个CloneUtil 工具类
- Map扩展一对多的使用
- eclipse android logcat 只显示自己应用程序信息的设置方法
- Java并发之volatile二
- Java 优先级的问题