08、Eureka源码分析:Eureka客户端的服务获取

【Eureka】【源码+图解】【七】Eureka的获取服务功能

目录

    7. 获取服务
    • 7.1 初始化获取服务的task
  • 7.2 将task进一步包装成定时timerTask
  • 7.3 定时时间到,执行timerTask
  • 7.4 task获得线程资源,执行refreshRegistry()
  • 7.5 服务端接受请求
  • 7.6 获取Applications

7. 获取服务

整体流程如下:
*

7.1 初始化获取服务的task

// Excerpt of DiscoveryClient: creation of the cache-refresh thread pool, the
// scheduled task that wraps it, and the Runnable that performs the refresh.
// Lines marked "......" are elided from the original source.
public class DiscoveryClient implements EurekaClient {
   
     
    private final ThreadPoolExecutor cacheRefreshExecutor;
    
    DiscoveryClient(...) {
   
     
        ......
        // 1. Create the thread pool used for fetching the registry
        cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, 
            // eureka.client.cacheRefreshExecutorThreadPoolSize: maximum number of registry-refresh threads, default 2
            clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );
        ......
    }
    // Schedules the periodic registry fetch, but only when the client is configured to fetch the registry.
    private void initScheduledTasks() {
   
     
        if (clientConfig.shouldFetchRegistry()) {
   
     
            // eureka.client.registryFetchIntervalSeconds: refresh interval, default 30 seconds
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            // eureka.client.cacheRefreshExecutorExponentialBackOffBound: maximum back-off multiplier after timeouts, default 10
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            // 3. Build the registry-fetch task; the fetch interval is also passed as the task's timeout
            cacheRefreshTask = new TimedSupervisorTask(
                    "cacheRefresh",
                    scheduler,
                    cacheRefreshExecutor,
                    registryFetchIntervalSeconds,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new CacheRefreshThread()
            );
            // 4. Schedule the first run after one fetch interval
            scheduler.schedule(
                    cacheRefreshTask,
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }
        ......
    }
    
    // 2. The Runnable that performs a registry fetch by delegating to refreshRegistry()
    class CacheRefreshThread implements Runnable {
   
     
        public void run() {
   
     
            refreshRegistry();
        }
    }
}

7.2 将task进一步包装成定时timerTask

// Excerpt of TimedSupervisorTask: the constructor that wraps a Runnable together with
// its scheduler, executor, timeout and back-off bound. "......" marks elided source.
public class TimedSupervisorTask extends TimerTask {
   
     
    public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
                               int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
   
     
        this.name = name;
        this.scheduler = scheduler; // scheduler used for timed (re)scheduling
        this.executor = executor; // thread pool that executes the task
        this.timeoutMillis = timeUnit.toMillis(timeout); // timeout converted to milliseconds
        this.task = task; // the actual task, i.e. CacheRefreshThread in this article
        this.delay = new AtomicLong(timeoutMillis); // current delay, initially equal to the timeout
        this.maxDelay = timeoutMillis * expBackOffBound; // upper bound for the delay
        ......
    }
}

7.3 定时时间到,执行timerTask

// Excerpt of TimedSupervisorTask: one supervised execution of the wrapped task,
// followed by rescheduling of this supervisor with the (possibly backed-off) delay.
public class TimedSupervisorTask extends TimerTask {
   
     
    @Override
    public void run() {
   
     
        Future<?> future = null;
        try {
   
     
            // 1. Submit the wrapped task (CacheRefreshThread here) to the thread pool and keep its Future
            future = executor.submit(task);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            // 2. Block until the task finishes, waiting at most timeoutMillis
            //    (eureka.client.registryFetchIntervalSeconds for the cache-refresh task)
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            // 3. Success: reset the delay for the next run back to the base timeout
            delay.set(timeoutMillis);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            successCounter.increment();
        } catch (TimeoutException e) {
   
     
            timeoutCounter.increment();
            long currentDelay = delay.get();
            long newDelay = Math.min(maxDelay, currentDelay * 2);
            // The wait timed out: the next delay becomes 2 * currentDelay, capped at maxDelay, i.e.
            // eureka.client.registryFetchIntervalSeconds * eureka.client.cacheRefreshExecutorExponentialBackOffBound
            delay.compareAndSet(currentDelay, newDelay);
        } catch (RejectedExecutionException e) {
   
     
            // Only the rejection counter is updated here; the delay is left unchanged
            rejectedCounter.increment();
        } catch (Throwable e) {
   
     
            // Any other failure is only counted; the delay is left unchanged
            throwableCounter.increment();
        } finally {
   
     
            if (future != null) {
   
     
                // Cancel the Future, requesting interruption if the task is still running
                future.cancel(true);
            }
            if (!scheduler.isShutdown()) {
   
     
                // 4. Schedule the next run of this supervisor using the current delay
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }
}

7.4 task获得线程资源,执行refreshRegistry()

// Excerpt of DiscoveryClient: the path from the scheduled Runnable to the HTTP call that
// fetches the registry and stores it locally. "......" marks elided source.
public class DiscoveryClient implements EurekaClient {
   
     
    // Runnable executed by the cache-refresh thread pool; delegates to refreshRegistry()
    class CacheRefreshThread implements Runnable {
   
     
        public void run() {
   
     
            refreshRegistry();
        }
    }
    // Refreshes the local registry; any Throwable is logged rather than propagated.
    @VisibleForTesting
    void refreshRegistry() {
   
     
        try {
   
     
            ......
            // 1. Read the latest remote-regions setting from configuration
            String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
            ......
            // 2. Fetch the latest service instances
            boolean success = fetchRegistry(remoteRegionsModified);
            if (success) {
   
     
                // Record the registry size and the time of this successful fetch
                registrySize = localRegionApps.get().size();
                lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
            }
            
        } catch (Throwable e) {
   
     
            logger.error("Cannot fetch registry from server", e);
        }
    }
    // Fetches the registry; only the full-fetch path is shown in this excerpt.
    private boolean fetchRegistry(boolean forceFullRegistryFetch) {
   
     
        ......
            // 3. Fetch the latest service instances and store them locally
                getAndStoreFullRegistry();
            ......
            applications.setAppsHashCode(applications.getReconcileHashCode());
        ......
        return true;
    }
    // Requests the registry from the server and stores it in localRegionApps if the
    // fetch generation has not changed since the request started.
    private void getAndStoreFullRegistry() throws Throwable {
   
     
        long currentUpdateGeneration = fetchRegistryGeneration.get();
        Applications apps = null;
        // 4. Send the HTTP request to the server: all applications when no single VIP address
        //    is configured, otherwise only that VIP
        EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
   
     
            apps = httpResponse.getEntity();
        }

        if (apps == null) {
   
     
            // Nothing usable was returned: the local cache is left untouched
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
   
     
            // 5. Filter out instances whose status is not UP (per the article; filterAndShuffle is
            //    not shown here) and store the result in localRegionApps, the local cache
            localRegionApps.set(this.filterAndShuffle(apps));
        } else {
   
     
            // The generation changed while the request was in flight: this result is discarded
        }
    }
}

7.5 服务端接受请求

// Excerpt of ApplicationsResource: the server-side endpoint that answers the client's
// "get all applications" request from the response cache.
public class ApplicationsResource {
   
     
    @Inject
    ApplicationsResource(EurekaServerContext eurekaServer) {
   
     
        this.serverConfig = eurekaServer.getServerConfig();
        this.registry = eurekaServer.getRegistry();
        // The cache is AbstractInstanceRegistry.responseCache
        this.responseCache = registry.getResponseCache();
    }
    // Handles GET: builds a cache key from the request's version, Accept headers and regions,
    // then returns the cached payload (gzipped when the client accepts gzip).
    @GET
    public Response getContainers(@PathParam("version") String version,
                                  @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                                  @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
                                  @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                                  @Context UriInfo uriInfo,
                                  @Nullable @QueryParam("regions") String regionsStr) {
   
     
        
        boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
        String[] regions = null;
        if (!isRemoteRegionRequested) {
   
     
            EurekaMonitors.GET_ALL.increment();
        } else {
   
     
            // 1. Parse the requested regions (lower-cased, comma-separated)
            regions = regionsStr.toLowerCase().split(",");
            Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
            EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
        }
        // Reject the request with 403 when the registry does not currently allow access
        if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
   
     
            return Response.status(Status.FORBIDDEN).build();
        }
        // 2. Set the request Version (default V2, per the article)
        CurrentRequestVersion.set(Version.toEnum(version));
        // 3. Choose the keyType: JSON only when the Accept header asks for it, otherwise XML
        KeyType keyType = Key.KeyType.JSON;
        String returnMediaType = MediaType.APPLICATION_JSON;
        if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
   
     
            keyType = Key.KeyType.XML;
            returnMediaType = MediaType.APPLICATION_XML;
        }
        // 4. Build the cache key; how Key uniqueness is determined is covered in section 3.2.2.1 of this series
        Key cacheKey = new Key(Key.EntityType.Application,
                ResponseCacheImpl.ALL_APPS,
                keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
        );

        Response response;
        // 5. Read the value from responseCache; per the article, clients take the getGZIP path by default
        if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
   
     
            response = Response.ok(responseCache.getGZIP(cacheKey))
                    .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                    .header(HEADER_CONTENT_TYPE, returnMediaType)
                    .build();
        } else {
   
     
            response = Response.ok(responseCache.get(cacheKey))
                    .build();
        }
        // Clear the per-request version set in step 2
        CurrentRequestVersion.remove();
        return response;
    }
}

7.6 获取Applications

// Excerpt of ResponseCacheImpl: lookup of a cached payload through the read-only cache,
// falling back to the read-write cache.
public class ResponseCacheImpl implements ResponseCache {
   
     
    // Returns the gzipped payload for the key, or null when no value could be obtained.
    public byte[] getGZIP(Key key) {
   
     
        // shouldUseReadOnlyResponseCache = eureka.server.useReadOnlyResponseCache, default true
        Value payload = getValue(key, shouldUseReadOnlyResponseCache);
        if (payload == null) {
   
     
            return null;
        }
        return payload.getGzipped();
    }
    // Looks the key up in the read-only cache first (when enabled), then in the read-write cache.
    // Any Throwable is logged and the payload obtained so far (possibly null) is returned.
    Value getValue(final Key key, boolean useReadOnlyCache) {
   
     
        Value payload = null;
        try {
   
     
            // useReadOnlyCache defaults to true; either way, the first lookup for a key ends up in
            // readWriteCacheMap.get(key). Per section 3.2.2.1 of this series, that first access has
            // to load the value and therefore reaches generatePayload (not shown here).
            if (useReadOnlyCache) {
   
     
                final Value currentPayload = readOnlyCacheMap.get(key);
                if (currentPayload != null) {
   
     
                    payload = currentPayload;
                } else {
   
     
                    // Miss in the read-only cache: read from the read-write cache and copy the value over
                    payload = readWriteCacheMap.get(key);
                    readOnlyCacheMap.put(key, payload);
                }
            } else {
   
     
                payload = readWriteCacheMap.get(key);
            }
        } catch (Throwable t) {
   
     
            logger.error("Cannot get value for key : {}", key, t);
        }
        return payload;
    }
}

关于Eureka的分析到此告一段落,接下来开始学习LoadBalancer

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: