如何写一个RPC框架(三):服务注册与服务发现
2017-11-01 23:29
465 查看
在后续一段时间里, 我会写一系列文章来讲述如何实现一个RPC框架(我已经实现了一个示例框架, 代码在我的github上)。 这是系列第三篇文章, 主要讲述了服务注册和服务发现这一块。
在系列的第一篇文章中提到,我们的RPC框架需要有一个服务注册中心。 通过这个中心,服务可以把自己的信息注册进来,也可以获取到别的服务的信息(例如ip、端口、版本信息等)。这一块有个统一的名称,叫服务发现。
对于服务发现,现在有很多可供选择的工具,例如zookeeper, etcd或者是consul等。 有一篇文章专门对这三个工具做了对比: 服务发现:Zookeeper vs etcd vs Consul。 在我的框架中, 我选择使用Consul来实现服务发现。对于Consul不了解的朋友可以去看我之前写的关于Consul的博客。
Consul客户端也有一些Java的实现,我用到了consul-api。
这个接口很简单,向服务注册中心注册自己的地址。
对应的consul的实现:
这里我向consul注册服务的时候,还设定了健康状态检查方式为TCP连接方式, 即每过一秒,consul都会尝试与该地址建立TCP连接以验证服务状态。 除了TCP连接之外,consul还提供了http、ttl等多种检查方式。
另外一点值得注意的是,要确保id绝对唯一。 我能想到的比较直观的解决方案是serviceName + 本机ip + 本机port的组合。
首先,我们定义一个接口:
这个接口很简单,传入serviceName,获取一个可以访问的该service的地址。
对应的consul的实现:
这里我用到了LoadBalancer, 关于LB这块,后续文章会专门讲述。 除此之外, 这里的核心是在获取完服务地址之后,会watch该服务地址的变化, 并更新对应的LB中的地址列表。
就这样, 一个简单的服务注册与发现功能就实现了。 完整代码请看我的github。
在系列的第一篇文章中提到,我们的RPC框架需要有一个服务注册中心。 通过这个中心,服务可以把自己的信息注册进来,也可以获取到别的服务的信息(例如ip、端口、版本信息等)。这一块有个统一的名称,叫服务发现。
对于服务发现,现在有很多可供选择的工具,例如zookeeper, etcd或者是consul等。 有一篇文章专门对这三个工具做了对比: 服务发现:Zookeeper vs etcd vs Consul。 在我的框架中, 我选择使用Consul来实现服务发现。对于Consul不了解的朋友可以去看我之前写的关于Consul的博客。
Consul客户端也有一些Java的实现,我用到了consul-api。
服务注册
首先,我们定义一个接口:public interface ServiceRegistry { void register(String serviceName, ServiceAddress serviceAddress); }
这个接口很简单,向服务注册中心注册自己的地址。
对应的consul的实现:
public class ConsulServiceRegistry implements ServiceRegistry { private ConsulClient consulClient; public ConsulServiceRegistry(String consulAddress) { String address[] = consulAddress.split(":"); ConsulRawClient rawClient = new ConsulRawClient(address[0], Integer.valueOf(address[1])); consulClient = new ConsulClient(rawClient); } @Override public void register(String serviceName, ServiceAddress serviceAddress) { NewService newService = new NewService(); newService.setId(generateNewIdForService(serviceName, serviceAddress)); newService.setName(serviceName); newService.setTags(new ArrayList<>()); newService.setAddress(serviceAddress.getIp()); newService.setPort(serviceAddress.getPort()); // Set health check NewService.Check check = new NewService.Check(); check.setTcp(serviceAddress.toString()); check.setInterval("1s"); newService.setCheck(check); consulClient.agentServiceRegister(newService); } private String generateNewIdForService(String serviceName, ServiceAddress serviceAddress){ // serviceName + ip + port return serviceName + "-" + serviceAddress.getIp() + "-" + serviceAddress.getPort(); } }
这里我向consul注册服务的时候,还设定了健康状态检查方式为TCP连接方式, 即每过一秒,consul都会尝试与该地址建立TCP连接以验证服务状态。 除了TCP连接之外,consul还提供了http、ttl等多种检查方式。
另外一点值得注意的是,要确保id绝对唯一。 我能想到的比较直观的解决方案是serviceName + 本机ip + 本机port的组合。
服务发现
对于服务发现而言, 值得注意的是,我们需要去watch consul上值的变化, 并更新保存在应用中的服务的地址。首先,我们定义一个接口:
public interface ServiceDiscovery { String discover(String serviceName); }
这个接口很简单,传入serviceName,获取一个可以访问的该service的地址。
对应的consul的实现:
public class ConsulServiceDiscovery implements ServiceDiscovery { private ConsulClient consulClient; // 这里我用到了LoadBalancer, 关于LB这块,后续文章会专门讲述 Map<String, LoadBalancer<ServiceAddress>> loadBalancerMap = new ConcurrentHashMap<>(); public ConsulServiceDiscovery(String consulAddress) { String[] address = consulAddress.split(":"); ConsulRawClient rawClient = new ConsulRawClient(address[0], Integer.valueOf(address[1])); consulClient = new ConsulClient(rawClient); } @Override public String discover(String serviceName) { List<HealthService> healthServices; if (!loadBalancerMap.containsKey(serviceName)) { healthServices = consulClient.getHealthServices(serviceName, true, QueryParams.DEFAULT) .getValue(); loadBalancerMap.put(serviceName, buildLoadBalancer(healthServices)); // Watch consul longPolling(serviceName); } return loadBalancerMap.get(serviceName).next().toString(); } private void longPolling(String serviceName){ new Thread(new Runnable() { @Override public void run() { long consulIndex = -1; do { QueryParams param = QueryParams.Builder.builder() .setIndex(consulIndex) .build(); Response<List<HealthService>> healthyServices = consulClient.getHealthServices(serviceName, true, param); consulIndex = healthyServices.getConsulIndex(); log.debug("consul index for {} is: {}", serviceName, consulIndex); List<HealthService> healthServices = healthyServices.getValue(); log.debug("service addresses of {} is: {}", serviceName, healthServices); loadBalancerMap.put(serviceName, buildLoadBalancer(healthServices)); } while(true); } }).start(); } private LoadBalancer buildLoadBalancer(List<HealthService> healthServices) { return new RandomLoadBalancer(healthServices.stream() .map(healthService -> { HealthService.Service service =healthService.getService(); return new ServiceAddress(service.getAddress() , service.getPort()); }) .collect(Collectors.toList())); } }
这里我用到了LoadBalancer, 关于LB这块,后续文章会专门讲述。 除此之外, 这里的核心是在获取完服务地址之后,会watch该服务地址的变化, 并更新对应的LB中的地址列表。
就这样, 一个简单的服务注册与发现功能就实现了。 完整代码请看我的github。
相关文章推荐
- 一个简单RPC框架是如何炼成的(VI)——引入服务注册机制
- 简单RPC框架-基于Consul的服务注册与发现
- 【远程调用框架】如何实现一个简单的RPC框架(三)优化一:利用动态代理改变用户服务调用方式
- 简单RPC框架-基于Consul的服务注册与发现
- 简单RPC框架-基于Consul的服务注册与发现
- RPC框架中服务的注册与发现
- 如何写一个RPC框架(四):网络通信之客户端篇
- Service注册发现及其调用-分布式服务框架
- 一个简单RPC框架是如何炼成的(I)——开局篇
- 一个简单RPC框架是如何炼成的(III)——实现带参数的RPC调用
- go微服务框架go-micro深度学习(三) Registry服务的注册和发现
- 采用Best effort 1pc + 回滚补偿机制实现的一个distributed transaction (分布式事务框架).基于dubbo rpc服务上实现。
- 一个简单RPC框架是如何炼成的(II)——制定RPC消息
- 模拟dubbo 框架RPC调用及dubbo的服务动态注册,服务路由,负载均衡功能的思考
- go微服务框架go-micro深度学习(三) Registry服务的注册和发现
- 一起写RPC框架(十)RPC服务提供端二--服务的编织和注册
- 如何实现一个分布式 RPC 框架
- 如何实现一个分布式 RPC 框架
- 【远程调用框架】如何实现一个简单的RPC框架(四)优化二:改变底层通信框架
- spring-cloud 微服务框架集合 Eureke 服务注册于发现