04、Eureka源码分析:Eureka服务端启动过程(下)

【Eureka】【源码+图解】【三】Eureka服务端启动过程(上)

目录

      • 3.2.3 PeerEurekaNodes
      • 3.2.3.1 获取副本serviceUrl
      • 3.2.3.2 创建副本节点
    • 3.2.4 EurekaServerContext
      • initialize
      • shutdown
    • 3.2.5 EurekaServerBootstrap
      • 3.2.5.1 向服务端获取注册实例
      • 3.2.5.2 服务剔除
    • 3.2.6 jerseyApplication
    • 3.2.7 EurekaServerInitializerConfiguration

3.2.3 PeerEurekaNodes

PeerEurekaNodes用于服务端之间相互复制,保证Eureka的高可用性

public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
   
     
	@Bean
	@ConditionalOnMissingBean
	public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry, ServerCodecs serverCodecs,
			ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
   
     
		return new RefreshablePeerEurekaNodes(
            registry, // 上一步的'server'
            this.eurekaServerConfig, // EurekaServerConfigBean,eureka.server.*的配置
            this.eurekaClientConfig, // EurekaClientConfigBean,eureka.client.*的配置
            serverCodecs,
			this.applicationInfoManager, // 客户端介绍过
            replicationClientAdditionalFilters); // 过滤器
	}
}

老惯例先看RefreshablePeerEurekaNodes的类图

*

PeerEurekaNode代表一个副本server,另一个需要同步信息的副本节点;PeerEurekaNodes维护一堆PeerEurekaNode

RefreshablePeerEurekaNodes的构造函数中没有什么值得关注的代码,但是父类中有个start()方法,这才是它的关键,先看下它做了什么

public class PeerEurekaNodes {
   
     
    public void start() {
   
     
        taskExecutor = Executors.newSingleThreadScheduledExecutor(...);
        ......
            // 首次更新,1、先从配置中获取副本server的URL,2、再创建副本server对象
            updatePeerEurekaNodes(resolvePeerUrls());
        // 定时更新任务
            Runnable peersUpdateTask = new Runnable() {
   
     
                @Override
                public void run() {
   
     
                    ......
                        // 更新副本节点
                        updatePeerEurekaNodes(resolvePeerUrls());
                    ......
                }
            };
        // 开启定时任务
            taskExecutor.scheduleWithFixedDelay(
                    peersUpdateTask,
                // eureka.server.peerEurekaNodesUpdateIntervalMs,默认10分钟
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    TimeUnit.MILLISECONDS
            );
        ......
    }
}

3.2.3.1 获取副本serviceUrl
    protected List<String> resolvePeerUrls() {
   
     
        InstanceInfo myInfo = applicationInfoManager.getInfo();
        // eureka.client.region,默认"us-east-1"
        // eureka.client.availabilityZones,默认空map
        // eureka.client.serviceUrl,默认{defaultZone=http://localhost:8761/eureka/}
        // 1、从配置文件获取region
        // 2、根据region从availabilityZones中获取zones
        // 3、根据zones从serviceUrl获取replicaUrls
        String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
        List<String> replicaUrls = EndpointUtils
                .getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));

        int idx = 0;
        // 剔除自身
        while (idx < replicaUrls.size()) {
   
     
            if (isThisMyUrl(replicaUrls.get(idx))) {
   
     
                replicaUrls.remove(idx);
            } else {
   
     
                idx++;
            }
        }
        return replicaUrls;
    }

举个栗子

application.yml

eureka:
  client:
    register-with-eureka: false
    region: china
    availabilityZones:
      cn: beijing,shanghai
      us: washington, newyork 
    serviceUrl: 
      shanghai: http://shanghai1/, http://shanghai2/
      beijing: http://beijing/
      newyork: http://newyork1/, http://newyork2/

1、 先获得region为cn;
2、 根据region获得zones为beijing,shanghai;
3、 根据zones从serviceUrl获得http://shanghai1/,http://shanghai2/,http://beijing/;

3.2.3.2 创建副本节点
    protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
   
     
        ......
        List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
        // 关掉不可用的节点
        if (!toShutdown.isEmpty()) {
   
     
            ......
        }
        // 创建新节点
        if (!toAdd.isEmpty()) {
   
     
            for (String peerUrl : toAdd) {
   
     
                newNodeList.add(createPeerEurekaNode(peerUrl));
            }
        }
        this.peerEurekaNodes = newNodeList;
        this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
    }

    protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
   
     
        // 看下面的类图
        HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
        String targetHost = hostFromUrl(peerEurekaNodeUrl);
        if (targetHost == null) {
   
     
            targetHost = "host";
        }
        return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
    }

*

还记得蓝色的类?他们是客户端与服务端通信的client,而红色的则是服务端与服务端同步信息用的,顶层接口是一样的。再看下replicationClient的创建过程

public class JerseyReplicationClient extends AbstractJerseyEurekaHttpClient implements HttpReplicationClient {
   
     
    public static JerseyReplicationClient createReplicationClient(EurekaServerConfig config, ServerCodecs serverCodecs, String serviceUrl) {
   
     
        EurekaJerseyClient jerseyClient;
            ......
            EurekaJerseyClientBuilder clientBuilder = new EurekaJerseyClientBuilder()
                // "Discovery-PeerNodeClient-" + hostname
                    .withClientName(jerseyClientName)
                    .withUserAgent("Java-EurekaClient-Replication")
                    .withEncoderWrapper(serverCodecs.getFullJsonCodec())
                    .withDecoderWrapper(serverCodecs.getFullJsonCodec())
                // eureka.server.peerNodeConnectTimeoutMs,连接超时时间,默认200毫秒
                    .withConnectionTimeout(config.getPeerNodeConnectTimeoutMs())
                // eureka.server.peerNodeReadTimeoutMs,读超时时间,默认200毫秒
                    .withReadTimeout(config.getPeerNodeReadTimeoutMs())
                // eureka.server.peerNodeTotalConnectionsPerHost,单节点最大连接数,默认500
                    .withMaxConnectionsPerHost(config.getPeerNodeTotalConnectionsPerHost())
                // eureka.server.peerNodeTotalConnections,最大连接数,默认1000
                    .withMaxTotalConnections(config.getPeerNodeTotalConnections())
                // eureka.server.peerNodeConnectionIdleTimeoutSeconds,连接空闲超时时间,默认30秒
                    .withConnectionIdleTimeout(config.getPeerNodeConnectionIdleTimeoutSeconds());
            ......
            jerseyClient = clientBuilder.build();
        ......
            // 过滤器
        ApacheHttpClient4 jerseyApacheClient = jerseyClient.getClient();
        jerseyApacheClient.addFilter(new DynamicGZIPContentEncodingFilter(config));
        EurekaServerIdentity identity = new EurekaServerIdentity(ip);
        jerseyApacheClient.addFilter(new EurekaIdentityHeaderFilter(identity));

        return new JerseyReplicationClient(jerseyClient, serviceUrl);
    }
}

start()方法介绍到此,它的作用就是创建并定时更新副本server节点,需要主动调用,老办法,打断点调试,是在DefaultEurekaServerContext.initialize()被调用,这里我们先不去分析,下一步就到它了。

3.2.4 EurekaServerContext

public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
   
     
	@Bean
	@ConditionalOnMissingBean
	public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry,
			PeerEurekaNodes peerEurekaNodes) {
   
     
		return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs, registry, peerEurekaNodes,
				this.applicationInfoManager);
	}
}

从名字可以看出它就是一个上下文,充当工具类,提供当前server的组成类并进行一定的初始化,先看下类图

*

默认实现类DefaultEurekaServerContext的成员参数是不是很熟悉?我们看下两个比较重要的函数initialize()shutdown()

initialize
    // 初始化后执行
	@PostConstruct
    @Override
    public void initialize() {
   
     
        // 3.2.3的start()函数在这里正式启动
        peerEurekaNodes.start();
        try {
   
     
            // 3.2.2 PeerAwareInstanceRegistry.init()初始化
            registry.init(peerEurekaNodes);
        } catch (Exception e) {
   
     
            throw new RuntimeException(e);
        }
    }

shutdown
    // 注销时的清理工作
	@PreDestroy
    @Override
    public void shutdown() {
   
     
        // 关掉server
        registry.shutdown();
        // 关掉副本节点
        peerEurekaNodes.shutdown();
        // 关闭信息统计
        ServoControl.shutdown();
        EurekaMonitors.shutdown();
    }

3.2.5 EurekaServerBootstrap

public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
   
     
    @Bean
	public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
			EurekaServerContext serverContext) {
   
     
		return new EurekaServerBootstrap(this.applicationInfoManager, this.eurekaClientConfig, this.eurekaServerConfig,
				registry, serverContext);
	}
}

这个类只是用了构造函数,其只是简单赋值。再看下它的public函数,发现了contextInitialized这个初始化函数,我们看下它做了什么

public class EurekaServerBootstrap {
   
     
    public void contextInitialized(ServletContext context) {
   
     
		......
			initEurekaServerContext();
            ......
	}
    protected void initEurekaServerContext() throws Exception {
   
     
		......
		EurekaServerContextHolder.initialize(this.serverContext);
        ......
		int registryCount = this.registry.syncUp();
		this.registry.openForTraffic(this.applicationInfoManager, registryCount);
        ......
	}
}

看下它的整个流程图,再着重分析要点

*

重点看下绿色的两部分

3.2.5.1 向服务端获取注册实例
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
   
     
    @Override
    public int syncUp() {
   
     
        int count = 0;
        for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
   
     
            ......
                // eurekaClient即DiscoveryClient, 获取eureka.client.fetchRemoteRegionsRegistry上的Applications
            Applications apps = eurekaClient.getApplications();
            for (Application app : apps.getRegisteredApplications()) {
   
     
                for (InstanceInfo instance : app.getInstances()) {
   
     
                    ......
                        if (isRegisterable(instance)) {
   
     
                            // 将remoteRegion的实例存到本地,实现同步
                            register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                            count++;
                        }
                    ......
                }
            }
        }
        return count;
    }
}

3.2.5.2 服务剔除
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
   
     
	protected void postInit() {
   
     
        renewsLastMin.start();
        if (evictionTaskRef.get() != null) {
   
     
            evictionTaskRef.get().cancel();
        }
        // EvictionTask主要用于服务端实现服务剔除以及自我保护
        evictionTaskRef.set(new EvictionTask());
        evictionTimer.schedule(evictionTaskRef.get(),
                // 定期检查,eureka.server.evictionIntervalTimerInMs,默认60秒
                serverConfig.getEvictionIntervalTimerInMs(),
                serverConfig.getEvictionIntervalTimerInMs());
    }
}

接下来们看下EvictionTask中做了什么

public abstract class AbstractInstanceRegistry implements InstanceRegistry {
   
     
    public void evict(long additionalLeaseMs) {
   
     
        if (!isLeaseExpirationEnabled()) {
   
     
            return;
        }
        // 需满足下面两个条件才能进行服务剔除
        // eureka.server.enableSelfPreservation = true,默认为true,是否开启自我保护模式
        // renewsLastMin.getCount() < numberOfRenewsPerMinThreshold,
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        // 搜集过期的实例instance,
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
   
     
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
   
     
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
   
     
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                    // 默认情况下90秒内如果客户端没向服务端完成续约就会被认为过期
                    // 可通过eureka.instance.leaseExpirationDurationInSeconds设置
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
   
     
                        expiredLeases.add(lease);
                    }
                }
            }
        }
        // 当前注册过的实例instance总数
        int registrySize = (int) getLocalRegistrySize();
        // eureka.server.renewalPercentThreshold,默认0.85
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        // 剔除实例个数的上限
        int evictionLimit = registrySize - registrySizeThreshold;
        // 实际剔除个数取两者较小值
        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        if (toEvict > 0) {
   
     
            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < toEvict; i++) {
   
     
                // 随机从expiredLeases选一个剔除
                int next = i + random.nextInt(expiredLeases.size() - i);
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);
                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                EXPIRED.increment();
                // 剔除实例
                // 1、从registry删除
                // 2、添加到recentCanceledQueue
                // 3、从overriddenInstanceStatusMap删除
                // 4、添加到recentlyChangedQueue
                // 5、从缓存responseCache中删除
                internalCancel(appName, id, false);
            }
        }
    }
}

至此就分析完了至此EurekaServerBootstrapcontextInitialized,总结下它主要做了两件事:1、同步其他节点的实例信息 2、开启服务剔除的定时task。那么它什么时候被调用呢?暂且不表,且往下看

3.2.6 jerseyApplication

jerseyApplication的作用:注册Controller

public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
   
     
    @Bean
	public javax.ws.rs.core.Application jerseyApplication(Environment environment, ResourceLoader resourceLoader) {
   
     

		ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(false,
				environment);

		// 只包含@Path和@Provider的类
		provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
		provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
		// EUREKA_PACKAGES = new String[] { "com.netflix.discovery", "com.netflix.eureka" }
        // 只在这些包查找
        // 最终查找结果如下,这些类类似于我们平时常见的Controller,用于处理请求
        // [class com.netflix.eureka.resources.InstancesResource, 
        // class com.netflix.eureka.resources.ApplicationsResource, 
        // class com.netflix.eureka.resources.VIPResource, 
        // class com.netflix.eureka.resources.StatusResource, 
        // class com.netflix.discovery.provider.DiscoveryJerseyProvider, 
        // class com.netflix.eureka.resources.PeerReplicationResource, 
        // class com.netflix.eureka.resources.SecureVIPResource, 
        // class com.netflix.eureka.resources.ASGResource, 
        // class com.netflix.eureka.resources.ServerInfoResource]
		Set<Class<?>> classes = new HashSet<>();
		for (String basePackage : EUREKA_PACKAGES) {
   
     
			Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
			for (BeanDefinition bd : beans) {
   
     
				Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(), resourceLoader.getClassLoader());
				classes.add(cls);
			}
		}
		// Construct the Jersey ResourceConfig
		Map<String, Object> propsAndFeatures = new HashMap<>();
		propsAndFeatures.put(
				// Skip static content used by the webapp
				ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX,
				EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");
		DefaultResourceConfig rc = new DefaultResourceConfig(classes);
		rc.setPropertiesAndFeatures(propsAndFeatures);
		return rc;
	}
}

上述这些xxxResource类主要用于接收客户端的请求,常用的API如下

*

3.2.7 EurekaServerInitializerConfiguration

// 还记得之前客户端启动时介绍的SmartLifecycle吗?在Spring启动的过程中会调用它的start()方法
public class EurekaServerInitializerConfiguration implements ServletContextAware, SmartLifecycle, Ordered {
   
     
    @Override
	public void start() {
   
     
		new Thread(() -> {
   
     
			try {
   
     
				// eurekaServerBootstrap.contextInitialized的初始化在这里被调用
				eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
				log.info("Started Eureka Server");

				publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
				EurekaServerInitializerConfiguration.this.running = true;
				publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
			}
			catch (Exception ex) {
   
     
				// Help!
				log.error("Could not initialize Eureka servlet context", ex);
			}
		}).start();
	}
}

最后总结下服务端启动的过程中都做了什么

1、 初始化dashboard界面API;
2、 初始化服务端;
3、 创建用于相互复制的服务端实例,并定期更新这些实例;
4、 初始化客户端实例的缓存;
5、 开启定时更新自我保护的续约实例阈值的任务;
6、 初始化remoteRegion,获取这些region的注册实例信息并定期更新;
7、 初始化服务端的API;
8、 开启定时服务剔除的任务;

好了,客户端和服务端都已经启动了,接下来我们就可以分析具体的功能了

未完待续

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: