您的位置:首页 > 编程语言 > Java开发

SpringCloud——服务注册与发现Eureka以及注册源码解析

2018-02-01 14:42 1461 查看
springboot基础:http://blog.51cto.com/qinbin/2067361服务注册与发现


关系: 1.服务提供者在启动时,向注册中心注册自己提供的服务。 2.服务消费者在启动时,向注册中心订阅自己所需的服务。 3.注册中心返回服务提供者地址给消费者。 4.服务消费者从提供者地址中调用消费者。Eureka介绍: Spring Cloud Eureka 是Spring Cloud Netflix 微服务套件中的一部分, 它基于Netflix Eureka 做了二次封装, 主要负责完成微服务架构中的服务治理功能。Spring Cloud 通过为Eureka 增加了Spring Boot 风格的自动化配置,我们只需通过简单引入依赖和注解配置就能让Spring Boot 构建的微服务应用轻松地与Eureka 服务治理体系进行整合。
Netflix Eureka
Spring Cloud Eureka, 使用Netflix Eureka来实现服务注册与发现, 它既包含了服务端组件,也包含了客户端组件,并且服务端与客户端均采用Java编写,所以Eureka主要适用于通过Java实现的分布式系统,或是与NM兼容语言构建的系统。但是, 由于Eureka服务端的服务治理机制提供了完备的RESTfulAPL所以它也支持将非Java语言构建的微服务应用纳入Eureka的服务治理体系中来。只是在使用其他语言平台的时候,需要自己来实现Eureka的客户端程序。不过庆幸的是,在目前几个较为流行的开发平台上,都已经有了一些针对Eureka 注册中心的客户端实现框架, 比如.NET平台的Steeltoe、Node.js 的eureka-js-client等。
Eureka服务端, 我们也称为服务注册中心。它同其他服务注册中心一样,支持高可用配置。它依托于强一致性提供良好的服务实例可用性, 可以应对多种不同的故障场景。如果Eureka以集群模式部署,当集群中有分片出现故障时,那么Eureka就转入自我保护模式。它允许在分片故障期间继续提供服务的发现和注册,当故障分片恢复运行时, 集群中的其他分片会把它们的状态再次同步回来。以在AWS 上的实践为例, Netflix推荐每个可用的区域运行一个Eureka服务端,通过它来形成集群。不同可用区域的服务注册中心通过异步模式互相复制各自的状态,这意味着在任意给定的时间点每个实例关于所有服务的状态是有细微差别的。
Eureka客户端,主要处理服务的注册与发现。客户端服务通过注解和参数配置的方式,嵌入在客户端应用程序的代码中, 在应用程序运行时,Eureka客户端向注册中心注册自身提供的服务并周期性地发送心跳来更新它的服务租约。同时,它也能从服务端查询当前注册的服务信息并把它们缓存到本地并周期性地刷新服务状态。

Eureka环境搭建一、服务端搭建1.新建项目





2.配置
# server (eureka 默认端口为:8761)
server.port=8761

# spring
spring.application.name=spring-cloud-server

# eureka
# 是否注册到eureka
eureka.client.register-with-eureka=false
# 是否从eureka获取注册信息
eureka.client.fetch-registry=false
# eureka服务器的地址(注意:地址最后面的 /eureka/ 这个是固定值)
eureka.client.serviceUrl.defaultZone=http://localhost:${server.port}/eureka/
@EnableEurekaServer
@SpringBootApplication
public class CloudTestEurekaServerApplication {

public static void main(String[] args) {
SpringApplication.run(CloudTestEurekaServerApplication.class, args);
}
}
3.启动服务



二、客户端(provider)搭建1.创建项目



2.配置
# server
server.port=7777

# spring
spring.application.name=spring-cloud-provider

# eureka
eureka.client.serviceUrl.defaultZone=http://localhost:8761/eureka/
@EnableEurekaClient
@SpringBootApplication
public class CloudTestEurekaProviderApplication {

public static void main(String[] args) {
SpringApplication.run(CloudTestEurekaProviderApplication.class, args);
}
}
3.启动服务

三、消费者(consumer)搭建



自我保护
当我们在本地调试基于Eureka的程序时, 基本上都会碰到这样一个问题, 在服务注册中心的信息面板中出现类似下面的红色警告信息:
EMERGENCY! EUREKA MAY BE INCORRECTLY CLAIMING INSTANCES ARE UP WHEN THEY'RE NOT.RENEWALS ARE LESSER TH邸THRESHOLD AND HENCE THE INSTANCES ARE NOT BEING EXPI邸D JUST TO BE SAFE.
实际上, 该警告就是触发了EurekaServer的自我保护机制。之前我们介绍过, 服务注册到EurekaServer之后,会维护一个心跳连接,告诉EurekaServer自己还活着。EurekaServer在运行期间,会统计心跳失败的比例在15分钟之内是否低于85%, 如果出现低于的情况(在单机调试的时候很容易满足, 实际在生产环境上通常是由于网络不稳定导致), EurekaServer会将当前的实例注册信息保护起来, 让这些实例不会过期, 尽可能保护这些注册信息。但是, 在这段保护期间内实例若出现问题, 那么客户端很容易拿到实际已经不存在的服务实例, 会出现调用失败的清况, 所以客户端必须要有容错机制, 比如可以使用请求重试、断路器等机制。

Euraka服务启动过程源码解析启动日志如下

[           main] o.s.j.e.a.AnnotationMBeanExporter        : Located managed bean 'environmentManager': registering with JMX server as MBean [org.springframework.cloud.context.environment:name=environmentManager,type=EnvironmentManager]
[           main] o.s.j.e.a.AnnotationMBeanExporter        : Located managed bean 'serviceRegistryEndpoint': registering with JMX server as MBean [org.springframework.cloud.client.serviceregistry.endpoint:name=serviceRegistryEndpoint,type=ServiceRegistryEndpoint]
[           main] o.s.j.e.a.AnnotationMBeanExporter        : Located managed bean 'refreshScope': registering with JMX server as MBean [org.springframework.cloud.context.scope.refresh:name=refreshScope,type=RefreshScope]
[           main] o.s.j.e.a.AnnotationMBeanExporter        : Located managed bean 'configurationPropertiesRebinder': registering with JMX server as MBean [org.springframework.cloud.context.properties:name=configurationPropertiesRebinder,context=ab7395e,type=ConfigurationPropertiesRebinder]
[           main] o.s.j.e.a.AnnotationMBeanExporter        : Located managed bean 'refreshEndpoint': registering with JMX server as MBean [org.springframework.cloud.endpoint:name=refreshEndpoint,type=RefreshEndpoint]
[           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 0
[           main] o.s.c.n.e.s.EurekaServiceRegistry        : Registering application spring-cloud-server with eureka with status UP
[      Thread-11] o.s.c.n.e.server.EurekaServerBootstrap   : Setting the eureka configuration..
[      Thread-11] o.s.c.n.e.server.EurekaServerBootstrap   : Eureka data center value eureka.datacenter is not set, defaulting to default
[      Thread-11] o.s.c.n.e.server.EurekaServerBootstrap   : Eureka environment value eureka.environment is not set, defaulting to test
[      Thread-11] o.s.c.n.e.server.EurekaServerBootstrap   : isAws returned false
[      Thread-11] o.s.c.n.e.server.EurekaServerBootstrap   : Initialized server context
[      Thread-11] c.n.e.r.PeerAwareInstanceRegistryImpl    : Got 1 instances from neighboring DS node
[      Thread-11] c.n.e.r.PeerAwareInstanceRegistryImpl    : Renew threshold is: 1
[      Thread-11] c.n.e.r.PeerAwareInstanceRegistryImpl    : Changing status to UP
[      Thread-11] e.s.EurekaServerInitializerConfiguration : Started Eureka Server
[           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8761 (http)
[           main] .s.c.n.e.s.EurekaAutoServiceRegistration : Updating port to 8761
[           main] c.m.e.CloudTestEurekaServerApplication   : Started CloudTestEurekaServerApplication in 4.076 seconds (JVM running for 4.54)
[nio-8761-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring FrameworkServlet 'dispatcherServlet'
[nio-8761-exec-1] o.s.web.servlet.DispatcherServlet        : FrameworkServlet 'dispatcherServlet': initialization started
[nio-8761-exec-1] o.s.web.servlet.DispatcherServlet        : FrameworkServlet 'dispatcherServlet': initialization completed in 11 ms
[a-EvictionTimer] c.n.e.registry.AbstractInstanceRegistry  : Running the evict task with compensationTime 0ms
2.可以看到在"Started Eureka Server"这一行,发现执行了类EurekaServerInitializerConfiguration,所以它是程序入口,进入:
@Configuration
public class EurekaServerInitializerConfiguration
implements ServletContextAware, SmartLifecycle, Ordered {
@Override
public void start() {
new Thread(new Runnable() {
@Override
public void run() {
try {
//TODO: is this class even needed now?
eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
log.info("Started Eureka Server");

publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
EurekaServerInitializerConfiguration.this.running = true;
publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
}
catch (Exception ex) {
// Help!
log.error("Could not initialize Eureka servlet context", ex);
}
}
}).start();
}
3.可以发现这个类上面有注解@Configuration,说明这个类可以被spring容器感知到,然后实例化,并且会执行start()方法,开启一个线程执行功能;然后再进入contextInitialied方法:
public class EurekaServerBootstrap {
public void contextInitialized(ServletContext context) {
try {
initEurekaEnvironment();
initEurekaServerContext();

context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
}
catch (Throwable e) {
log.error("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}
4.可发现上面方法主要有两个功能:环境初始化和服务初始化,这里只看服务初始化,进入initEurekaServerContext()方法,可以看到下面代码:
public class EurekaServerBootstrap {
protected void initEurekaServerContext() throws Exception {
// For backward compatibility
JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);
XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);

if (isAws(this.applicationInfoManager.getInfo())) {
this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
this.eurekaClientConfig, this.registry, this.applicationInfoManager);
this.awsBinder.start();
}

EurekaServerContextHolder.initialize(this.serverContext);

log.info("Initialized server context");

// Copy registry from neighboring eureka node
int registryCount = this.registry.syncUp();
this.registry.openForTraffic(this.applicationInfoManager, registryCount);

// Register all monitoring statistics.
EurekaMonitors.registerAllStats();
}
5.上面代码首先初始化server上下文,然后再去注册。可以看到先获得变量registryCount(注册表),然后通过调用openForTraffic方法,为注册监测数据做准备,或者可以这样说(检测监测的数据是否存活,如果不存活,做剔除操作),下面是函数一步一步进入的情况:

public class InstanceRegistry extends PeerAwareInstanceRegistryImpl
implements ApplicationContextAware {
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
super.openForTraffic(applicationInfoManager,
count == 0 ? this.defaultOpenForTrafficCount : count);
}
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
// Renewals happen every 30 seconds and for a minute it should be a factor of 2.
this.expectedNumberOfRenewsPerMin = count * 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
logger.info("Got " + count + " instances from neighboring DS node");
logger.info("Renew threshold is: " + numberOfRenewsPerMinThreshold);
this.startupTime = System.currentTimeMillis();
if (count > 0) {
this.peerInstancesTransferEmptyOnStartup = false;
}
DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
boolean isAws = Name.Amazon == selfName;
if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
logger.info("Priming AWS connections for all replicas..");
primeAwsReplicas(applicationInfoManager);
}
logger.info("Changing status to UP");
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
super.postInit();
}
进入postInit()方法:
protected void postInit() {
renewsLastMin.start();
if (evictionTaskRef.get() != null) {
evictionTaskRef.get().cancel();
}
evictionTaskRef.set(new EvictionTask());
evictionTimer.schedule(evictionTaskRef.get(),
serverConfig.getEvictionIntervalTimerInMs(),
serverConfig.getEvictionIntervalTimerInMs());
}
之前在日志中有一句:"Running the evict task with compensationTime 0ms",这句话就是做节点剔除操作,就是在EvictionTask()方法执行的。
失效剔除
有些时候, 我们的服务实例并不一定会正常下线, 可能由于内存溢出、网络故障等原因使得服务不能正常工作, 而服务注册中心并未收到“服务下线” 的请求。为了从服务列表中将这些无法提供服务的实例剔除, Eureka Server在启动的时候会创建一个定时任务,默认每隔一段时间(默认为60秒) 将当前清单中超时(默认为90秒)没有续约的服务剔除出去。
class EvictionTask extends TimerTask {

private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);

@Override
public void run() {
try {
long compensationTimeMs = getCompensationTimeMs();
logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
evict(compensationTimeMs);
} catch (Throwable e) {
logger.error("Could not run the evict task", e);
}
}

/**
* compute a compensation time defined as the actual time this task was executed since the prev iteration,
* vs the configured amount of time for execution. This is useful for cases where changes in time (due to
* clock skew or gc for example) causes the actual eviction task to execute later than the desired time
* according to the configured cycle.
*/
long getCompensationTimeMs() {
long currNanos = getCurrentTimeNano();
long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);
if (lastNanos == 0l) {
return 0l;
}

long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();
return compensationTime <= 0l ? 0l : compensationTime;
}

long getCurrentTimeNano() {  // for testing
return System.nanoTime();
}

}
Eureka客户端注册过程

入口:DiscoveryClient
首先上日志信息:
[           main] com.netflix.discovery.DiscoveryClient    : Initializing Eureka in region us-east-1
[           main] com.netflix.discovery.DiscoveryClient    : Client configured to neither register nor query for data.
[           main] com.netflix.discovery.DiscoveryClient    : Discovery Client initialized at timestamp 1517475139464 with initial instances count: 0
[           main] c.n.eureka.DefaultEurekaServerContext    : Initializing ...
进入DiscoveryClient,首先看它的构造函数,里面执行initScheduledTasks()方法进行注册
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
if (args != null) {
this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
this.eventListeners.addAll(args.getEventListeners());
this.preRegistrationHandler = args.preRegistrationHandler;
} else {
this.healthCheckCallbackProvider = null;
this.healthCheckHandlerProvider = null;
this.preRegistrationHandler = null;
}

this.applicationInfoManager = applicationInfoManager;
InstanceInfo myInfo = applicationInfoManager.getInfo();

clientConfig = config;
staticClientConfig = clientConfig;
transportConfig = config.getTransportConfig();
instanceInfo = myInfo;
if (myInfo != null) {
appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
} else {
logger.warn("Setting instanceInfo to a passed in null value");
}

this.backupRegistryProvider = backupRegistryProvider;

this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
localRegionApps.set(new Applications());

fetchRegistryGeneration = new AtomicLong(0);

remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));

if (config.shouldFetchRegistry()) {
this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}

if (config.shouldRegisterWithEureka()) {
this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}

logger.info("Initializing Eureka in region {}", clientConfig.getRegion());

if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
logger.info("Client configured to neither register nor query for data.");
scheduler = null;
heartbeatExecutor = null;
cacheRefreshExecutor = null;
eurekaTransport = null;
instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);

initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());

return;  // no need to setup up an network tasks and we are done
}

try {
// default size of 2 - 1 each for heartbeat and cacheRefresh
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());

heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
);  // use direct handoff

cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
);  // use direct handoff

eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);

AzToRegionMapper azToRegionMapper;
if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
} else {
azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
}
if (null != remoteRegionsToFetch.get()) {
azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
}
instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
} catch (Throwable e) {
throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
}

if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}

// call and execute the pre registration handler before all background tasks (inc registration) is started
if (this.preRegistrationHandler != null) {
this.preRegistrationHandler.beforeRegistration();
}
initScheduledTasks();

try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register timers", e);
}

// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);

initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());
}
2.进入initScheduledTasks()方法,里面有两个大if代码块,"clientConfig.shouldRegisterWithEureka()"是核心逻辑,用于向Eureka注册
/**
* Initializes all scheduled tasks.
*/
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}

if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);

// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);

// InstanceInfo replicator
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize

statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}

@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};

if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}

instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
3.上面代码执行了onDemandUpdate()方法,进入可见:
public boolean onDemandUpdate() {
if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
scheduler.submit(new Runnable() {
@Override
public void run() {
logger.debug("Executing on-demand update of local InstanceInfo");

Future latestPeriodic = scheduledPeriodicRef.get();
if (latestPeriodic != null && !latestPeriodic.isDone()) {
logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
latestPeriodic.cancel(false);
}

InstanceInfoReplicator.this.run();
}
});
return true;
} else {
logger.warn("Ignoring onDemand update due to rate limiter");
return false;
}
}

public void run() {
try {
discoveryClient.refreshInstanceInfo();

Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
4.可以看见,执行了register()方法进行注册,进入:
boolean register() throws Throwable {
logger.info(PREFIX + appPathIdentifier + ": registering service...");
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == 204;
}
5.最后函数返回204,所以当注册状态为204,即为注册成功。现在客户端注册成功了,就应该到了服务端接收注册的过程:
Eureka接收注册的过程

入口ApplicationResource


@Produces({"application/xml", "application/json"})
public class ApplicationResource {
/**
* Registers information about a particular instance for an
* {@link com.netflix.discovery.shared.Application}.
*
* @param info
*            {@link InstanceInfo} information of the instance.
* @param isReplication
*            a header parameter containing information whether this is
*            replicated from other nodes.
*/
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
// validate that the instanceinfo contains all the necessary required fields
if (isBlank(info.getId())) {
return Response.status(400).entity("Missing instanceId").build();
} else if (isBlank(info.getHostName())) {
return Response.status(400).entity("Missing hostname").build();
} else if (isBlank(info.getAppName())) {
return Response.status(400).entity("Missing appName").build();
} else if (!appName.equals(info.getAppName())) {
return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
} else if (info.getDataCenterInfo() == null) {
return Response.status(400).entity("Missing dataCenterInfo").build();
} else if (info.getDataCenterInfo().getName() == null) {
return Response.status(400).entity("Missing dataCenterInfo Name").build();
}

// handle cases where clients may be registering with bad DataCenterInfo with missing data
DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
if (dataCenterInfo instanceof UniqueIdentifier) {
String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
if (isBlank(dataCenterInfoId)) {
boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
if (experimental) {
String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
return Response.status(400).entity(entity).build();
} else if (dataCenterInfo instanceof AmazonInfo) {
AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
if (effectiveId == null) {
amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
}
} else {
logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
}
}
}

registry.register(info, "true".equals(isReplication));
return Response.status(204).build();  // 204 to be backwards compatible
}
2.可以看到,最终也是返回204;同时一步一步进入register方法,看是如何完成注册的。
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
super.register(info, isReplication);
}
/**
* Registers the information about the {@link InstanceInfo} and replicates
* this information to all peer eureka nodes. If this is replication event
* from other replica nodes then it is not replicated.
*
* @param info
*            the {@link InstanceInfo} to be registered and replicated.
* @param isReplication
*            true if this is a replication event from other replica nodes,
*            false otherwise.
*/
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
super.register(info, leaseDuration, isReplication);
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
/**
* Registers a new instance with a given duration.
*
* @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
*/
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
read.lock();
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// The lease does not exist and hence it is a new registration
synchronized (lock) {
if (this.expectedNumberOfRenewsPerMin > 0) {
// Since the client wants to cancel it, reduce the threshold
// (1
// for 30 seconds, 2 for a minute)
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
logger.debug("No previous lease information found; it is new registration");
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);
synchronized (recentRegisteredQueue) {
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
}
// This is where the initial state transfer of overridden status happens
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}

// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);

// If the lease is registered with UP status, set lease service up timestamp
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
由于Eureka并没有数据库,索引通过map放在数据库中。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spring cloud