【Eureka】【源码+图解】【二】Eureka客户端启动过程
目录
-
- 3.2 服务端
-
- 3.2.1 EurekaController
- 3.2.2 PeerAwareInstanceRegistry
-
- 3.2.2.1 初始化缓存
-
- 读写缓存generatePayload
- 只读缓存getCacheUpdateTask
- 3.2.2.2 定时更新自我保护的阈值
- 3.2.2.3 初始化其他区域的注册实例
-
- 3.2.2.3.1 创建discoveryJerseyClient
- 3.2.2.3.2 创建eurekaHttpClient
- 3.2.2.3.3 定时更新remoteRegion的注册实例信息
3.2 服务端
HelloWorld总是如此简单,接下来就要去看看eureka server内部的工作原理了。
如果要研究它的源码,我们总得找到一个入口,服务端的代码我们就只加了一个@EnableEurekaServer
就成功创建了一个server,跟着这个蛛丝马迹一探它的究竟
/**
* Annotation to activate Eureka Server related configuration.
* {@link EurekaServerAutoConfiguration}
*
* @author Dave Syer
* @author Biju Kunjummen
*
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {
}
好吧,啥都没有?别急,我们看下注解,它是用来激活Eureka Server的相关配置的,还告诉我们是在哪个配置文件,我们先看下EurekaServerAutoConfiguration
类图
配置类注册了几个和服务端工作相关的类,看不懂不要紧,我们暂时先放下,再详细看下EurekaServerAutoConfiguration
的源码,当然不会看全部,先看下头部的注解
@Configuration(proxyBeanMethods = false)
// 7、它的作用主要是启动定时线程,定期检查客户端实例的状态,实现自我保护机制
@Import(EurekaServerInitializerConfiguration.class)
// 还记得前面的@EnableEurekaServer吗?它@Import(EurekaServerMarkerConfiguration.class)注入了Marker类
// 这就是为什么eureka server启动时要加@EnableEurekaServer的原因
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
// 配置参数
@EnableConfigurationProperties({
EurekaDashboardProperties.class, // eureka.dashboard.*
InstanceRegistryProperties.class, // eureka.instance.registry.*
EurekaProperties.class }) // eureka.*
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
......
}
先介绍下这几个类的作用,看图
jerseyApplication:其实是它里面的各个xxxResource类,用于接收来自客户端以及其它服务端的请求
PeerAwareInstanceRegistry:处理请求
PeerEurekaNode:服务端与服务端相互复制实例信息
RemoteRegionRegistry:获取其他region的注册实例
而EurekaServerContext、EurekaServerBootstrap、EurekaServerInitializerConfiguration则是用来初始化途中的四个类的。
3.2.1 EurekaController
先看配置
@EnableConfigurationProperties({
EurekaDashboardProperties.class, ... })
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
@Bean
@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
public EurekaController eurekaController(EurekaProperties eurekaProperties) {
return new EurekaController(this.applicationInfoManager, eurekaProperties);
}
}
先看它的配置信息类EurekaDashboardProperties
@ConfigurationProperties("eureka.dashboard")
public class EurekaDashboardProperties {
// dashboard页面的跟path,
// 前面的HelloWorld中我们直接输入http://localhost:3333/就可以显示,
// 可配置eureka.dashboard.path修改
private String path = "/";
// 默认开启的,如果false我们就不能看到dashboard页面
private boolean enabled = true;
}
再看EurekaController
提供了两个page,第一个是默认,可以结合下面的图看;第二个是/lastn
@Controller
@RequestMapping("${eureka.dashboard.path:/}")
public class EurekaController {
@Value("${eureka.dashboard.path:/}")
private String dashboardPath = "";
@RequestMapping(method = RequestMethod.GET)
public String status(HttpServletRequest request, Map<String, Object> model) {
// 自身基础信息
populateBase(request, model);
// 已向它注册的实例信息
populateApps(model);
StatusInfo statusInfo;
......
model.put("statusInfo", statusInfo);
populateInstanceInfo(model, statusInfo);
filterReplicas(model, statusInfo);
return "eureka/status";
}
@RequestMapping(value = "/lastn", method = RequestMethod.GET)
public String lastn(HttpServletRequest request, Map<String, Object> model) {
......
return "eureka/lastn";
}
}
3.2.2 PeerAwareInstanceRegistry
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
@Bean
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(ServerCodecs serverCodecs) {
// 强制初始化,还记得前面客户端的DiscoveryClient吗?成为一个Server之前你得先是一个Client,
// 与客户端稍微不同的是,客户端的eurekaTransport.queryClient是RestTemplateEurekaHttpClient,
// 而服务端的是JerseyApplicationClient
this.eurekaClient.getApplications();
// 注意,这里创建的是InstanceRegistry类,不是InstanceRegistry接口
return new InstanceRegistry(this.eurekaServerConfig, // EurekaServerConfigBean,eureka.server.*的配置
this.eurekaClientConfig, // EurekaClientConfigBean,eureka.client.*的配置
serverCodecs, // 加密解密
this.eurekaClient, // 作为客户端身份的信息
// eureka.instance.registry.expectedNumberOfClientsSendingRenews,默认1
this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
// eureka.instance.registry.defaultOpenForTrafficCount,默认1
this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}
}
先看下InstanceRegistry类图
先看下InstanceRegistry的接口,这些接口的方法名是不是很熟悉?和客户端RestTemplateEurekaHttpClient
的方法一一对应,所以客户端发送的请求最终都会由InstanceRegistry
受理,在这里可以简单把InstanceRegistry
当做我们平时常见的service
。从类图中我们看到PeerAwareInstanceRegistry
接口定义了init()
方法,对于这种init
、start
等方法都值得去看一下
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
// 启动统计
this.numberOfReplicationsLastMin.start();
// 赋值副本节点
this.peerEurekaNodes = peerEurekaNodes;
// 1、初始化缓存
initializedResponseCache();
// 2、开启定时刷新
scheduleRenewalThresholdUpdateTask();
// 3、初始化
initRemoteRegionRegistry();
......
}
}
3.2.2.1 初始化缓存
ResponseCacheImpl
主要是缓存注册信息,当客户端发出请求的时候可以从缓存中取,还记得客户端启动时的fetchRegistry
吗?
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
@Override
public synchronized void initializedResponseCache() {
if (responseCache == null) {
responseCache = new ResponseCacheImpl(serverConfig, serverCodecs, this);
}
}
}
先看下类图
最重要的是两个绿色的缓存数据结构,先看下他们的比较
他们之前所不同的是一个map,一个是LoadingCache,从名字中大概能看出一个是静态的,一个是动态的。map是通过一个定时任务维护的,而LoadingCache是每次拿值的时候动态更新,我们先看下ResponseCacheImpl
构造函数
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
this.serverConfig = serverConfig;
this.serverCodecs = serverCodecs;
// useReadOnlyResponseCache,使用只读缓存,默认true
// 只读缓存:readOnlyCacheMap,每次读的时候先从自己里读,没有再去readWriteCacheMap读,读到值后更新
// 读写缓存:readWriteCacheMap,每次读值都会去获取最新的Applications
this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
this.registry = registry;
this.readWriteCacheMap =
CacheBuilder.newBuilder()
// eureka.server.initialCapacityOfResponseCache,默认1000,读写缓存大小
.initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
// eureka.server.responseCacheAutoExpirationInSeconds,缓存失效时间,默认180秒
.expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
.removalListener(...) // 监听RemovalNotification,当收到通知时从regionSpecificKeys删除相应的信息
.build(new CacheLoader<Key, Value>() {
@Override
public Value load(Key key) throws Exception {
if (key.hasRegions()) {
Key cloneWithNoRegions = key.cloneWithoutRegions();
regionSpecificKeys.put(cloneWithNoRegions, key);
}
// 1、根据key动态获取value
Value value = generatePayload(key);
return value;
}
});
// eureka.server.responseCacheUpdateIntervalMs,只读缓存更新时间,默认30*1000毫秒,即30秒
long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
if (shouldUseReadOnlyResponseCache) {
// 开启只读缓存定时更新任务
timer.schedule(getCacheUpdateTask(), // 2、定时更新只读缓存
new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
+ responseCacheUpdateIntervalMs),
responseCacheUpdateIntervalMs);
}
......
}
读写缓存generatePayload
从构造函数中看到readWriteCacheMap
的关键在于generatePayload(key)
public class ResponseCacheImpl implements ResponseCache {
private Value generatePayload(Key key) {
String payload;
switch (key.getEntityType()) {
case Application:
boolean isRemoteRegionRequested = key.hasRegions();
// key.getName()即appName
// 目的就是获得Application或者Applications,对于从客户端来的请求key的name为ALL_APPS
if (ALL_APPS.equals(key.getName())) {
// 不管isRemoteRegionRequested是true或false,最终都会走到registry.getApplicationsFromMultipleRegions
if (isRemoteRegionRequested) {
payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
} else {
payload = getPayLoad(key, registry.getApplications());
}
} else if (ALL_APPS_DELTA.equals(key.getName())) {
......
} else {
payload = getPayLoad(key, registry.getApplication(key.getName()));
}
break;
case VIP:
case SVIP:
payload = getPayLoad(key, getApplicationsForVip(key, registry));
break;
default:
payload = "";
break;
}
return new Value(payload);
}
}
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {
boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;
......
Applications apps = new Applications();
apps.setVersion(1L);
for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
// 先获取当前server的注册实例列表
......
}
if (includeRemoteRegion) {
// 再获取remoteRegions的注册实例
for (String remoteRegion : remoteRegions) {
// 在服务端启动的时候会初始化regionNameVSRemoteRegistry,并且每个RemoteRegionRegistry会定时更新注册实例的信息,可参考后文
RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
if (null != remoteRegistry) {
Applications remoteApps = remoteRegistry.getApplications();
......
} else {
logger.warn("No remote registry available for the remote region {}", remoteRegion);
}
}
}
apps.setAppsHashCode(apps.getReconcileHashCode());
return apps;
}
}
只读缓存getCacheUpdateTask
private TimerTask getCacheUpdateTask() {
return new TimerTask() {
@Override
public void run() {
for (Key key : readOnlyCacheMap.keySet()) {
try {
CurrentRequestVersion.set(key.getVersion());
// 如果readOnlyCacheMap与readWriteCacheMap的值,如果不相同则把readWriteCacheMap中的值更新到readOnlyCacheMap
Value cacheValue = readWriteCacheMap.get(key);
Value currentCacheValue = readOnlyCacheMap.get(key);
if (cacheValue != currentCacheValue) {
readOnlyCacheMap.put(key, cacheValue);
}
} catch (Throwable th) {
} finally {
CurrentRequestVersion.remove();
}
}
}
};
}
3.2.2.2 定时更新自我保护的阈值
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
private void scheduleRenewalThresholdUpdateTask() {
// 每隔15分钟更新自我保护的阈值,也就是如果续约的实例低于这个阈值就会发出预警
timer.schedule(new TimerTask() {
@Override
public void run() {
updateRenewalThreshold();
}
}, serverConfig.getRenewalThresholdUpdateIntervalMs(),
// eureka.server.renewalThresholdUpdateIntervalMs,默认15分钟
serverConfig.getRenewalThresholdUpdateIntervalMs());
}
private void updateRenewalThreshold() {
......
Applications apps = eurekaClient.getApplications();
int count = 0;
// 统计当前已注册的实例总数
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
if (this.isRegisterable(instance)) {
++count;
}
}
}
synchronized (lock) {
if ((count) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfClientsSendingRenews)
|| (!this.isSelfPreservationModeEnabled())) {
// 更新实例总数
this.expectedNumberOfClientsSendingRenews = count;
updateRenewsPerMinThreshold();
}
}
......
}
protected void updateRenewsPerMinThreshold() {
// 这个值的作用时,server会每隔一段时间检查续约的实例,续约成功的实例个数要达到numberOfRenewsPerMinThreshold,不然会发出警告,即自我保护
// 实例总数 * 单个实例1分钟需注册的次数 * 百分比
this.numberOfRenewsPerMinThreshold = (int) (
// 当前已注册的实例总数
this.expectedNumberOfClientsSendingRenews
// eureka.server.expectedClientRenewalIntervalSeconds,客户端续约时间,默认30秒
// 60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds(),1分钟要注册的次数
* (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
// eureka.server.renewalPercentThreshold,续约比例不能低于这个值,默认0.85,否则就会发出警告
* serverConfig.getRenewalPercentThreshold());
}
}
3.2.2.3 初始化其他区域的注册实例
RemoteRegionRegistry的作用主要是获取运行在其他region的server的实例注册信息
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
protected void initRemoteRegionRegistry() throws MalformedURLException {
// eureka.server.remoteRegionUrlsWithName,默认new HashMap<>(),key为region,value为region的url
Map<String, String> remoteRegionUrlsWithName = serverConfig.getRemoteRegionUrlsWithName();
if (!remoteRegionUrlsWithName.isEmpty()) {
allKnownRemoteRegions = new String[remoteRegionUrlsWithName.size()];
int remoteRegionArrayIndex = 0;
for (Map.Entry<String, String> remoteRegionUrlWithName : remoteRegionUrlsWithName.entrySet()) {
// 初始化RemoteRegionRegistry,初始化完后会开启一个定时更新RemoteRegion的注册实例列表,详见后文
RemoteRegionRegistry remoteRegionRegistry = new RemoteRegionRegistry(
serverConfig,
clientConfig,
serverCodecs,
remoteRegionUrlWithName.getKey(),
new URL(remoteRegionUrlWithName.getValue()));
// 更新到regionNameVSRemoteRegistry
regionNameVSRemoteRegistry.put(remoteRegionUrlWithName.getKey(), remoteRegionRegistry);
allKnownRemoteRegions[remoteRegionArrayIndex++] = remoteRegionUrlWithName.getKey();
}
}
}
}
先看下RemoteRegionRegistry类图
从图中可以看到,它的作用与客户单的DiscoveryClient
和服务端的InstanceRegistry
作用差不多,它负责的是与其他region的服务端的通信,再看下其构造函数。其构造函数代码过多,为了简便先看下创建流程图
着重分析下绿色的部分
3.2.2.3.1 创建discoveryJerseyClient
discoveryJerseyClient即EurekaJerseyClientImpl,作用是提供ApacheHttpClient4,用于获取其他region的注册实例
public class RemoteRegionRegistry implements LookupService<String> {
public RemoteRegionRegistry(...) {
......
EurekaJerseyClientBuilder clientBuilder = new EurekaJerseyClientBuilder()
.withUserAgent("Java-EurekaClient-RemoteRegion")
.withEncoderWrapper(serverCodecs.getFullJsonCodec())
.withDecoderWrapper(serverCodecs.getFullJsonCodec())
// eureka.server.remoteRegionConnectTimeoutMs,默认1000毫秒
.withConnectionTimeout(serverConfig.getRemoteRegionConnectTimeoutMs())
// eureka.server.remoteRegionReadTimeoutMs,默认1000毫秒
.withReadTimeout(serverConfig.getRemoteRegionReadTimeoutMs())
// eureka.server.remoteRegionTotalConnectionsPerHost,默认500
.withMaxConnectionsPerHost(serverConfig.getRemoteRegionTotalConnectionsPerHost())
// eureka.server.remoteRegionTotalConnections,默认1000
.withMaxTotalConnections(serverConfig.getRemoteRegionTotalConnections())
// eureka.server.remoteRegionConnectionIdleTimeoutSeconds,默认30秒
.withConnectionIdleTimeout(serverConfig.getRemoteRegionConnectionIdleTimeoutSeconds());
if (remoteRegionURL.getProtocol().equals("http")) {
......
} else {
clientBuilder.withClientName("Discovery-RemoteRegionSecureClient-" + regionName)
.withTrustStoreFile(
// eureka.server.remoteRegionTrustStore,默认null
serverConfig.getRemoteRegionTrustStore(),
// eureka.server.remoteRegionTrustStorePassword,默认changeit
serverConfig.getRemoteRegionTrustStorePassword()
);
}
discoveryJerseyClient = clientBuilder.build();
discoveryApacheClient = discoveryJerseyClient.getClient();
}
}
3.2.2.3.2 创建eurekaHttpClient
过程不再赘述,这里简单比较下不同角色所用的client,作用也是用于获取其他region的注册实例
3.2.2.3.3 定时更新remoteRegion的注册实例信息
public class RemoteRegionRegistry implements LookupService<String> {
public RemoteRegionRegistry(EurekaServerConfig serverConfig,
EurekaClientConfig clientConfig,
ServerCodecs serverCodecs,
String regionName,
URL remoteRegionURL) {
......
// 1、定义更新remote region的server的注册信息的task
Runnable remoteRegionFetchTask = new Runnable() {
@Override
public void run() {
// 具体逻辑,最终会到fetchRemoteRegistry函数
if (fetchRegistry()) {
readyForServingData = true;
}
}
};
// 2、创建执行task的线程池
ThreadPoolExecutor remoteRegionFetchExecutor = new ThreadPoolExecutor(
1,
// eureka.server.remoteRegionFetchThreadPoolSize,默认20
serverConfig.getRemoteRegionFetchThreadPoolSize(),
0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
// 3、创建定时调度器
scheduler = Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder()
.setNameFormat("Eureka-RemoteRegionCacheRefresher_" + regionName + "-%d")
.setDaemon(true)
.build());
// 4、开启定时任务,每次完成更新后再过30秒进行下一次更新,循环反复
scheduler.schedule(
new TimedSupervisorTask(
"RemoteRegionFetch_" + regionName,
scheduler,
remoteRegionFetchExecutor,
// eureka.server.remoteRegionRegistryFetchInterval, 默认30秒
serverConfig.getRemoteRegionRegistryFetchInterval(),
TimeUnit.SECONDS,
5, // 最大超时倍数
remoteRegionFetchTask
),
serverConfig.getRemoteRegionRegistryFetchInterval(), TimeUnit.SECONDS);
}
private Applications fetchRemoteRegistry(boolean delta) {
// eureka.server.experimental.transport.enabled,默认false
if (shouldUseExperimentalTransport()) {
......
// 最终由JerseyApplicationClient发出http请求
EurekaHttpResponse<Applications> httpResponse = delta ? eurekaHttpClient.getDelta() : eurekaHttpClient.getApplications();
......
} else {
......
// delta为false
String urlPath = delta ? "apps/delta" : "apps/";
// discoveryApacheClient为ApacheHttpClient4
response = discoveryApacheClient.resource(this.remoteRegionURL + urlPath)
.accept(MediaType.APPLICATION_JSON_TYPE)
.get(ClientResponse.class);
......
}
return null;
}
}
未完待续
版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: