03、Eureka源码分析:Eureka服务端启动过程(上)

【Eureka】【源码+图解】【二】Eureka客户端启动过程

目录

    • 3.2 服务端
    • 3.2.1 EurekaController
    • 3.2.2 PeerAwareInstanceRegistry
      • 3.2.2.1 初始化缓存
        • 读写缓存generatePayload
        • 只读缓存getCacheUpdateTask
      • 3.2.2.2 定时更新自我保护的阈值
      • 3.2.2.3 初始化其他区域的注册实例
        • 3.2.2.3.1 创建discoveryJerseyClient
        • 3.2.2.3.2 创建eurekaHttpClient
        • 3.2.2.3.3 定时更新remoteRegion的注册实例信息

3.2 服务端

HelloWorld总是如此简单,接下来就要去看看eureka server内部的工作原理了。

如果要研究它的源码,我们总得找到一个入口,服务端的代码我们就只加了一个@EnableEurekaServer就成功创建了一个server,跟着这个蛛丝马迹一探它的究竟

/**
 * Annotation to activate Eureka Server related configuration.
 * {@link EurekaServerAutoConfiguration}
 *
 * @author Dave Syer
 * @author Biju Kunjummen
 *
 */

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {
   
     

}

好吧,啥都没有?别急,我们看下注解,它是用来激活Eureka Server的相关配置的,还告诉我们是在哪个配置文件,我们先看下EurekaServerAutoConfiguration类图

*

配置类注册了几个和服务端工作相关的类,看不懂不要紧,我们暂时先放下,再详细看下EurekaServerAutoConfiguration的源码,当然不会看全部,先看下头部的注解

@Configuration(proxyBeanMethods = false)
// 7、它的作用主要是启动定时线程,定期检查客户端实例的状态,实现自我保护机制
@Import(EurekaServerInitializerConfiguration.class)
// 还记得前面的@EnableEurekaServer吗?它@Import(EurekaServerMarkerConfiguration.class)注入了Marker类
// 这就是为什么eureka server启动时要加@EnableEurekaServer的原因 
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
// 配置参数
@EnableConfigurationProperties({
   
      
    EurekaDashboardProperties.class,  // eureka.dashboard.*
    InstanceRegistryProperties.class, // eureka.instance.registry.*
	EurekaProperties.class }) // eureka.*
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
   
     
    ......
}

先介绍下这几个类的作用,看图
*

jerseyApplication:其实是它里面的各个xxxResource类,用于接收来自客户端以及其它服务端的请求

PeerAwareInstanceRegistry:处理请求

PeerEurekaNode:服务端与服务端相互复制实例信息

RemoteRegionRegistry:获取其他region的注册实例

EurekaServerContextEurekaServerBootstrapEurekaServerInitializerConfiguration则是用来初始化途中的四个类的。

3.2.1 EurekaController

先看配置

@EnableConfigurationProperties({
   
      EurekaDashboardProperties.class, ... })
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
   
     
    @Bean
	@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
	public EurekaController eurekaController(EurekaProperties eurekaProperties) {
   
     
		return new EurekaController(this.applicationInfoManager, eurekaProperties);
	}
}

先看它的配置信息类EurekaDashboardProperties

@ConfigurationProperties("eureka.dashboard")
public class EurekaDashboardProperties {
   
     
    // dashboard页面的跟path,
    // 前面的HelloWorld中我们直接输入http://localhost:3333/就可以显示,
    // 可配置eureka.dashboard.path修改
    private String path = "/";
    // 默认开启的,如果false我们就不能看到dashboard页面
    private boolean enabled = true;
}

再看EurekaController提供了两个page,第一个是默认,可以结合下面的图看;第二个是/lastn

@Controller
@RequestMapping("${eureka.dashboard.path:/}")
public class EurekaController {
   
     
    @Value("${eureka.dashboard.path:/}")
	private String dashboardPath = "";
    
	@RequestMapping(method = RequestMethod.GET)
	public String status(HttpServletRequest request, Map<String, Object> model) {
   
     
        // 自身基础信息
		populateBase(request, model);
        // 已向它注册的实例信息
		populateApps(model);
		StatusInfo statusInfo;
		......
		model.put("statusInfo", statusInfo);
		populateInstanceInfo(model, statusInfo);
		filterReplicas(model, statusInfo);
		return "eureka/status";
	}
    
	@RequestMapping(value = "/lastn", method = RequestMethod.GET)
	public String lastn(HttpServletRequest request, Map<String, Object> model) {
   
     
		......
		return "eureka/lastn";
	}    
}

*

3.2.2 PeerAwareInstanceRegistry

public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
   
     
	@Bean
	public PeerAwareInstanceRegistry peerAwareInstanceRegistry(ServerCodecs serverCodecs) {
   
     
        // 强制初始化,还记得前面客户端的DiscoveryClient吗?成为一个Server之前你得先是一个Client,
        // 与客户端稍微不同的是,客户端的eurekaTransport.queryClient是RestTemplateEurekaHttpClient,
        // 而服务端的是JerseyApplicationClient
		this.eurekaClient.getApplications(); 
        // 注意,这里创建的是InstanceRegistry类,不是InstanceRegistry接口
		return new InstanceRegistry(this.eurekaServerConfig, // EurekaServerConfigBean,eureka.server.*的配置
                                    this.eurekaClientConfig, // EurekaClientConfigBean,eureka.client.*的配置
                                    serverCodecs, // 加密解密
                                    this.eurekaClient, // 作为客户端身份的信息
                // eureka.instance.registry.expectedNumberOfClientsSendingRenews,默认1
				this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
                // eureka.instance.registry.defaultOpenForTrafficCount,默认1
				this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
	}
}

先看下InstanceRegistry类图

*

先看下InstanceRegistry的接口,这些接口的方法名是不是很熟悉?和客户端RestTemplateEurekaHttpClient的方法一一对应,所以客户端发送的请求最终都会由InstanceRegistry受理,在这里可以简单把InstanceRegistry当做我们平时常见的service。从类图中我们看到PeerAwareInstanceRegistry接口定义了init()方法,对于这种initstart等方法都值得去看一下

public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
   
     
    @Override
    public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
   
     
        // 启动统计
        this.numberOfReplicationsLastMin.start();
        // 赋值副本节点
        this.peerEurekaNodes = peerEurekaNodes;
        // 1、初始化缓存
        initializedResponseCache();
        // 2、开启定时刷新
        scheduleRenewalThresholdUpdateTask();
        // 3、初始化
        initRemoteRegionRegistry();
        ......
    }
}

3.2.2.1 初始化缓存

ResponseCacheImpl主要是缓存注册信息,当客户端发出请求的时候可以从缓存中取,还记得客户端启动时的fetchRegistry吗?

public abstract class AbstractInstanceRegistry implements InstanceRegistry {
   
     
    
    @Override
    public synchronized void initializedResponseCache() {
   
     
        if (responseCache == null) {
   
     
            responseCache = new ResponseCacheImpl(serverConfig, serverCodecs, this);
        }
    }
}

先看下类图

*

最重要的是两个绿色的缓存数据结构,先看下他们的比较

*

他们之前所不同的是一个map,一个是LoadingCache,从名字中大概能看出一个是静态的,一个是动态的。map是通过一个定时任务维护的,而LoadingCache是每次拿值的时候动态更新,我们先看下ResponseCacheImpl构造函数

    ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
   
     
        this.serverConfig = serverConfig;
        this.serverCodecs = serverCodecs;
        // useReadOnlyResponseCache,使用只读缓存,默认true
        // 只读缓存:readOnlyCacheMap,每次读的时候先从自己里读,没有再去readWriteCacheMap读,读到值后更新
        // 读写缓存:readWriteCacheMap,每次读值都会去获取最新的Applications
        this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
        this.registry = registry;
        this.readWriteCacheMap =
                CacheBuilder.newBuilder()
            // eureka.server.initialCapacityOfResponseCache,默认1000,读写缓存大小
            .initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
            // eureka.server.responseCacheAutoExpirationInSeconds,缓存失效时间,默认180秒
            .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
            .removalListener(...) // 监听RemovalNotification,当收到通知时从regionSpecificKeys删除相应的信息
            .build(new CacheLoader<Key, Value>() {
   
     
                            @Override
                            public Value load(Key key) throws Exception {
   
     
                                if (key.hasRegions()) {
   
     
                                    Key cloneWithNoRegions = key.cloneWithoutRegions();
                                    regionSpecificKeys.put(cloneWithNoRegions, key);
                                }
                                // 1、根据key动态获取value
                                Value value = generatePayload(key);
                                return value;
                            }
                        });
        // eureka.server.responseCacheUpdateIntervalMs,只读缓存更新时间,默认30*1000毫秒,即30秒
		long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
        if (shouldUseReadOnlyResponseCache) {
   
     
            // 开启只读缓存定时更新任务
            timer.schedule(getCacheUpdateTask(), // 2、定时更新只读缓存
                    new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                            + responseCacheUpdateIntervalMs),
                    responseCacheUpdateIntervalMs);
        }
        ......
    }

读写缓存generatePayload

从构造函数中看到readWriteCacheMap的关键在于generatePayload(key)

public class ResponseCacheImpl implements ResponseCache {
   
     
	private Value generatePayload(Key key) {
   
     
        String payload;
        switch (key.getEntityType()) {
   
     
            case Application:
                boolean isRemoteRegionRequested = key.hasRegions();
                // key.getName()即appName
                // 目的就是获得Application或者Applications,对于从客户端来的请求key的name为ALL_APPS
                if (ALL_APPS.equals(key.getName())) {
   
     
                    // 不管isRemoteRegionRequested是true或false,最终都会走到registry.getApplicationsFromMultipleRegions
                    if (isRemoteRegionRequested) {
   
     
                        payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
                    } else {
   
     
                        payload = getPayLoad(key, registry.getApplications());
                    }
                } else if (ALL_APPS_DELTA.equals(key.getName())) {
   
     
                    ......
                } else {
   
     
                    payload = getPayLoad(key, registry.getApplication(key.getName()));
                }
                break;
            case VIP:
            case SVIP:
                payload = getPayLoad(key, getApplicationsForVip(key, registry));
                break;
            default:
                payload = "";
                break;
        }
        return new Value(payload);
    }
}

public abstract class AbstractInstanceRegistry implements InstanceRegistry {
   
     
	public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {
   
     

        boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;
        ......
        Applications apps = new Applications();
        apps.setVersion(1L);
        for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
   
     
            // 先获取当前server的注册实例列表
            ......
        }
        if (includeRemoteRegion) {
   
     
            // 再获取remoteRegions的注册实例
            for (String remoteRegion : remoteRegions) {
   
     
                // 在服务端启动的时候会初始化regionNameVSRemoteRegistry,并且每个RemoteRegionRegistry会定时更新注册实例的信息,可参考后文
                RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
                if (null != remoteRegistry) {
   
     
                    Applications remoteApps = remoteRegistry.getApplications();
                    ......
                } else {
   
     
                    logger.warn("No remote registry available for the remote region {}", remoteRegion);
                }
            }
        }
        apps.setAppsHashCode(apps.getReconcileHashCode());
        return apps;
    }
}

只读缓存getCacheUpdateTask
    private TimerTask getCacheUpdateTask() {
   
     
        return new TimerTask() {
   
     
            @Override
            public void run() {
   
     
                for (Key key : readOnlyCacheMap.keySet()) {
   
     
                    try {
   
     
                        CurrentRequestVersion.set(key.getVersion());
                        // 如果readOnlyCacheMap与readWriteCacheMap的值,如果不相同则把readWriteCacheMap中的值更新到readOnlyCacheMap
                        Value cacheValue = readWriteCacheMap.get(key);
                        Value currentCacheValue = readOnlyCacheMap.get(key);
                        if (cacheValue != currentCacheValue) {
   
     
                            readOnlyCacheMap.put(key, cacheValue);
                        }
                    } catch (Throwable th) {
   
     
                    } finally {
   
     
                        CurrentRequestVersion.remove();
                    }
                }
            }
        };
    }

3.2.2.2 定时更新自我保护的阈值
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
   
     
	private void scheduleRenewalThresholdUpdateTask() {
   
     
        // 每隔15分钟更新自我保护的阈值,也就是如果续约的实例低于这个阈值就会发出预警
        timer.schedule(new TimerTask() {
   
     
                           @Override
                           public void run() {
   
     
                               updateRenewalThreshold();
                           }
                       }, serverConfig.getRenewalThresholdUpdateIntervalMs(),
                // eureka.server.renewalThresholdUpdateIntervalMs,默认15分钟
                serverConfig.getRenewalThresholdUpdateIntervalMs());
    }

    private void updateRenewalThreshold() {
   
     
        ......
            Applications apps = eurekaClient.getApplications();
            int count = 0;
        // 统计当前已注册的实例总数
            for (Application app : apps.getRegisteredApplications()) {
   
     
                for (InstanceInfo instance : app.getInstances()) {
   
     
                    if (this.isRegisterable(instance)) {
   
     
                        ++count;
                    }
                }
            }
            synchronized (lock) {
   
     
                if ((count) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfClientsSendingRenews)
                        || (!this.isSelfPreservationModeEnabled())) {
   
     
                    // 更新实例总数
                    this.expectedNumberOfClientsSendingRenews = count;
                    updateRenewsPerMinThreshold();
                }
            }
            ......
    }

	protected void updateRenewsPerMinThreshold() {
   
     
        // 这个值的作用时,server会每隔一段时间检查续约的实例,续约成功的实例个数要达到numberOfRenewsPerMinThreshold,不然会发出警告,即自我保护
        // 实例总数 * 单个实例1分钟需注册的次数 * 百分比
        this.numberOfRenewsPerMinThreshold = (int) (
            // 当前已注册的实例总数
            this.expectedNumberOfClientsSendingRenews 
            // eureka.server.expectedClientRenewalIntervalSeconds,客户端续约时间,默认30秒
            // 60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds(),1分钟要注册的次数
                * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
            // eureka.server.renewalPercentThreshold,续约比例不能低于这个值,默认0.85,否则就会发出警告
                * serverConfig.getRenewalPercentThreshold());
    }
}

3.2.2.3 初始化其他区域的注册实例

RemoteRegionRegistry的作用主要是获取运行在其他region的server的实例注册信息

public abstract class AbstractInstanceRegistry implements InstanceRegistry {
   
     
    protected void initRemoteRegionRegistry() throws MalformedURLException {
   
     
        // eureka.server.remoteRegionUrlsWithName,默认new HashMap<>(),key为region,value为region的url
        Map<String, String> remoteRegionUrlsWithName = serverConfig.getRemoteRegionUrlsWithName();
        if (!remoteRegionUrlsWithName.isEmpty()) {
   
     
            allKnownRemoteRegions = new String[remoteRegionUrlsWithName.size()];
            int remoteRegionArrayIndex = 0;
            for (Map.Entry<String, String> remoteRegionUrlWithName : remoteRegionUrlsWithName.entrySet()) {
   
     
            	// 初始化RemoteRegionRegistry,初始化完后会开启一个定时更新RemoteRegion的注册实例列表,详见后文
                RemoteRegionRegistry remoteRegionRegistry = new RemoteRegionRegistry(
                        serverConfig,
                        clientConfig,
                        serverCodecs,
                        remoteRegionUrlWithName.getKey(),
                        new URL(remoteRegionUrlWithName.getValue()));
                // 更新到regionNameVSRemoteRegistry
                regionNameVSRemoteRegistry.put(remoteRegionUrlWithName.getKey(), remoteRegionRegistry);
                allKnownRemoteRegions[remoteRegionArrayIndex++] = remoteRegionUrlWithName.getKey();
            }
        }
    }
}

先看下RemoteRegionRegistry类图

*

从图中可以看到,它的作用与客户单的DiscoveryClient和服务端的InstanceRegistry作用差不多,它负责的是与其他region的服务端的通信,再看下其构造函数。其构造函数代码过多,为了简便先看下创建流程图

*

着重分析下绿色的部分

3.2.2.3.1 创建discoveryJerseyClient

discoveryJerseyClient即EurekaJerseyClientImpl,作用是提供ApacheHttpClient4,用于获取其他region的注册实例

public class RemoteRegionRegistry implements LookupService<String> {
   
     
    public RemoteRegionRegistry(...) {
   
     
        ......
        EurekaJerseyClientBuilder clientBuilder = new EurekaJerseyClientBuilder()
                .withUserAgent("Java-EurekaClient-RemoteRegion")
                .withEncoderWrapper(serverCodecs.getFullJsonCodec())
                .withDecoderWrapper(serverCodecs.getFullJsonCodec())
            // eureka.server.remoteRegionConnectTimeoutMs,默认1000毫秒
                .withConnectionTimeout(serverConfig.getRemoteRegionConnectTimeoutMs())
            // eureka.server.remoteRegionReadTimeoutMs,默认1000毫秒
                .withReadTimeout(serverConfig.getRemoteRegionReadTimeoutMs())
            // eureka.server.remoteRegionTotalConnectionsPerHost,默认500
                .withMaxConnectionsPerHost(serverConfig.getRemoteRegionTotalConnectionsPerHost())
            // eureka.server.remoteRegionTotalConnections,默认1000
                .withMaxTotalConnections(serverConfig.getRemoteRegionTotalConnections())
            // eureka.server.remoteRegionConnectionIdleTimeoutSeconds,默认30秒
                .withConnectionIdleTimeout(serverConfig.getRemoteRegionConnectionIdleTimeoutSeconds());

        if (remoteRegionURL.getProtocol().equals("http")) {
   
     
            ......
        } else {
   
     
            clientBuilder.withClientName("Discovery-RemoteRegionSecureClient-" + regionName)
                    .withTrustStoreFile(
                // eureka.server.remoteRegionTrustStore,默认null
                            serverConfig.getRemoteRegionTrustStore(),
                // eureka.server.remoteRegionTrustStorePassword,默认changeit
                            serverConfig.getRemoteRegionTrustStorePassword()
                    );
        }
        discoveryJerseyClient = clientBuilder.build();
        discoveryApacheClient = discoveryJerseyClient.getClient();
    }
}

3.2.2.3.2 创建eurekaHttpClient

过程不再赘述,这里简单比较下不同角色所用的client,作用也是用于获取其他region的注册实例
*

3.2.2.3.3 定时更新remoteRegion的注册实例信息
public class RemoteRegionRegistry implements LookupService<String> {
   
     
    public RemoteRegionRegistry(EurekaServerConfig serverConfig,
                                EurekaClientConfig clientConfig,
                                ServerCodecs serverCodecs,
                                String regionName,
                                URL remoteRegionURL) {
   
     
        ......
        // 1、定义更新remote region的server的注册信息的task
        Runnable remoteRegionFetchTask = new Runnable() {
   
     
            @Override
            public void run() {
   
     
                // 具体逻辑,最终会到fetchRemoteRegistry函数
                    if (fetchRegistry()) {
   
     
                        readyForServingData = true;
                    }
            }
        };

        // 2、创建执行task的线程池
        ThreadPoolExecutor remoteRegionFetchExecutor = new ThreadPoolExecutor(
                1, 
            // eureka.server.remoteRegionFetchThreadPoolSize,默认20
            serverConfig.getRemoteRegionFetchThreadPoolSize(), 
            0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); 

        // 3、创建定时调度器
        scheduler = Executors.newScheduledThreadPool(1,
                new ThreadFactoryBuilder()
                        .setNameFormat("Eureka-RemoteRegionCacheRefresher_" + regionName + "-%d")
                        .setDaemon(true)
                        .build());

        // 4、开启定时任务,每次完成更新后再过30秒进行下一次更新,循环反复
        scheduler.schedule(
                new TimedSupervisorTask(
                        "RemoteRegionFetch_" + regionName,
                        scheduler,
                        remoteRegionFetchExecutor,
                    // eureka.server.remoteRegionRegistryFetchInterval, 默认30秒
                        serverConfig.getRemoteRegionRegistryFetchInterval(),
                        TimeUnit.SECONDS,
                        5,  // 最大超时倍数
                        remoteRegionFetchTask
                ),
                serverConfig.getRemoteRegionRegistryFetchInterval(), TimeUnit.SECONDS);
    }
    
    private Applications fetchRemoteRegistry(boolean delta) {
   
     
        // eureka.server.experimental.transport.enabled,默认false
        if (shouldUseExperimentalTransport()) {
   
     
            ......
                // 最终由JerseyApplicationClient发出http请求
                EurekaHttpResponse<Applications> httpResponse = delta ? eurekaHttpClient.getDelta() : eurekaHttpClient.getApplications();
            ......
        } else {
   
     
            ......
                // delta为false
                String urlPath = delta ? "apps/delta" : "apps/";
            // discoveryApacheClient为ApacheHttpClient4
                response = discoveryApacheClient.resource(this.remoteRegionURL + urlPath)
                        .accept(MediaType.APPLICATION_JSON_TYPE)
                        .get(ClientResponse.class);
            ......
        }
        return null;
    }
}

未完待续

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: