目录
- 5. 续约
- 5.1 初始化
- 5.2 TimedSupervisorTask
- 5.3 renew()
- 5.4 服务端收到renew请求
- 5.5 服务端更新实例续约信息
- 5.6 同步到其他server节点
5. 续约
先看一下续约的整体流程(见流程图)。
接下来依次分析流程图中绿色的 6 个步骤,分别对应下文的 5.1~5.6 小节。
5.1 初始化
// Excerpt: client-side initialization of lease renewal (heartbeat).
// The constructor builds the heartbeat thread pool; initScheduledTasks() wraps the
// heartbeat logic in a TimedSupervisorTask and schedules its first run.
public class DiscoveryClient implements EurekaClient {
// Thread pool on which the heartbeat (renew) task is executed.
private final ThreadPoolExecutor heartbeatExecutor;
// Supervisor task that is handed to the scheduler; created in initScheduledTasks().
private TimedSupervisorTask heartbeatTask;
DiscoveryClient(...) {
// 1. Create the thread pool used for lease renewal
heartbeatExecutor = new ThreadPoolExecutor(
// core pool size
1,
// maximum pool size: eureka.client.heartbeatExecutorThreadPoolSize, default 2
clientConfig.getHeartbeatExecutorThreadPoolSize(),
// keep-alive time for idle threads above the core size (0 seconds)
0,
TimeUnit.SECONDS,
// direct hand-off queue: tasks are not buffered, they need a free/new thread
new SynchronousQueue<Runnable>(),
// daemon threads named "DiscoveryClient-HeartbeatExecutor-<n>"
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
);
}
// Schedules the periodic heartbeat, but only if this client registers with Eureka.
private void initScheduledTasks() {
if (clientConfig.shouldRegisterWithEureka()) {
// eureka.instance.leaseRenewalIntervalInSeconds: renewal interval, default 30
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
// eureka.client.heartbeatExecutorExponentialBackOffBound, default 10
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
// 2. Wrap the heartbeat logic in a supervised, self-rescheduling task
heartbeatTask = new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread() // the actual renewal logic
);
// 3. Start the task: first run happens after one renewal interval
scheduler.schedule(
heartbeatTask,
renewalIntervalInSecs, TimeUnit.SECONDS);
}
}
}
5.2 TimedSupervisorTask
// Excerpt: supervisor that runs the wrapped task on the executor with a timeout and
// then re-schedules itself; the delay doubles (up to maxDelay) after each timeout.
public class TimedSupervisorTask extends TimerTask {
@Override
public void run() {
Future<?> future = null;
try {
// Submit the renewal task to the thread pool
future = executor.submit(task);
// Block until the task returns a result or the timeout elapses
future.get(timeoutMillis, TimeUnit.MILLISECONDS);
// Success: the next run uses the base delay again
delay.set(timeoutMillis);
} catch (TimeoutException e) {
// Waiting for the result timed out: grow the delay exponentially
// 1st time:     currentDelay
// 2nd time:     currentDelay * 2
// ...
// (n+1)th time: currentDelay * 2^n
// Per the original author's note, currentDelay * 2^n must stay below
// eureka.instance.leaseRenewalIntervalInSeconds * eureka.client.heartbeatExecutorExponentialBackOffBound
// (presumably the value of maxDelay, which is defined outside this excerpt);
// otherwise the smaller of the two is used
timeoutCounter.increment();
long currentDelay = delay.get();
long newDelay = Math.min(maxDelay, currentDelay * 2);
// Only updates if delay still holds the value read above
delay.compareAndSet(currentDelay, newDelay);
} catch (RejectedExecutionException e) {
// The executor refused the task; only counted, delay is left unchanged
rejectedCounter.increment();
} catch (Throwable e) {
// Any other failure; only counted, delay is left unchanged
throwableCounter.increment();
} finally {
// Cancel the task (interrupting it if still running, e.g. after a timeout)
if (future != null) {
future.cancel(true);
}
if (!scheduler.isShutdown()) {
// Schedule the next renewal using the current delay
scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
}
}
}
}
5.3 renew()
`HeartbeatThread` 获得线程资源后,执行 `run()` 方法:
// Excerpt: the heartbeat runnable and the renew() call it performs.
public class DiscoveryClient implements EurekaClient {
// Runnable executed on the heartbeat pool; records the time of the last successful renewal.
private class HeartbeatThread implements Runnable {
public void run() {
// Send the renewal request
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
// Sends one heartbeat to the server.
// Returns true if the server answered OK, or if it answered NOT_FOUND and the
// subsequent re-registration succeeded; returns false otherwise, including on any exception.
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
// Send the renewal (heartbeat) request
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
// Renewal failed because the server does not know this instance: register again
boolean success = register();
......
return success;
}
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
} catch (Throwable e) {
// Failures are logged and reported as an unsuccessful renewal, never rethrown
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
}
5.4 服务端收到renew请求
// Excerpt: server-side REST endpoint that receives the renew (heartbeat) request.
public class InstanceResource {
// Handles the HTTP PUT for a lease renewal and delegates to registry.renew().
// isReplication comes from the PeerEurekaNode.HEADER_REPLICATION request header;
// the remaining parameters are query parameters (their handling is elided below).
@PUT
public Response renewLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("overriddenstatus") String overriddenStatus,
@QueryParam("status") String status,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
// For a request sent by a client this is false, so the renewal must be
// replicated to the other server nodes
boolean isFromReplicaNode = "true".equals(isReplication);
// app and id are defined outside this excerpt
boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
......
return response;
}
}
5.5 服务端更新实例续约信息
/**
 * Excerpt: server-side registry logic that refreshes an instance's lease when a
 * renew (heartbeat) request arrives.
 */
public abstract class AbstractInstanceRegistry implements InstanceRegistry {

    /**
     * Renews the lease of instance {@code id} belonging to application {@code appName}.
     *
     * @param appName       name of the application the instance belongs to
     * @param id            instance id
     * @param isReplication whether the request was replicated from a peer node
     * @return {@code true} if the lease was found and renewed; {@code false} if no lease
     *         exists or the resolved status is {@code UNKNOWN}
     */
    public boolean renew(String appName, String id, boolean isReplication) {
        // 1. Look up the leases of the application, 2. then the existing lease of the instance
        Map<String, Lease<InstanceInfo>> leasesById = registry.get(appName);
        Lease<InstanceInfo> lease = (leasesById == null) ? null : leasesById.get(id);
        if (lease == null) {
            return false;
        }

        InstanceInfo holder = lease.getHolder();
        if (holder != null) {
            // 3. Resolve the status to apply and update the stored instance if it differs
            InstanceStatus resolvedStatus = getOverriddenInstanceStatus(holder, lease, isReplication);
            if (resolvedStatus == InstanceStatus.UNKNOWN) {
                return false;
            }
            if (!holder.getStatus().equals(resolvedStatus)) {
                holder.setStatusWithoutDirty(resolvedStatus);
            }
        }

        renewsLastMin.increment();
        // 4. Refresh lastUpdateTimestamp, which the server checks when evicting instances
        lease.renew();
        return true;
    }

    /**
     * Resolves the status to apply to an instance by running a first-match-wins rule chain.
     * The per-rule notes below are the original author's description of each rule; the rule
     * implementations themselves are not part of this excerpt.
     *
     * @param r             the instance being evaluated
     * @param existingLease the lease currently held for the instance
     * @param isReplication whether the request was replicated from a peer node
     * @return the status produced by the first matching rule
     */
    protected InstanceInfo.InstanceStatus getOverriddenInstanceStatus(InstanceInfo r,
                                                                      Lease<InstanceInfo> existingLease,
                                                                      boolean isReplication) {
        // InstanceStatus: UP, DOWN, STARTING, OUT_OF_SERVICE, UNKNOWN;
        InstanceStatusOverrideRule ruleChain = new FirstMatchWinsCompositeRule(
                // If r is UP or OUT_OF_SERVICE, continue with the next rule;
                // otherwise return the corresponding value
                new DownOrStartingRule(),
                // If r is not in overriddenInstanceStatusMap, continue with the next rule;
                // otherwise return the corresponding value
                new OverrideExistsRule(overriddenInstanceStatusMap),
                // If existingLease is not null and its value is neither UP nor OUT_OF_SERVICE,
                // continue with the next rule; otherwise return the corresponding value
                new LeaseExistsRule(),
                // Return r's own status
                new AlwaysMatchInstanceStatusRule());
        return ruleChain.apply(r, existingLease, isReplication).status();
    }
}
5.6 同步到其他server节点
/**
 * Excerpt: peer-aware registry that replicates a successful renewal to the other
 * server nodes.
 */
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {

    /**
     * Renews the lease locally and, on success, replicates the heartbeat to peers.
     *
     * @param appName       name of the application the instance belongs to
     * @param id            instance id
     * @param isReplication whether the request was replicated from a peer node
     * @return {@code true} if the local renewal succeeded, {@code false} otherwise
     */
    public boolean renew(final String appName, final String id, final boolean isReplication) {
        final boolean renewedLocally = super.renew(appName, id, isReplication);
        if (!renewedLocally) {
            return false;
        }
        // Replicate to the other server nodes; apart from Action.Heartbeat the rest is
        // the same as for register (see section 4.7 of this series)
        replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
        return true;
    }
}
Eureka的续约功能整体流程就讲完了。
未完待续
版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: