【Eureka】【源码+图解】【三】Eureka服务端启动过程(上)
目录
-
-
- 3.2.3 PeerEurekaNodes
-
- 3.2.3.1 获取副本serviceUrl
- 3.2.3.2 创建副本节点
- 3.2.4 EurekaServerContext
-
- initialize
- shutdown
- 3.2.5 EurekaServerBootstrap
-
- 3.2.5.1 向服务端获取注册实例
- 3.2.5.2 服务剔除
- 3.2.6 jerseyApplication
- 3.2.7 EurekaServerInitializerConfiguration
-
3.2.3 PeerEurekaNodes
PeerEurekaNodes用于服务端之间相互复制,保证Eureka的高可用性
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
@Bean
@ConditionalOnMissingBean
public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry, ServerCodecs serverCodecs,
ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
return new RefreshablePeerEurekaNodes(
registry, // 上一步的'server'
this.eurekaServerConfig, // EurekaServerConfigBean,eureka.server.*的配置
this.eurekaClientConfig, // EurekaClientConfigBean,eureka.client.*的配置
serverCodecs,
this.applicationInfoManager, // 客户端介绍过
replicationClientAdditionalFilters); // 过滤器
}
}
老惯例先看RefreshablePeerEurekaNodes的类图
PeerEurekaNode
代表一个副本server,另一个需要同步信息的副本节点;PeerEurekaNodes
维护一堆PeerEurekaNode
RefreshablePeerEurekaNodes
的构造函数中没有什么值得关注的代码,但是父类中有个start()
方法,这才是它的关键,先看下它做了什么
public class PeerEurekaNodes {
public void start() {
taskExecutor = Executors.newSingleThreadScheduledExecutor(...);
......
// 首次更新,1、先从配置中获取副本server的URL,2、再创建副本server对象
updatePeerEurekaNodes(resolvePeerUrls());
// 定时更新任务
Runnable peersUpdateTask = new Runnable() {
@Override
public void run() {
......
// 更新副本节点
updatePeerEurekaNodes(resolvePeerUrls());
......
}
};
// 开启定时任务
taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,
// eureka.server.peerEurekaNodesUpdateIntervalMs,默认10分钟
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
......
}
}
3.2.3.1 获取副本serviceUrl
protected List<String> resolvePeerUrls() {
InstanceInfo myInfo = applicationInfoManager.getInfo();
// eureka.client.region,默认"us-east-1"
// eureka.client.availabilityZones,默认空map
// eureka.client.serviceUrl,默认{defaultZone=http://localhost:8761/eureka/}
// 1、从配置文件获取region
// 2、根据region从availabilityZones中获取zones
// 3、根据zones从serviceUrl获取replicaUrls
String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
List<String> replicaUrls = EndpointUtils
.getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
int idx = 0;
// 剔除自身
while (idx < replicaUrls.size()) {
if (isThisMyUrl(replicaUrls.get(idx))) {
replicaUrls.remove(idx);
} else {
idx++;
}
}
return replicaUrls;
}
举个栗子
application.yml
eureka:
client:
register-with-eureka: false
region: china
availabilityZones:
cn: beijing,shanghai
us: washington, newyork
serviceUrl:
shanghai: http://shanghai1/, http://shanghai2/
beijing: http://beijing/
newyork: http://newyork1/, http://newyork2/
1、 先获得region为cn;
2、 根据region获得zones为beijing,shanghai;
3、 根据zones从serviceUrl获得http://shanghai1/,http://shanghai2/,http://beijing/;
3.2.3.2 创建副本节点
protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
......
List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
// 关掉不可用的节点
if (!toShutdown.isEmpty()) {
......
}
// 创建新节点
if (!toAdd.isEmpty()) {
for (String peerUrl : toAdd) {
newNodeList.add(createPeerEurekaNode(peerUrl));
}
}
this.peerEurekaNodes = newNodeList;
this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}
protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
// 看下面的类图
HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
String targetHost = hostFromUrl(peerEurekaNodeUrl);
if (targetHost == null) {
targetHost = "host";
}
return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
}
还记得蓝色的类?他们是客户端与服务端通信的client,而红色的则是服务端与服务端同步信息用的,顶层接口是一样的。再看下replicationClient
的创建过程
public class JerseyReplicationClient extends AbstractJerseyEurekaHttpClient implements HttpReplicationClient {
public static JerseyReplicationClient createReplicationClient(EurekaServerConfig config, ServerCodecs serverCodecs, String serviceUrl) {
EurekaJerseyClient jerseyClient;
......
EurekaJerseyClientBuilder clientBuilder = new EurekaJerseyClientBuilder()
// "Discovery-PeerNodeClient-" + hostname
.withClientName(jerseyClientName)
.withUserAgent("Java-EurekaClient-Replication")
.withEncoderWrapper(serverCodecs.getFullJsonCodec())
.withDecoderWrapper(serverCodecs.getFullJsonCodec())
// eureka.server.peerNodeConnectTimeoutMs,连接超时时间,默认200毫秒
.withConnectionTimeout(config.getPeerNodeConnectTimeoutMs())
// eureka.server.peerNodeReadTimeoutMs,读超时时间,默认200毫秒
.withReadTimeout(config.getPeerNodeReadTimeoutMs())
// eureka.server.peerNodeTotalConnectionsPerHost,单节点最大连接数,默认500
.withMaxConnectionsPerHost(config.getPeerNodeTotalConnectionsPerHost())
// eureka.server.peerNodeTotalConnections,最大连接数,默认1000
.withMaxTotalConnections(config.getPeerNodeTotalConnections())
// eureka.server.peerNodeConnectionIdleTimeoutSeconds,连接空闲超时时间,默认30秒
.withConnectionIdleTimeout(config.getPeerNodeConnectionIdleTimeoutSeconds());
......
jerseyClient = clientBuilder.build();
......
// 过滤器
ApacheHttpClient4 jerseyApacheClient = jerseyClient.getClient();
jerseyApacheClient.addFilter(new DynamicGZIPContentEncodingFilter(config));
EurekaServerIdentity identity = new EurekaServerIdentity(ip);
jerseyApacheClient.addFilter(new EurekaIdentityHeaderFilter(identity));
return new JerseyReplicationClient(jerseyClient, serviceUrl);
}
}
start()
方法介绍到此,它的作用就是创建并定时更新副本server节点,需要主动调用,老办法,打断点调试,是在DefaultEurekaServerContext.initialize()
被调用,这里我们先不去分析,下一步就到它了。
3.2.4 EurekaServerContext
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
@Bean
@ConditionalOnMissingBean
public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry,
PeerEurekaNodes peerEurekaNodes) {
return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs, registry, peerEurekaNodes,
this.applicationInfoManager);
}
}
从名字可以看出它就是一个上下文,充当工具类,提供当前server的组成类并进行一定的初始化,先看下类图
默认实现类DefaultEurekaServerContext
的成员参数是不是很熟悉?我们看下两个比较重要的函数initialize()
、shutdown()
initialize
// 初始化后执行
@PostConstruct
@Override
public void initialize() {
// 3.2.3的start()函数在这里正式启动
peerEurekaNodes.start();
try {
// 3.2.2 PeerAwareInstanceRegistry.init()初始化
registry.init(peerEurekaNodes);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
shutdown
// 注销时的清理工作
@PreDestroy
@Override
public void shutdown() {
// 关掉server
registry.shutdown();
// 关掉副本节点
peerEurekaNodes.shutdown();
// 关闭信息统计
ServoControl.shutdown();
EurekaMonitors.shutdown();
}
3.2.5 EurekaServerBootstrap
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
@Bean
public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
EurekaServerContext serverContext) {
return new EurekaServerBootstrap(this.applicationInfoManager, this.eurekaClientConfig, this.eurekaServerConfig,
registry, serverContext);
}
}
这个类只是用了构造函数,其只是简单赋值。再看下它的public
函数,发现了contextInitialized
这个初始化函数,我们看下它做了什么
public class EurekaServerBootstrap {
public void contextInitialized(ServletContext context) {
......
initEurekaServerContext();
......
}
protected void initEurekaServerContext() throws Exception {
......
EurekaServerContextHolder.initialize(this.serverContext);
......
int registryCount = this.registry.syncUp();
this.registry.openForTraffic(this.applicationInfoManager, registryCount);
......
}
}
看下它的整个流程图,再着重分析要点
重点看下绿色的两部分
3.2.5.1 向服务端获取注册实例
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
@Override
public int syncUp() {
int count = 0;
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
......
// eurekaClient即DiscoveryClient, 获取eureka.client.fetchRemoteRegionsRegistry上的Applications
Applications apps = eurekaClient.getApplications();
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
......
if (isRegisterable(instance)) {
// 将remoteRegion的实例存到本地,实现同步
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
......
}
}
}
return count;
}
}
3.2.5.2 服务剔除
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
protected void postInit() {
renewsLastMin.start();
if (evictionTaskRef.get() != null) {
evictionTaskRef.get().cancel();
}
// EvictionTask主要用于服务端实现服务剔除以及自我保护
evictionTaskRef.set(new EvictionTask());
evictionTimer.schedule(evictionTaskRef.get(),
// 定期检查,eureka.server.evictionIntervalTimerInMs,默认60秒
serverConfig.getEvictionIntervalTimerInMs(),
serverConfig.getEvictionIntervalTimerInMs());
}
}
接下来们看下EvictionTask
中做了什么
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
public void evict(long additionalLeaseMs) {
if (!isLeaseExpirationEnabled()) {
return;
}
// 需满足下面两个条件才能进行服务剔除
// eureka.server.enableSelfPreservation = true,默认为true,是否开启自我保护模式
// renewsLastMin.getCount() < numberOfRenewsPerMinThreshold,
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
// 搜集过期的实例instance,
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
// 默认情况下90秒内如果客户端没向服务端完成续约就会被认为过期
// 可通过eureka.instance.leaseExpirationDurationInSeconds设置
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
// 当前注册过的实例instance总数
int registrySize = (int) getLocalRegistrySize();
// eureka.server.renewalPercentThreshold,默认0.85
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
// 剔除实例个数的上限
int evictionLimit = registrySize - registrySizeThreshold;
// 实际剔除个数取两者较小值
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {
// 随机从expiredLeases选一个剔除
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = expiredLeases.get(i);
String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
EXPIRED.increment();
// 剔除实例
// 1、从registry删除
// 2、添加到recentCanceledQueue
// 3、从overriddenInstanceStatusMap删除
// 4、添加到recentlyChangedQueue
// 5、从缓存responseCache中删除
internalCancel(appName, id, false);
}
}
}
}
至此就分析完了至此EurekaServerBootstrap
的contextInitialized
,总结下它主要做了两件事:1、同步其他节点的实例信息 2、开启服务剔除的定时task。那么它什么时候被调用呢?暂且不表,且往下看
3.2.6 jerseyApplication
jerseyApplication
的作用:注册Controller
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
@Bean
public javax.ws.rs.core.Application jerseyApplication(Environment environment, ResourceLoader resourceLoader) {
ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(false,
environment);
// 只包含@Path和@Provider的类
provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
// EUREKA_PACKAGES = new String[] { "com.netflix.discovery", "com.netflix.eureka" }
// 只在这些包查找
// 最终查找结果如下,这些类类似于我们平时常见的Controller,用于处理请求
// [class com.netflix.eureka.resources.InstancesResource,
// class com.netflix.eureka.resources.ApplicationsResource,
// class com.netflix.eureka.resources.VIPResource,
// class com.netflix.eureka.resources.StatusResource,
// class com.netflix.discovery.provider.DiscoveryJerseyProvider,
// class com.netflix.eureka.resources.PeerReplicationResource,
// class com.netflix.eureka.resources.SecureVIPResource,
// class com.netflix.eureka.resources.ASGResource,
// class com.netflix.eureka.resources.ServerInfoResource]
Set<Class<?>> classes = new HashSet<>();
for (String basePackage : EUREKA_PACKAGES) {
Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
for (BeanDefinition bd : beans) {
Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(), resourceLoader.getClassLoader());
classes.add(cls);
}
}
// Construct the Jersey ResourceConfig
Map<String, Object> propsAndFeatures = new HashMap<>();
propsAndFeatures.put(
// Skip static content used by the webapp
ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX,
EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");
DefaultResourceConfig rc = new DefaultResourceConfig(classes);
rc.setPropertiesAndFeatures(propsAndFeatures);
return rc;
}
}
上述这些xxxResource类主要用于接收客户端的请求,常用的API如下
3.2.7 EurekaServerInitializerConfiguration
// 还记得之前客户端启动时介绍的SmartLifecycle吗?在Spring启动的过程中会调用它的start()方法
public class EurekaServerInitializerConfiguration implements ServletContextAware, SmartLifecycle, Ordered {
@Override
public void start() {
new Thread(() -> {
try {
// eurekaServerBootstrap.contextInitialized的初始化在这里被调用
eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
log.info("Started Eureka Server");
publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
EurekaServerInitializerConfiguration.this.running = true;
publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
}
catch (Exception ex) {
// Help!
log.error("Could not initialize Eureka servlet context", ex);
}
}).start();
}
}
最后总结下服务端启动的过程中都做了什么
1、 初始化dashboard界面API;
2、 初始化服务端;
3、 创建用于相互复制的服务端实例,并定期更新这些实例;
4、 初始化客户端实例的缓存;
5、 开启定时更新自我保护的续约实例阈值的任务;
6、 初始化remoteRegion,获取这些region的注册实例信息并定期更新;
7、 初始化服务端的API;
8、 开启定时服务剔除的任务;
好了,客户端和服务端都已经启动了,接下来我们就可以分析具体的功能了
未完待续
版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: