【Eureka】【源码+图解】【四】Eureka服务端启动过程(下)
目录
- 4. 注册
  - 4.1 初始化register任务
  - 4.2 开启register的定时任务
  - 4.3 执行register的定时任务
  - 4.4 客户端发送register请求
  - 4.5 服务端收到客户端register请求
  - 4.6 register实例
  - 4.7 向其它server节点同步
    - 生产task
    - 消费task,生产batch
    - 消费batch
4. 注册
先看下整体流程
4.1 初始化register任务
DiscoveryClient中instanceInfoReplicator的定义
/**
 * Excerpt of DiscoveryClient showing where the InstanceInfoReplicator is created and started.
 * Lines consisting only of dots mark code that the article elided.
 */
public class DiscoveryClient implements EurekaClient {
private InstanceInfoReplicator instanceInfoReplicator;
DiscoveryClient(...) {
......
initScheduledTasks();
......
}
/**
 * Creates the instance-info replicator (only when registration with Eureka is enabled)
 * and starts it with the configured initial delay.
 */
private void initScheduledTasks() {
......
if (clientConfig.shouldRegisterWithEureka()) {
......
// 1. Initialize instanceInfoReplicator
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
// eureka.client.instanceInfoReplicationIntervalSeconds: how often this client pushes its own
// info to the server (30 seconds by default, according to the article)
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
// burstSize argument of the InstanceInfoReplicator constructor
2);
......
}
......
// 2. Start the register task; note that the *initial* replication interval is passed as the delay
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
......
}
}
InstanceInfoReplicator的构造函数
/**
 * Constructor of InstanceInfoReplicator: stores its collaborators and sets up the
 * single-threaded scheduler used for the register task.
 *
 * @param discoveryClient            client whose register logic this replicator invokes
 * @param instanceInfo               the current service instance
 * @param replicationIntervalSeconds interval between replication runs, in seconds
 * @param burstSize                  factor used to derive allowedRatePerMinute
 */
InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) {
this.discoveryClient = discoveryClient;
// The current microservice instance
this.instanceInfo = instanceInfo;
// Scheduled thread pool with a single thread
this.scheduler = Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d")
.setDaemon(true) // threads are created as daemon threads
.build());
// Reference to the scheduled task; per the article, an on-demand update uses it to decide
// whether to cancel the previously scheduled run
this.scheduledPeriodicRef = new AtomicReference<Future>();
this.started = new AtomicBoolean(false);
// Rate limiter; per the article, it limits the number of on-demand updates
this.rateLimiter = new RateLimiter(TimeUnit.MINUTES);
// Interval between instance-info updates
this.replicationIntervalSeconds = replicationIntervalSeconds;
this.burstSize = burstSize;
// Number of on-demand updates allowed per minute (integer arithmetic)
this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds;
}
4.2 开启register的定时任务
/**
 * Excerpt of InstanceInfoReplicator: starting the scheduled register task.
 */
class InstanceInfoReplicator implements Runnable {
private final AtomicBoolean started;
/**
 * Schedules the first run of this replicator. Only the first call has any effect,
 * because of the compareAndSet guard on {@code started}.
 *
 * @param initialDelayMs delay before the first run; despite the "Ms" suffix, the value is
 *                       scheduled with TimeUnit.SECONDS below
 */
public void start(int initialDelayMs) {
if (started.compareAndSet(false, true)) {
instanceInfo.setIsDirty(); // mark the instance dirty for the initial registration
// The first register attempt runs initialDelayMs seconds after start() is called.
// DiscoveryClient passes clientConfig.getInitialInstanceInfoReplicationIntervalSeconds() here,
// i.e. the initial-delay setting, not the periodic instanceInfoReplicationIntervalSeconds.
// So the client does not register with the server right at startup, but after a delay.
Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
}
4.3 执行register的定时任务
/**
 * Excerpt of InstanceInfoReplicator: the body of the scheduled register task.
 */
class InstanceInfoReplicator implements Runnable {
/**
 * Refreshes the local instance info, registers with the server if the info is dirty,
 * and always schedules the next run.
 */
public void run() {
try {
// 1. Refresh the latest info of the current instance (as described by the article):
// a. DataCenter
// b. lease info, including the renewal interval eureka.instance.leaseRenewalIntervalInSeconds
// and the expiration duration eureka.instance.leaseExpirationDurationInSeconds
// c. instance status (InstanceStatus)
discoveryClient.refreshInstanceInfo();
// A non-null timestamp means the instance info is dirty and must be pushed
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
// 2. Register with the server
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
// Schedule the next register run, i.e. periodically push the latest info to the server;
// this happens even when the attempt above failed
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
}
/**
 * Excerpt of DiscoveryClient: sending the register request.
 */
public class DiscoveryClient implements EurekaClient {
/**
 * Registers this instance through the registration client.
 *
 * @return true when the response status code equals Status.NO_CONTENT
 * @throws Throwable any exception raised by the HTTP call is rethrown unchanged
 */
boolean register() throws Throwable {
EurekaHttpResponse<Void> httpResponse;
try {
// Per the article, this eventually calls RestTemplateEurekaHttpClient.register(...)
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
throw e;
}
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
}
4.4 客户端发送register请求
/**
 * Excerpt of RestTemplateEurekaHttpClient: the client-side register request.
 */
public class RestTemplateEurekaHttpClient implements EurekaHttpClient {
/**
 * POSTs the instance info to {@code serviceUrl + "apps/" + appName}.
 *
 * @param info the instance to register; sent as the request body
 * @return a response carrying the HTTP status code and headers of the server reply
 */
@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = serviceUrl + "apps/" + info.getAppName();
HttpHeaders headers = new HttpHeaders();
headers.add(HttpHeaders.ACCEPT_ENCODING, "gzip");
headers.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
// Use restTemplate to POST to apps/<appName> on the server to register
ResponseEntity<Void> response = restTemplate.exchange(urlPath, HttpMethod.POST, new HttpEntity<>(info, headers),
Void.class);
return anEurekaHttpResponse(response.getStatusCodeValue()).headers(headersOf(response)).build();
}
}
4.5 服务端收到客户端register请求
/**
 * Excerpt of ApplicationResource: the server-side endpoint that receives register requests.
 * Lines consisting only of dots mark code that the article elided.
 */
public class ApplicationResource {
/**
 * Handles a register POST and hands the instance to the registry.
 *
 * @param info          the instance being registered
 * @param isReplication value of the PeerEurekaNode.HEADER_REPLICATION header; only the
 *                      literal "true" marks the request as a replication
 * @return a response with status 204
 */
@POST
@Consumes({
"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
......
// For a request coming from a client isReplication is null, so "true".equals(...) is false
// and the registration still has to be synchronized to the other servers
registry.register(info, "true".equals(isReplication));
return Response.status(204).build(); // 204 to be backwards compatible
}
}
4.6 register实例
服务端将客户端信息注册到缓存中
/**
 * Excerpt of AbstractInstanceRegistry: storing a registering instance in the server-side registry.
 * Lines consisting only of dots mark code that the article elided.
 */
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
/**
 * Registers (or re-registers) an instance under a new lease.
 *
 * @param registrant    the instance being registered
 * @param leaseDuration duration passed to the new Lease
 * @param isReplication whether the request is a replication from a peer (not used in the visible excerpt)
 */
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
// Held for the whole registration; released in the finally block
read.lock();
try {
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
......
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
if (existingLease != null && (existingLease.getHolder() != null)) {
// If the stored instance has a newer dirty timestamp than the registrant, keep the stored one
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
registrant = existingLease.getHolder();
}
} else {
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// 1. Count one more client expected to send renewals and recompute the threshold;
// per the article, this data is used for eviction and self-preservation
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
updateRenewsPerMinThreshold();
}
}
}
Lease<InstanceInfo> lease = new Lease<>(registrant, leaseDuration);
if (existingLease != null) {
// Carry the existing lease's serviceUpTimestamp over to the new lease
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
// 2. Put the lease into gMap, i.e. into the server-side registry
gMap.put(registrant.getId(), lease);
// 3. Update recentRegisteredQueue
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
// 4. Update the status in overriddenInstanceStatusMap (code elided by the article)
......
// 5. Update recentlyChangedQueue
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
// 6. Invalidate the cache entries for this app name and its VIP addresses
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
} finally {
read.unlock();
}
}
}
4.7 向其它server节点同步
/**
 * Excerpt of PeerAwareInstanceRegistryImpl: replicating a registry action to peer server nodes.
 * Lines consisting only of dots mark code that the article elided.
 */
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
/**
 * Replicates an action to every peer node, unless there are no peers or the action
 * is itself a replication.
 */
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
......
// 1. isReplication decides whether to synchronize to the other server nodes;
// requests coming from a client need to be synchronized
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
// peerEurekaNodes is updated dynamically; see section 3.2.3 of this article series
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
......
// 2. Replicate to the other node
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
......
}
/**
 * Dispatches a single action to one peer node; only the Register case is shown.
 */
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
......
CurrentRequestVersion.set(Version.V2);
switch (action) {
......
case Register:
// i.e. PeerEurekaNode.register(info)
node.register(info);
break;
......
}
......
}
}
/**
 * Excerpt of PeerEurekaNode: submitting a register action for replication to this peer.
 */
public class PeerEurekaNode {
/**
 * Wraps the registration in a replication task and hands it to the batching dispatcher.
 *
 * @param info the instance whose registration is replicated
 */
public void register(final InstanceInfo info) throws Exception {
// The task expires at "now + getLeaseRenewalOf(info)"
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
// The inner workings of batchingDispatcher are somewhat complex, but much of the server's
// work depends on it, so the article analyzes it in detail below
batchingDispatcher.process(
taskId("register", info), // create the task ID
new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
public EurekaHttpResponse<Void> execute() {
return replicationClient.register(info);
}
}, // create the task
expiryTime // task expiry time
);
}
}
先看下batchingDispatcher的定义
/**
 * Excerpt of PeerEurekaNode: how batchingDispatcher is defined and constructed.
 * Lines consisting only of dots mark code that the article elided.
 * The property names and default values in the comments below are the article's statements.
 */
public class PeerEurekaNode {
private final TaskDispatcher<String, ReplicationTask> batchingDispatcher;
PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,
HttpReplicationClient replicationClient, EurekaServerConfig config,
int batchSize, long maxBatchingDelayMs,
long retrySleepTimeMs, long serverUnavailableSleepTimeMs) {
......
ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient);
this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
// "target_" + host name
batcherName,
// eureka.server.maxElementsInPeerReplicationPool, default 10000
config.getMaxElementsInPeerReplicationPool(),
// Maximum number of sub-requests in one batch request, default 250
batchSize,
// eureka.server.maxThreadsForPeerReplication, default 20
config.getMaxThreadsForPeerReplication(),
// How often a batch request is sent, default 500 ms
maxBatchingDelayMs,
// On network congestion, wait serverUnavailableSleepTimeMs before retrying, default 1000 ms
serverUnavailableSleepTimeMs,
// On network error, wait retrySleepTimeMs before retrying, default 100 ms
retrySleepTimeMs,
taskProcessor
);
......
}
}
再看下batchingDispatcher整个工作流程
整个过程的代码很是复杂,所以我们只看流程图中最关键的三步
1、 生产task;
2、 消费task,生产batch;
3、 消费batch,发送batch请求进行同步信息;
在详细分析之前我们先了解下整个过程的数据变化,请注意颜色的不同
好了,现在开始看源码
生产task
以register为例
/**
 * Excerpt of PeerEurekaNode: producing a replication task, using register as the example.
 */
public class PeerEurekaNode {
/**
 * Creates a register replication task and submits it to the batching dispatcher.
 *
 * @param info the instance whose registration is replicated
 */
public void register(final InstanceInfo info) throws Exception {
// The task expires at "now + getLeaseRenewalOf(info)"
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
batchingDispatcher.process(
taskId("register", info),
// Note Action.Register: it marks this as a register task
new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
public EurekaHttpResponse<Void> execute() {
return replicationClient.register(info);
}
},
expiryTime
);
}
}
/**
 * Excerpt of AcceptorExecutor: the entry point that accepts newly produced tasks.
 */
class AcceptorExecutor<ID, T> {
/**
 * Wraps the task in a TaskHolder, queues it, and counts it as accepted.
 *
 * @param id         task identifier
 * @param task       the task to run
 * @param expiryTime expiry time stored with the task
 */
void process(ID id, T task, long expiryTime) {
// Add to acceptorQueue
acceptorQueue.add(new TaskHolder<ID, T>(id, task, expiryTime));
acceptedTasks++;
}
}
消费task,生产batch
/**
 * Excerpt of AcceptorRunner: the loop that consumes tasks and produces batches.
 */
class AcceptorRunner implements Runnable {
/**
 * Loops until shutdown: drains the input queues, and once the scheduled time has been
 * reached, assigns batch work and single-item work.
 */
@Override
public void run() {
long scheduleTime = 0;
while (!isShutdown.get()) {
try {
// 1. Per the article, this internal method moves the tasks from reprocessQueue and
// acceptorQueue into pendingTasks; not covered in detail
drainInputQueues();
int totalItems = processingOrder.size();
long now = System.currentTimeMillis();
// Re-arm the schedule time once it has passed, using the traffic shaper's delay
if (scheduleTime < now) {
scheduleTime = now + trafficShaper.transmissionDelay();
}
// 2. Per the article: check whether the delay has reached maxBatchingDelayMs
if (scheduleTime <= now) {
// Produce batches; the article does not cover this source in detail
assignBatchWork();
assignSingleItemWork();
}
// Nothing was assigned in this iteration: sleep briefly before polling again
if (totalItems == processingOrder.size()) {
Thread.sleep(10);
}
} catch (InterruptedException ex) {
// Ignore
} catch (Throwable e) {
logger.warn("Discovery AcceptorThread error", e);
}
}
}
}
消费batch
/**
 * Excerpt of BatchWorkerRunnable: the worker loop that consumes batches.
 * Lines consisting only of dots mark code that the article elided.
 */
static class BatchWorkerRunnable<ID, T> extends WorkerRunnable<ID, T> {
/**
 * Repeatedly fetches a batch of task holders, processes it, and re-queues the batch
 * when the failure is retryable.
 */
@Override
public void run() {
......
while (!isShutdown.get()) {
// Keep looping over batch work until shut down
List<TaskHolder<ID, T>> holders = getWork();
metrics.registerExpiryTimes(holders);
List<T> tasks = getTasksOf(holders);
// Per the article, processor is a ReplicationTaskProcessor
ProcessingResult result = processor.process(tasks);
switch (result) {
case Success:
break;
case Congestion:
case TransientError:
// Failures caused by network errors or congestion are added to reprocessQueue to be retried
taskDispatcher.reprocess(holders, result);
break;
case PermanentError:
logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
}
metrics.registerTaskResult(result, tasks.size());
}
......
}
}
/**
 * Excerpt of ReplicationTaskProcessor: sending one batch of replication tasks to a peer.
 */
class ReplicationTaskProcessor implements TaskProcessor<ReplicationTask> {
/**
 * Submits the tasks as a single batch request and maps the outcome to a ProcessingResult.
 *
 * @param tasks the replication tasks to send in one batch
 * @return Success when the batch was accepted and its responses were handled; Congestion for a
 *         503 status or a (possible) read timeout; TransientError for a network connect
 *         exception; PermanentError otherwise
 */
@Override
public ProcessingResult process(List<ReplicationTask> tasks) {
ReplicationList list = createReplicationListOf(tasks);
try {
// Send the batch request; per the article, replicationClient is a JerseyReplicationClient
EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
int statusCode = response.getStatusCode();
if (!isSuccess(statusCode)) {
if (statusCode == 503) {
return ProcessingResult.Congestion; // network congestion
} else {
return ProcessingResult.PermanentError;
}
} else {
// Dispatch the per-task results
handleBatchResponse(tasks, response.getEntity().getResponseList());
}
} catch (Throwable e) {
if (maybeReadTimeOut(e)) {
return ProcessingResult.Congestion; // network congestion
} else if (isNetworkConnectException(e)) {
return ProcessingResult.TransientError; // network connection problem
} else {
return ProcessingResult.PermanentError;
}
}
return ProcessingResult.Success;
}
}
接下来看下submitBatchUpdates请求的流程图
着重分析下绿色部分PeerReplicationResource.batchReplication
/**
 * Excerpt of PeerReplicationResource: the server-side endpoint that receives batch replication requests.
 * Lines consisting only of dots mark code that the article elided.
 */
public class PeerReplicationResource {
/**
 * Handles a batch POST by dispatching every contained instance action and collecting
 * the individual responses.
 *
 * @param replicationList the batch of replication instances sent by a peer
 * @return an OK response wrapping the collected per-instance responses
 */
@Path("batch")
@POST
public Response batchReplication(ReplicationList replicationList) {
......
ReplicationListResponse batchResponse = new ReplicationListResponse();
for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) {
......
// Split the batch request into individual requests
batchResponse.addResponse(dispatch(instanceInfo));
......
}
return Response.ok(batchResponse).build();
......
}
/**
 * Routes one replication instance to its handler by action; only Register is shown.
 */
private ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) {
......
Builder singleResponseBuilder = new Builder();
switch (instanceInfo.getAction()) {
case Register:
// Register action
singleResponseBuilder = handleRegister(instanceInfo, applicationResource);
break;
......
}
return singleResponseBuilder.build();
}
/**
 * Registers the replicated instance through ApplicationResource.addInstance and
 * returns a builder carrying the OK status code.
 */
private static Builder handleRegister(ReplicationInstance instanceInfo, ApplicationResource applicationResource) {
// Note: this goes back to the flow of section 4.5, except that REPLICATION is passed, marking this
// as an already-replicated message that must not be synchronized to other nodes again;
// the rest is the same as a register request sent by a client
applicationResource.addInstance(instanceInfo.getInstanceInfo(), REPLICATION);
return new Builder().setStatusCode(Status.OK.getStatusCode());
}
}
至此,注册的整个过程就分析完了。
未完待续
版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: