您的位置:首页 > 编程语言 > Java开发

SpringCloud--Eureka服务注册和发现

2018-02-15 23:33 761 查看
Eureka是SpringCloud家族中的一个组件,因为它的有服务注册和发现的机制,所以很适合用于做注册中心。Eureka有服务端和客户端,注册中心作为服务端,我们提供的服务作为客户端注册到服务端上,由Eureka统一管理。

作为注册中心,它内部运行机制是什么样的?下面我就带着下面这些问题来学习Eureka。

1.如何去开发一个集成spring cloud eureka程序?
2.服务提供者怎么注册到服务中心的?
3.服务中心怎么接收注册请求?
4.服务中心怎么存储?
5.服务中心自身是怎么实现高可用的?
6.服务集群之间怎么同步信息?如何去重?
7.服务中心如何检查服务提供者是否正常?
8.服务提供者如果下架服务?


1.如何去开发一个集成spring cloud eureka程序?

下面就开发一个伪集群(在单机上)的Eureka程序:

服务端pom.xml文件主要配置:

<!-- spring boot 封装spring
starter封装、自动配置autoconfiguration
-->
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-parent</artifactId>
<version>Edgware.SR2</version>
<relativePath />
</parent>
<!-- 后面引用不用加版本号 -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Edgware.SR2</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!-- spring-boot-starter-web web项目,集成容器tomcat -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- spring-boot-starter-actuator 管理工具/web 查看堆栈,动态刷新配置 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- cloud eureka组件 注册中心 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka-server</artifactId>
</dependency>
</dependencies>


bootstrap.yml文件:

# 应用名称
spring:
application:
name: eureka-server


application.yml文件:

# 上下文初始化加载
info:
name: Eureka server
contact: wenthkim

spring:
profiles:
active: eureka1
spring:
profiles: eureka1
server:
port: 8761
eureka:
client:
# 是否注册到eurekaserver
registerWithEureka: false
# 是否拉取信息
fetchRegistry: true
# eureka server地址
serviceUrl:
defaultZone: http://eureka1:8761/eureka/,http://eureka2:8762/eureka/,http://eureka3:8763/eureka/ server:
# false 关闭自我保护,不管如何都要剔除心跳检测异常的服务
enableSelfPreservation: true
# updatePeerEurekaNodes执行间隔
peerEurekaNodesUpdateIntervalMs: 10000000
waitTimeInMsWhenSyncEmpty: 0
instance:
hostname: eureka1
metadataMap:
instanceId: ${spring.application.name}:${vcap.application.instance_id:${spring.application.instance_id:${random.value}}}


启动java类:

@SpringBootApplication
@EnableEurekaServer
public class EurekaApp {
public static void main(String[] args) {
new SpringApplicationBuilder(EurekaApp.class).web(true).run(args);
}
}


下面把bootstrap.yml文件的port和hostname(需要在自己电脑C:\Windows\System32\drivers\etc下的hosts文件配置对应的域名)分别改成8761,8762,8763和eureka1,eureka2,eureka3,分别启动Eureka程序即可搭建一个高可用的注册中心。

启动成功后,浏览器输入http://localhost:8761/即看到成功界面



2.服务提供者怎么注册到服务中心的?

注册到服务中心的主要都有哪些信息:

ip、port(端口)、instance_id(实例id)、name(服务名)等。

当eureka客户端启动的时候,EurekaClientConfiguration读书客户端配置信息,创建一个InstanceInfo实例,然后交给ApplicationInfoManager管理,下面看段代码:

@Configuration
@ConditionalOnMissingRefreshScope
protected static class EurekaClientConfiguration {

@Autowired
private ApplicationContext context;

@Autowired(required = false)
private DiscoveryClientOptionalArgs optionalArgs;

@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager,
EurekaClientConfig config) {
return new CloudEurekaClient(manager, config, this.optionalArgs,
this.context);
}

@Bean
@ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
public ApplicationInfoManager eurekaApplicationInfoManager(
EurekaInstanceConfig config) {
//获取配置 创建实例
InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
return new ApplicationInfoManager(config, instanceInfo);
}
}


在实例化过程中,DiscoveryClient的HeartbeatThread定时任务会不断扫描,找到未注册的实例,并注册到服务中心,下面DiscoveryClient类的某段代码:

/**
* The heartbeat task that renews the lease in the given intervals.
* 心跳,注册定时任务
*/
private class HeartbeatThread implements Runnable {

public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}

/**
* Renew with the eureka service by making the appropriate REST call
*/
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
//向注册中心发送一个注册请求
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
//当注册中心返回404,证明这个实例没注册,则进行注册,否则返回注册成功
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
return register();
}
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
return false;
}
}

/**
* Register with the eureka service by making the appropriate REST call.
* 向服务端注册方法
*/
boolean register() throws Throwable {
logger.info(PREFIX + appPathIdentifier + ": registering service...");
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == 204;
}


3.服务中心怎么接收注册请求?

通过rest接口接收,spring cloud 集成eureka server原生包中的Jersey RESTful接口,JerseyApi



下面跟着源码查看接收流程(eureka server是通过filter拦截请求的),代码如下:

入口EurekaServerAutoConfiguration类下的jerseyFilterRegistration会增加一个filter用于拦截注册请求

@Bean
public FilterRegistrationBean jerseyFilterRegistration(Application eurekaJerseyApp) {
FilterRegistrationBean bean = new FilterRegistrationBean();
bean.setFilter(new ServletContainer(eurekaJerseyApp));
bean.setOrder(2147483647);
//拦截上图客户端注册请求
bean.setUrlPatterns(Collections.singletonList("/eureka/*"));
return bean;
}


由ApplicationResource类下的addInstance受理请求,代码如下 :

@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info, @HeaderParam("x-netflix-discovery-replication") String isReplication) {
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
if (this.isBlank(info.getId())) {
return Response.status(400).entity("Missing instanceId").build();
} else if (this.isBlank(info.getHostName())) {
return Response.status(400).entity("Missing hostname").build();
} else if (this.isBlank(info.getAppName())) {
return Response.status(400).entity("Missing appName").build();
} else if (!this.appName.equals(info.getAppName())) {
return Response.status(400).entity("Mismatched appName, expecting " + this.appName + " but was " + info.getAppName()).build();
} else if (info.getDataCenterInfo() == null) {
return Response.status(400).entity("Missing dataCenterInfo").build();
} else if (info.getDataCenterInfo().getName() == null) {
return Response.status(400).entity("Missing dataCenterInfo Name").build();
} else {
DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
if (dataCenterInfo instanceof UniqueIdentifier) {
String dataCenterInfoId = ((UniqueIdentifier)dataCenterInfo).getId();
if (this.isBlank(dataCenterInfoId)) {
boolean experimental = "true".equalsIgnoreCase(this.serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
if (experimental) {
String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
return Response.status(400).entity(entity).build();
}

if (dataCenterInfo instanceof AmazonInfo) {
AmazonInfo amazonInfo = (AmazonInfo)dataCenterInfo;
String effectiveId = amazonInfo.get(MetaDataKey.instanceId);
if (effectiveId == null) {
amazonInfo.getMetadata().put(MetaDataKey.instanceId.getName(), info.getId());
}
} else {
logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
}
}
}
//进行注册,注册成功返回204,isReplication防止循环传播
this.registry.register(info, "true".equals(isReplication));
return Response.status(204).build();
}
}


调用InstanceRegistry类的register方法注册成功并发布EurekaInstanceRegisteredEvent注册事件,代码如下:

/**
* 注册并发布注册事件
*/
public void register(InstanceInfo info, boolean isReplication) {
this.handleRegistration(info, this.resolveInstanceLeaseDuration(info), isReplication);
super.register(info, isReplication);
}

/**
* 发布注册事件
*/
private void handleRegistration(InstanceInfo info, int leaseDuration, boolean isReplication) {
this.log("register " + info.getAppName() + ", vip " + info.getVIPAddress() + ", leaseDuration " + leaseDuration + ", isReplication " + isReplication);
this.publishEvent(new EurekaInstanceRegisteredEvent(this, info, leaseDuration, isReplication));
}


4.服务中心怎么存储?

查看AbstractInstanceRegistry类下的register方法可以清楚地看到客户端信息是存在ConcurrentHashMap里面的。

5.服务中心自身是怎么实现高可用的?

通过对等的eureka server实例,当eureka服务端启动时,会找到配置文件所填其它服务端地址,相互注册。例子,比如现在有eureka1,eureka2,eureka3。当有服务注册到eureka1时,eureka1会发出注册事件,此时eureka2,eureka3会同步这个实例信息。

6.服务集群之间怎么同步信息?如何去重?

eurekaserver初始化时,维护了一个PeerEurekaNodes.peerEurekaNodes列表,

当需要同步更新信息的时候, 遍历所有节点,并传播信息,会调用PeerAwareInstanceRegistryImpl类的这几个方法,cancel/register/renew,代码如下:

public boolean cancel(String appName, String id, boolean isReplication) {
if (super.cancel(appName, id, isReplication)) {
this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.Cancel, appName, id, (InstanceInfo)null, (InstanceStatus)null, isReplication);
Object var4 = this.lock;
synchronized(this.lock) {
if (this.expectedNumberOfRenewsPerMin > 0) {
this.expectedNumberOfRenewsPerMin -= 2;
this.numberOfRenewsPerMinThreshold = (int)((double)this.expectedNumberOfRenewsPerMin * this.serverConfig.getRenewalPercentThreshold());
}

return true;
}
} else {
return false;
}
}

public void register(InstanceInfo info, boolean isReplication) {
int leaseDuration = 90;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}

super.register(info, leaseDuration, isReplication);
this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.Register, info.getAppName(), info.getId(), info, (InstanceStatus)null, isReplication);
}

public boolean renew(String appName, String id, boolean isReplication) {
if (super.renew(appName, id, isReplication)) {
this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.Heartbeat, appName, id, (InstanceInfo)null, (InstanceStatus)null, isReplication);
return true;
} else {
return false;
}
}
/**
* 遍历所有节点
*/
private void replicateToPeers(PeerAwareInstanceRegistryImpl.Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();

try {
if (isReplication) {
this.numberOfReplicationsLastMin.increment();
}

if (this.peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}

Iterator var8 = this.peerEurekaNodes.getPeerEurekaNodes().iterator();

while(var8.hasNext()) {
PeerEurekaNode node = (PeerEurekaNode)var8.next();
if (!this.peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
this.replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
}
} finally {
tracer.stop();
}

}
/**
* 对不同的事件进行处理
*/
private void replicateInstanceActionsToPeers(PeerAwareInstanceRegistryImpl.Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry = null;
CurrentRequestVersion.set(Version.V2);
switch(action) {
case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = (InstanceStatus)this.overriddenInstanceStatusMap.get(id);
infoFromRegistry = this.getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
node.register(info);
break;
case StatusUpdate:
infoFromRegistry = this.getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
infoFromRegistry = this.getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
}
} catch (Throwable var9) {
logger.error("Cannot replicate information to {} for action {}", new Object[]{node.getServiceUrl(), action.name(), var9});
}

}


7.服务中心如何检查服务提供者是否正常?

服务剔除: 长时间没有给心跳 默认90秒

当eureka启动初始化上下文的时候,会启动一个定时任务EvictionTask(检测异常服务)。

代码有点多就不一一贴出来了。代码入口为EurekaServerInitializerConfiguration类的start方法,最后调到AbstractInstanceRegistry类的postInit启动EvictionTask。

当eureka server开启自我保护的情况下,不会剔除任何服务。即eureka.server.enableSelfPreservation=true的时候。自我保护例子:

当一个服务有10个实例,默认提交心跳时间为30秒。

此时阀值为0.8,

那么每分钟需要收到心跳包的个数为:10*0.8*2=16,

如果一分钟内,eureka收到的心跳请求没达到16个,eureka则怀疑是网络抖动,此时不会剔除任何服务。

8.服务提供者如果下架服务?

当spring上下文关闭时下架eureka服务,程序入口EurekaDiscoveryClientConfiguration类的onApplicationEvent方法,最终调用DiscoveryClient类的shutdown,unregister,cancel方法下架服务,代码如下:

@EventListener(ContextClosedEvent.class)
public void onApplicationEvent(ContextClosedEvent event) {
// register in case meta data changed
stop();
this.eurekaClient.shutdown();
}
/**
* Shuts down Eureka Client. Also sends a deregistration request to the
* eureka server.
*/
@PreDestroy
@Override
public synchronized void shutdown() {
if (isShutdown.compareAndSet(false, true)) {
logger.info("Shutting down DiscoveryClient ...");

if (statusChangeListener != null && applicationInfoManager != null) {
applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
}

cancelScheduledTasks();

// If APPINFO was registered
if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka()) {
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
unregister();
}

if (eurekaTransport != null) {
eurekaTransport.shutdown();
}

heartbeatStalenessMonitor.shutdown();
registryStalenessMonitor.shutdown();

logger.info("Completed shut down of DiscoveryClient");
}
}

/**
* unregister w/ the eureka service.
*/
void unregister() {
// It can be null if shouldRegisterWithEureka == false
if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
try {
logger.info("Unregistering ...");
EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
logger.info(PREFIX + appPathIdentifier + " - deregister  status: " + httpResponse.getStatusCode());
} catch (Exception e) {
logger.error(PREFIX + appPathIdentifier + " - de-registration failed" + e.getMessage(), e);
}
}
}


从此开始微服务的学习之旅,本人正在入门学习,如理解有误欢迎指正,欢迎有兴趣的小伙伴一起交流。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: