目录
- 一、InitExecutor
- 二、CommandCenterInitFunc
- 三、HeartbeatSenderInitFunc
- 四、DefaultClusterClientInitFunc
- 五、MetricCallbackInit
- 六、ParamFlowStatisticSlotCallbackInit
- 七、DefaultClusterServerInitFunc
一、InitExecutor
1、 InitFunc接口的初始化是在InitExecutor的doInit()方法中实现的调用doInit()的地方有三处:;
(1)SentinelAutoConfiguration
@PostConstruct
private void init() {
...
//检查是否配置了 spring.cloud.sentinel.eager=true
if (properties.isEager()) {
InitExecutor.doInit();
}
}
(2)Env
public class Env {
public static final Sph sph = new CtSph();
static {
// If init fails, the process will exit.
InitExecutor.doInit();
}
}
(3)ClusterStateManager
static {
InitExecutor.doInit();
stateProperty.addListener(PROPERTY_LISTENER);
}
2、 doInit();
使用SPI方式,使用 ServiceLoader 获取META-INF/services目录下的com.alibaba.csp.sentinel.init.InitFunc
public static void doInit() {
//只被初始化一次
if (!initialized.compareAndSet(false, true)) {
return;
}
try {
//采用ServiceLoader的方式加载InitFunc *
ServiceLoader<InitFunc> loader = ServiceLoaderUtil.getServiceLoader(InitFunc.class);
List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
for (InitFunc initFunc : loader) {
RecordLog.info("[InitExecutor] Found init func: " + initFunc.getClass().getCanonicalName());
//对initFunc 进行排序
insertSorted(initList, initFunc);
}
for (OrderWrapper w : initList) {
//依次调用init()
w.func.init();
RecordLog.info(String.format("[InitExecutor] Executing %s with order %d",
w.func.getClass().getCanonicalName(), w.order));
}
} catch (Exception ex) {
RecordLog.warn("[InitExecutor] WARN: Initialization failed", ex);
ex.printStackTrace();
} catch (Error error) {
RecordLog.warn("[InitExecutor] ERROR: Initialization failed with fatal error", error);
error.printStackTrace();
}
}
3、 insertSorted();
private static void insertSorted(List<OrderWrapper> list, InitFunc func) {
//解析出 order,根据order 进行排序
int order = resolveOrder(func);
int idx = 0;
for (; idx < list.size(); idx++) {
if (list.get(idx).getOrder() > order) {
break;
}
}
list.add(idx, new OrderWrapper(order, func));
}
private static int resolveOrder(InitFunc func) {
if (!func.getClass().isAnnotationPresent(InitOrder.class)) {
//没有 @InitOrder 注解时,默认最低优先级
return InitOrder.LOWEST_PRECEDENCE;
} else {
//获取 @InitOrder 注解 的value值
return func.getClass().getAnnotation(InitOrder.class).value();
}
}
二、CommandCenterInitFunc
初始化所有命令处理器,接收服务端发来的命令信息并进行处理。
1、 init();
@InitOrder(-1)
public class CommandCenterInitFunc implements InitFunc {
@Override
public void init() throws Exception {
//采用SPI方式获取到 SimpleHttpCommandCenter
CommandCenter commandCenter = CommandCenterProvider.getCommandCenter();
if (commandCenter == null) {
RecordLog.warn("[CommandCenterInitFunc] Cannot resolve CommandCenter");
return;
}
//命令处理中心启动前的逻辑
commandCenter.beforeStart();
//命令处理中心启动
commandCenter.start();
RecordLog.info("[CommandCenterInit] Starting command center: "
+ commandCenter.getClass().getCanonicalName());
}
}
2、 beforeStart();
public void beforeStart() throws Exception {
// 通过SPI方式获取所有的 CommandHandler
Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
//将命令处理器保存到内存缓存
registerCommands(handlers);
}
3、 start();
@Override
public void start() throws Exception {
int nThreads = Runtime.getRuntime().availableProcessors();
//业务线程池
this.bizExecutor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10),
new NamedThreadFactory("sentinel-command-center-service-executor"),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
CommandCenterLog.info("EventTask rejected");
throw new RejectedExecutionException();
}
});
Runnable serverInitTask = new Runnable() {
//跟服务端的连接端口,默认是8719
int port;
{
try {
port = Integer.parseInt(TransportConfig.getPort());
} catch (Exception e) {
port = DEFAULT_PORT;
}
}
@Override
public void run() {
boolean success = false;
//创建ServerSocket 连接
ServerSocket serverSocket = getServerSocketFromBasePort(port);
if (serverSocket != null) {
CommandCenterLog.info("[CommandCenter] Begin listening at port " + serverSocket.getLocalPort());
socketReference = serverSocket;
//创建 ServerThread 线程任务
executor.submit(new ServerThread(serverSocket));
success = true;
port = serverSocket.getLocalPort();
} else {
CommandCenterLog.info("[CommandCenter] chooses port fail, http command center will not work");
}
if (!success) {
port = PORT_UNINITIALIZED;
}
TransportConfig.setRuntimePort(port);
executor.shutdown();
}
};
new Thread(serverInitTask).start();
}
4、 ServerThread;
class ServerThread extends Thread {
private ServerSocket serverSocket;
ServerThread(ServerSocket s) {
this.serverSocket = s;
setName("sentinel-courier-server-accept-thread");
}
@Override
public void run() {
while (true) {
Socket socket = null;
try {
//阻塞接收消息
socket = this.serverSocket.accept();
setSocketSoTimeout(socket);
//创建任务
HttpEventTask eventTask = new HttpEventTask(socket);
//放入业务线程池
bizExecutor.submit(eventTask);
} catch (Exception e) {
CommandCenterLog.info("Server error", e);
if (socket != null) {
try {
socket.close();
} catch (Exception e1) {
CommandCenterLog.info("Error when closing an opened socket", e1);
}
}
try {
// In case of infinite log.
Thread.sleep(10);
} catch (InterruptedException e1) {
// Indicates the task should stop.
break;
}
}
}
}
}
三、HeartbeatSenderInitFunc
初始化心跳发送者,启动心跳线程,对sentinel服务端发起心跳
1、 init();
@Override
public void init() {
//心跳发送者 SimpleHttpHeartbeatSender
HeartbeatSender sender = HeartbeatSenderProvider.getHeartbeatSender();
if (sender == null) {
RecordLog.warn("[HeartbeatSenderInitFunc] WARN: No HeartbeatSender loaded");
return;
}
//初始化心跳发送任务线程池
initSchedulerIfNeeded();
//获取心跳发送周期,如果没有配置,使用默认值10s,private static final long DEFAULT_INTERVAL = 1000 * 10;
long interval = retrieveInterval(sender);
setIntervalIfNotExists(interval);
//调度心跳发送任务
scheduleHeartbeatTask(sender, interval);
}
2、 scheduleHeartbeatTask();
private void scheduleHeartbeatTask(/*@NonNull*/ final HeartbeatSender sender, /*@Valid*/ long interval) {
pool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
//发送心跳
sender.sendHeartbeat();
} catch (Throwable e) {
RecordLog.warn("[HeartbeatSender] Send heartbeat error", e);
}
}
}, 5000, interval, TimeUnit.MILLISECONDS);
RecordLog.info("[HeartbeatSenderInit] HeartbeatSender started: "
+ sender.getClass().getCanonicalName());
}
3、 sender.sendHeartbeat();
@Override
public boolean sendHeartbeat() throws Exception {
//校验端口
if (TransportConfig.getRuntimePort() <= 0) {
RecordLog.info("[SimpleHttpHeartbeatSender] Command server port not initialized, won't send heartbeat");
return false;
}
//获取sentinel服务端地址
Tuple2<String, Integer> addrInfo = getAvailableAddress();
if (addrInfo == null) {
return false;
}
InetSocketAddress addr = new InetSocketAddress(addrInfo.r1, addrInfo.r2);
//请求信息
SimpleHttpRequest request = new SimpleHttpRequest(addr, TransportConfig.getHeartbeatApiPath());
//参数信息
request.setParams(heartBeat.generateCurrentMessage());
try {
//发送请求
SimpleHttpResponse response = httpClient.post(request);
if (response.getStatusCode() == OK_STATUS) {
return true;
} else if (clientErrorCode(response.getStatusCode()) || serverErrorCode(response.getStatusCode())) {
RecordLog.warn("[SimpleHttpHeartbeatSender] Failed to send heartbeat to " + addr
+ ", http status code: " + response.getStatusCode());
}
} catch (Exception e) {
RecordLog.warn("[SimpleHttpHeartbeatSender] Failed to send heartbeat to " + addr, e);
}
return false;
}
4、 httpClient.post();
通过Socket 连接进行发送
public SimpleHttpResponse post(SimpleHttpRequest request) throws IOException {
if (request == null) {
return null;
}
return request(request.getSocketAddress(),
RequestMethod.POST, request.getRequestPath(),
request.getParams(), request.getCharset(),
request.getSoTimeout());
}
private SimpleHttpResponse request(InetSocketAddress socketAddress,
RequestMethod type, String requestPath,
Map<String, String> paramsMap, Charset charset, int soTimeout)
throws IOException {
Socket socket = null;
BufferedWriter writer;
try {
socket = new Socket();
socket.setSoTimeout(soTimeout);
//连接服务端
socket.connect(socketAddress, soTimeout);
writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), charset));
requestPath = getRequestPath(type, requestPath, paramsMap, charset);
writer.write(getStatusLine(type, requestPath) + "\r\n");
if (charset != null) {
writer.write("Content-Type: application/x-www-form-urlencoded; charset=" + charset.name() + "\r\n");
} else {
writer.write("Content-Type: application/x-www-form-urlencoded\r\n");
}
writer.write("Host: " + socketAddress.getHostName() + "\r\n");
if (type == RequestMethod.GET) {
writer.write("Content-Length: 0\r\n");
writer.write("\r\n");
} else {
// POST method.
String params = encodeRequestParams(paramsMap, charset);
writer.write("Content-Length: " + params.getBytes(charset).length + "\r\n");
writer.write("\r\n");
writer.write(params);
}
writer.flush();
SimpleHttpResponse response = new SimpleHttpResponseParser().parse(socket.getInputStream());
socket.close();
socket = null;
return response;
} finally {
if (socket != null) {
try {
socket.close();
} catch (Exception ex) {
RecordLog.warn("Error when closing {} request to {} in SimpleHttpClient", type, socketAddress, ex);
}
}
}
}
四、DefaultClusterClientInitFunc
初始化集群客户端需要的编解码相关的类,TYPE_PING、TYPE_FLOW、TYPE_PARAM_FLOW
@InitOrder(0)
public class DefaultClusterClientInitFunc implements InitFunc {
@Override
public void init() throws Exception {
//初始化 Netty Encoder 使用的 EntityWriter
initDefaultEntityWriters();
//初始化 Netty Decoder 使用的 EntityDecoder
initDefaultEntityDecoders();
}
private void initDefaultEntityWriters() {
RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_PING, new PingRequestDataWriter());
RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_FLOW, new FlowRequestDataWriter());
Integer maxParamByteSize = ClusterClientStartUpConfig.getMaxParamByteSize();
if (maxParamByteSize == null) {
RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_PARAM_FLOW, new ParamFlowRequestDataWriter());
} else {
RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_PARAM_FLOW, new ParamFlowRequestDataWriter(maxParamByteSize));
}
}
private void initDefaultEntityDecoders() {
ResponseDataDecodeRegistry.addDecoder(ClientConstants.TYPE_PING, new PingResponseDataDecoder());
ResponseDataDecodeRegistry.addDecoder(ClientConstants.TYPE_FLOW, new FlowResponseDataDecoder());
ResponseDataDecodeRegistry.addDecoder(ClientConstants.TYPE_PARAM_FLOW, new FlowResponseDataDecoder());
}
}
五、MetricCallbackInit
初始化统计相关的类 MetricEntryCallback、MetricExitCallback
public class MetricCallbackInit implements InitFunc {
@Override
public void init() throws Exception {
StatisticSlotCallbackRegistry.addEntryCallback(MetricEntryCallback.class.getCanonicalName(),
new MetricEntryCallback());
StatisticSlotCallbackRegistry.addExitCallback(MetricExitCallback.class.getCanonicalName(),
new MetricExitCallback());
}
}
六、ParamFlowStatisticSlotCallbackInit
初始化流控参数回调 ParamFlowStatisticEntryCallback、ParamFlowStatisticExitCallback
public class ParamFlowStatisticSlotCallbackInit implements InitFunc {
@Override
public void init() {
StatisticSlotCallbackRegistry.addEntryCallback(ParamFlowStatisticEntryCallback.class.getName(),
new ParamFlowStatisticEntryCallback());
StatisticSlotCallbackRegistry.addExitCallback(ParamFlowStatisticExitCallback.class.getName(),
new ParamFlowStatisticExitCallback());
}
}
七、DefaultClusterServerInitFunc
初始化服务端集群相关的一些类
@Override
public void init() throws Exception {
//EntityDecoder
initDefaultEntityDecoders();
//EntityWriter
initDefaultEntityWriters();
//RequestProcessor
initDefaultProcessors();
// Eagerly-trigger the SPI pre-load of token service.
//TokenService
TokenServiceProvider.getService();
RecordLog.info("[DefaultClusterServerInitFunc] Default entity codec and processors registered");
}