前段时间学习了netty,读了些源码,今天开始看RocketMQ是如何把netty封装为底层通信类,核心类有哪些
目录
remoting项目目录
RemotingServerTest
@BeforeClass创建服务端
NettyServerConfig
NettyRemotingServer
@BeforeClass创建客户端
NettyClientConfig
NettyRemotingClient
NettyRemotingAbstract
@Test testInvokeSync 同步调用
NettyRemotingAbstract#invokeSyncImpl
@Test testInvokeOneway 单向调用
NettyRemotingAbstract#invokeOnewayImpl
@Test testInvokeAsync 异步调用
NettyRemotingAbstract#invokeAsyncImpl
remoting项目目录
(截了两个重要的包):
其中的RemotingCommand是rocketMQ中各种消息的封装类,传送--> RocketMQ源码阅读(二)RemotingCommand、NettyEncoder和NettyDecoder
RemotingServerTest
从项目本身自带的测试类开始读源码,这里有一整套逻辑,便于理解。
总体来看, @BeforeClass创建了服务端和客户端, 三个@Test为三种调用方式示例(同步/异步/单向), @AfterClass关闭服务端和客户端,下边一个一个看
/**
 * End-to-end smoke test of the remoting layer. One server and one client are started for the
 * whole class; each test exercises one invocation style (synchronous, one-way, asynchronous)
 * against request code 0, and both endpoints are shut down afterwards.
 */
public class RemotingServerTest {
    private static RemotingServer remotingServer;
    private static RemotingClient remotingClient;

    /** Starts the shared server first, then the shared client. */
    @BeforeClass
    public static void setup() throws InterruptedException {
        remotingServer = createRemotingServer();
        remotingClient = createRemotingClient();
    }

    /**
     * Creates and starts a server with the default configuration. A processor is registered for
     * request code 0 that writes a greeting into the request's remark and returns the request
     * itself as the response.
     *
     * @return the started server
     */
    public static RemotingServer createRemotingServer() throws InterruptedException {
        NettyServerConfig config = new NettyServerConfig();
        RemotingServer remotingServer = new NettyRemotingServer(config);
        remotingServer.registerProcessor(0, new NettyRequestProcessor() {
            @Override
            public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
                request.setRemark("Hi " + ctx.channel().remoteAddress());
                return request;
            }

            @Override
            public boolean rejectRequest() {
                return false;
            }
        }, Executors.newCachedThreadPool());
        remotingServer.start();
        return remotingServer;
    }

    /**
     * Creates and starts a client with the default configuration.
     *
     * @return the started client
     */
    public static RemotingClient createRemotingClient() {
        NettyClientConfig config = new NettyClientConfig();
        RemotingClient client = new NettyRemotingClient(config);
        client.start();
        return client;
    }

    /** Shuts down the client before the server. */
    @AfterClass
    public static void destroy() {
        remotingClient.shutdown();
        remotingServer.shutdown();
    }

    /** Synchronous call: blocks up to three seconds for the response and checks its content. */
    @Test
    public void testInvokeSync() throws InterruptedException, RemotingConnectException,
        RemotingSendRequestException, RemotingTimeoutException {
        RequestHeader requestHeader = new RequestHeader();
        requestHeader.setCount(1);
        requestHeader.setMessageTitle("Welcome");
        RemotingCommand request = RemotingCommand.createRequestCommand(0, requestHeader);
        RemotingCommand response = remotingClient.invokeSync("localhost:8888", request, 1000 * 3);
        assertTrue(response != null);
        assertThat(response.getLanguage()).isEqualTo(LanguageCode.JAVA);
        assertThat(response.getExtFields()).hasSize(2);
    }

    /** One-way call: the request is sent and no response is awaited. */
    @Test
    public void testInvokeOneway() throws InterruptedException, RemotingConnectException,
        RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException {
        RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
        request.setRemark("messi");
        remotingClient.invokeOneway("localhost:8888", request, 1000 * 3);
    }

    /**
     * Asynchronous call. The callback only hands the result over to the test thread; all
     * assertions run on the test thread so that a failing assertion actually fails the test
     * instead of being thrown on a callback thread after the latch was already released.
     */
    @Test
    public void testInvokeAsync() throws InterruptedException, RemotingConnectException,
        RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException {
        final CountDownLatch latch = new CountDownLatch(1);
        final java.util.concurrent.atomic.AtomicReference<ResponseFuture> received =
            new java.util.concurrent.atomic.AtomicReference<ResponseFuture>();
        RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
        request.setRemark("messi");
        remotingClient.invokeAsync("localhost:8888", request, 1000 * 3, new InvokeCallback() {
            @Override
            public void operationComplete(ResponseFuture responseFuture) {
                // Publish the result before releasing the latch so the test thread sees it.
                received.set(responseFuture);
                latch.countDown();
            }
        });
        // Bounded wait (longer than the request timeout) so a lost callback cannot hang the build.
        assertTrue(latch.await(1000 * 10, java.util.concurrent.TimeUnit.MILLISECONDS));
        ResponseFuture responseFuture = received.get();
        assertTrue(responseFuture != null);
        assertTrue(responseFuture.getResponseCommand() != null);
        assertThat(responseFuture.getResponseCommand().getLanguage()).isEqualTo(LanguageCode.JAVA);
        assertThat(responseFuture.getResponseCommand().getExtFields()).hasSize(2);
    }
}
@BeforeClass创建服务端
/**
 * Creates and starts a server with the default configuration and a single processor for
 * request code 0.
 *
 * @return the started server
 */
public static RemotingServer createRemotingServer() throws InterruptedException {
    // Server-side settings (listen port, thread counts, ...) with their defaults.
    final NettyServerConfig serverConfig = new NettyServerConfig();
    // Wrapper around Netty's ServerBootstrap.
    final RemotingServer server = new NettyRemotingServer(serverConfig);

    // Processor for request code 0: writes a greeting into the remark and returns the request.
    final NettyRequestProcessor greetingProcessor = new NettyRequestProcessor() {
        @Override
        public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
            final String greeting = "Hi " + ctx.channel().remoteAddress();
            request.setRemark(greeting);
            return request;
        }

        @Override
        public boolean rejectRequest() {
            // Never refuse a request.
            return false;
        }
    };

    server.registerProcessor(0, greetingProcessor, Executors.newCachedThreadPool());
    server.start();
    return server;
}
NettyServerConfig
服务端配置
// Server-side settings read by NettyRemotingServer; every field has a default.
public class NettyServerConfig implements Cloneable {
private int listenPort = 8888; // Port the server binds to and listens on
private int serverWorkerThreads = 8; // Thread count of the DefaultEventExecutorGroup that runs the pipeline handlers
private int serverCallbackExecutorThreads = 0; // Size of the server's default "public" executor; the server substitutes 4 when this is <= 0
private int serverSelectorThreads = 3; // Number of event loops in the child group that handles reads/writes of accepted connections
private int serverOnewaySemaphoreValue = 256;// Permits of the semaphore limiting concurrent one-way invocations
private int serverAsyncSemaphoreValue = 64;// Permits of the semaphore limiting concurrent async invocations
private int serverChannelMaxIdleTimeSeconds = 120; // All-idle timeout in seconds handed to the IdleStateHandler of each connection
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;// Value used for the SO_SNDBUF socket option
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;// Value used for the SO_RCVBUF socket option
private boolean serverPooledByteBufAllocatorEnable = true;// When true the server installs PooledByteBufAllocator.DEFAULT as the child channels' allocator
/**
* Whether the server should use the native epoll event loop group for its selector threads.
* The server only honours this flag on Linux; otherwise (or when false) the NIO event loop
* group is used.
*/
private boolean useEpollNativeSelector = false;
// ... getters/setters omitted
}
NettyRemotingServer
主要看两个地方:构造方法(new)和start方法。构造方法中实例化netty的ServerBootstrap以及各种执行器组;start方法负责配置并启动ServerBootstrap
类中执行器组有四种:首先是两个EventLoopGroup,父group(boss)接收连接请求,子group(selector)处理读写;还有一个defaultEventExecutorGroup,用来执行pipeline中各handler的方法;最后是publicExecutor,在最终的handler中处理消息内容(这是默认的执行器,注册processor时可以传入自定义的执行器)
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
// 这是一组netty熟悉的api
private final ServerBootstrap serverBootstrap;
private final EventLoopGroup eventLoopGroupSelector;
private final EventLoopGroup eventLoopGroupBoss;
private final NettyServerConfig nettyServerConfig;
// 默认的处理requestCommand内容的执行器线程
private final ExecutorService publicExecutor;
// 连接事件监听器,用于心跳的事件监听
private final ChannelEventListener channelEventListener;
// 定时处理某些任务
private final Timer timer = new Timer("ServerHouseKeepingService", true);
// 执行handler方法的执行器,handlerRead\handlerActive等方法
private DefaultEventExecutorGroup defaultEventExecutorGroup;
// 一个类似AOP处理的类,publicExecutor处理command前后进行加工
private RPCHook rpcHook;
private int port = 0;
public NettyRemotingServer(final NettyServerConfig nettyServerConfig) {
this(nettyServerConfig, null);
}
public NettyRemotingServer(final NettyServerConfig nettyServerConfig, final ChannelEventListener channelEventListener) {
// server端和client端的共同父类,将通用的东西提了一下,这里是两种调用方式的信号量大小,在父类设置
super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
this.nettyServerConfig = nettyServerConfig;
this.channelEventListener = channelEventListener;
// 默认的处理requestCommand内容的执行器线程默认4个
int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
// publicExecutor是固定大小线程池,默认4
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
// 接收客户端连接的group,只有一个selector线程
this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet()));
}
});
// 处理客户端读写的selector线程,默认3
if (RemotingUtil.isLinuxPlatform() //linux下是epoll
&& nettyServerConfig.isUseEpollNativeSelector()) {
this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
} else {
this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
}
}
@Override
// 注册每种RequestCommand对应的消息处理器processor和执行processor的执行器
public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
ExecutorService executorThis = executor;
// 没传执行器,有默认的
if (null == executor) {
executorThis = this.publicExecutor;
}
// Pair对象啥都没有,只是将两个对象组合起来
Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
// 缓存,在真正处理消息时,根据消息requestCommand对应的code获取处理器处理
this.processorTable.put(requestCode, pair);
}
@Override
public void start() {
// 执行handler方法的执行器组,默认8个线程
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
// 正经装配server端
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024) // 客户端连接请求的队列大小
.option(ChannelOption.SO_REUSEADDR, true) // 允许重复使用本地地址和端口
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true) // true不延迟,禁止使用Nagle算法
.option(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()) // 这里设置发送的socket缓冲区大小
.option(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()) // 设置接收的socket缓冲区大小,这两个参数没想通啊,不应该在childOption设置吗?
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) // 绑定端口
.childHandler(new ChannelInitializer<SocketChannel>() {// 处理客户端消息的初始化处理器
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
new NettyConnetManageHandler(),
new NettyServerHandler());
}
});
// 池化的Allocator
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
// 启动
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress)sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
// 如果监听心跳事件的监听器存在,启动nettyEvent事件处理线程池分发事件
// 就是开了个线程,轮询事件队列,调用listenr的对应发放
if (this.channelEventListener != null) {
this.nettyEventExecuter.start();
}
// 定时扫描responseTable变量,在父类,每次同步异步调用需要等待响应,这变量存的就是请求id对应的ResponseFuture对象,定时清理超时的
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Exception e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}
}
@BeforeClass创建客户端
/**
 * Creates a client from a default {@code NettyClientConfig} and starts it.
 *
 * @return the started client
 */
public static RemotingClient createRemotingClient() {
    final RemotingClient startedClient = new NettyRemotingClient(new NettyClientConfig());
    startedClient.start();
    return startedClient;
}
NettyClientConfig
// Client-side settings read by NettyRemotingClient; every field has a default.
public class NettyClientConfig {
/**
* Worker thread number
*/
private int clientWorkerThreads = 4;// Thread count of the DefaultEventExecutorGroup that runs the pipeline handlers
// Size of the client's default "public" executor; defaults to the number of available processors
private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
// Permits of the semaphores limiting concurrent one-way and async invocations.
// NOTE(review): the original note says both default to 65535 and can be overridden by a parameter - NettySystemConfig is not shown here, confirm.
private int clientOnewaySemaphoreValue = NettySystemConfig.CLIENT_ONEWAY_SEMAPHORE_VALUE;
private int clientAsyncSemaphoreValue = NettySystemConfig.CLIENT_ASYNC_SEMAPHORE_VALUE;
private int connectTimeoutMillis = 3000;// Connect timeout in milliseconds
private long channelNotActiveInterval = 1000 * 60;// Presumably how long a channel may go without traffic before it counts as inactive - no use visible here, confirm
/**
* IdleStateEvent will be triggered when neither read nor write was performed for
* the specified period of this time. Specify {@code 0} to disable
*/
private int clientChannelMaxIdleTimeSeconds = 120;// All-idle timeout in seconds handed to the IdleStateHandler
// Values used for the SO_SNDBUF / SO_RCVBUF socket options.
// NOTE(review): the original note says they default to 65535 - NettySystemConfig is not shown here, confirm.
private int clientSocketSndBufSize = NettySystemConfig.socketSndbufSize;
private int clientSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
private boolean clientPooledByteBufAllocatorEnable = false;
private boolean clientCloseSocketIfTimeout = false;// When true, invokeSync closes the channel after a response timeout
// ... getters/setters omitted
}
NettyRemotingClient
和服务端一样也是new和start方法
// Netty-based RemotingClient. The constructor creates the executors and the event loop group;
// start() configures the Bootstrap and schedules the housekeeping timer.
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
private static final long LOCK_TIMEOUT_MILLIS = 3000;
private final NettyClientConfig nettyClientConfig;
// Netty client-side bootstrap
private final Bootstrap bootstrap = new Bootstrap();
private final EventLoopGroup eventLoopGroupWorker;
// Lock for the channel table - presumably held while connecting to a server; the code using it is not shown here
private final Lock lockChannelTables = new ReentrantLock();
// Channels keyed by remote address - presumably kept so that connections are reused; the lookup code is not shown here
private final ConcurrentHashMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<>();
// Daemon timer that periodically scans the response table
private final Timer timer = new Timer("ClientHouseKeepingService", true);
private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<>();
private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<>();
private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex());
private final Lock lockNamesrvChannel = new ReentrantLock();
// Default fixed-size executor created in the constructor
private final ExecutorService publicExecutor;
// Optional listener; when present, start() launches the event dispatcher thread
private final ChannelEventListener channelEventListener;
// Executor group that runs the pipeline handlers; created in start()
private DefaultEventExecutorGroup defaultEventExecutorGroup;
// Hook invoked before a request is sent and after its response is received
private RPCHook rpcHook;
// Creates a client without a channel event listener.
public NettyRemotingClient(final NettyClientConfig nettyClientConfig) {
this(nettyClientConfig, null);
}
// Creates the client's executors and its event loop group; nothing is connected here.
public NettyRemotingClient(final NettyClientConfig nettyClientConfig, //
final ChannelEventListener channelEventListener) {
// The shared parent class owns the two semaphores limiting one-way and async invocations
super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
// Client settings
this.nettyClientConfig = nettyClientConfig;
// Optional channel event listener
this.channelEventListener = channelEventListener;
// Default executor; falls back to 4 threads when the configured count is not positive
int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
// Single-threaded NIO event loop group
this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
}
});
}
// Configures the bootstrap, schedules the response-table scan and, when a listener is present,
// starts the event dispatcher.
public void start() {
// Executor group that runs the pipeline handlers; sized by clientWorkerThreads
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(//
nettyClientConfig.getClientWorkerThreads(), //
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
}
});
// Only configures the bootstrap; no connect() happens here (the invoke methods obtain a channel when called)
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)//
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyClientHandler());
}
});
// As on the server: scan the response table every second, starting three seconds from now
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingClient.this.scanResponseTable();
} catch (Exception e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
// With a listener present, start the event dispatcher
if (this.channelEventListener != null) {
this.nettyEventExecuter.start();
}
}
}
NettyRemotingAbstract
NettyRemotingAbstract类是NettyRemotingServer、NettyRemotingClient的父类,抽象了一些客户端和服务端通用的方法,主要是三种调用方式、解析请求/响应RemotingCommand等
/**
 * State shared by the remoting server and client: the invocation semaphores, the table of
 * pending responses and the table of registered request processors.
 */
public abstract class NettyRemotingAbstract {
    /** Limits the number of concurrent one-way invocations. */
    protected final Semaphore semaphoreOneway;

    /** Limits the number of concurrent asynchronous invocations. */
    protected final Semaphore semaphoreAsync;

    /** Pending response futures of sync/async invocations, keyed by the request's opaque id. */
    protected final ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable =
        new ConcurrentHashMap<>(256);

    /** Registered processor/executor pairs, keyed by request code. */
    protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
        new HashMap<>(64);

    /** Dispatches channel events; only started when a listener is present. */
    protected final NettyEventExecuter nettyEventExecuter = new NettyEventExecuter();

    /** Fallback used when no processor is registered for a request code. */
    protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;

    /**
     * Creates the two fair semaphores.
     *
     * @param permitsOneway permits for one-way invocations
     * @param permitsAsync permits for asynchronous invocations
     */
    public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) {
        final boolean fair = true;
        this.semaphoreOneway = new Semaphore(permitsOneway, fair);
        this.semaphoreAsync = new Semaphore(permitsAsync, fair);
    }
}
processMessageReceived方法
处理请求和响应的requestCommand和responseCommand,这个方法在server端和client端最后一个handler(**NettyServerHandler**/**NettyClientHandler**)的channelRead中被调用
/**
 * Routes a decoded command to the request or the response handling path according to its
 * type. A {@code null} message and any other type are ignored.
 *
 * @param ctx channel context the message arrived on
 * @param msg decoded command, may be {@code null}
 */
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    if (msg == null) {
        return;
    }
    switch (msg.getType()) {
        case REQUEST_COMMAND:
            processRequestCommand(ctx, msg);
            break;
        case RESPONSE_COMMAND:
            processResponseCommand(ctx, msg);
            break;
        default:
            break;
    }
}
// Handles a request command: finds the processor/executor pair for the request code, wraps the
// processing in a task and submits it to the executor. Error responses are written when the
// processor rejects the request, the executor rejects the task, or no processor exists.
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
// Look up the processor and executor registered for this request code
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
// Nothing registered for the code: fall back to the default processor (which may itself be null)
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
// opaque is the request id; it is copied onto every response so the sender can match it
final int opaque = cmd.getOpaque();
if (pair != null) {
// A processor/executor pair exists: build the task that processes the request
Runnable run = new Runnable() {
@Override
public void run() {
try {
// Optional hook invoked before the request is processed and after the response is produced
RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
if (rpcHook != null) {
rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
}
// Let the matching processor handle the request
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
if (rpcHook != null) {
rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
}
// Only requests that are not one-way get a response written back
if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque);
response.markResponseType();
try {
ctx.writeAndFlush(response);
} catch (Throwable e) {
PLOG.error("process request over, but response failed", e);
PLOG.error(cmd.toString());
PLOG.error(response.toString());
}
} else {
// The processor returned no response: nothing is written
}
}
} catch (Throwable e) {
// Every failure is logged except the one exception type matched by class name below
if (!"com.aliyun.openservices.ons.api.impl.authority.exception.AuthenticationException"
.equals(e.getClass().getCanonicalName())) {
PLOG.error("process request exception", e);
PLOG.error(cmd.toString());
}
// Report the failure to the sender unless the request was one-way
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, //
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
}
};
// The processor refuses requests right now: answer SYSTEM_BUSY and do not submit the task
if (pair.getObject1().rejectRequest()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
return;
}
try {
// Wrap the Runnable above and hand it to the pair's executor
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
// The executor refused the task: answer SYSTEM_BUSY unless the request was one-way
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[OVERLOAD]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
} else {
// No processor for this request code and no default: answer REQUEST_CODE_NOT_SUPPORTED
String error = " request type " + cmd.getCode() + " not supported";
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
response.setOpaque(opaque);
ctx.writeAndFlush(response);
PLOG.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
}
}
/**
 * Handles a response command by completing the pending future registered under the same
 * opaque id. A response without a pending future is only logged.
 *
 * @param ctx channel context the response arrived on
 * @param cmd the response command
 */
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
    final int requestId = cmd.getOpaque();
    final ResponseFuture pending = responseTable.get(requestId);

    if (pending == null) {
        // Nobody is waiting for this response.
        PLOG.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        PLOG.warn(cmd.toString());
        return;
    }

    pending.setResponseCommand(cmd);
    // Presumably gives back the semaphore permit attached to the future, if any.
    pending.release();
    responseTable.remove(requestId);

    if (pending.getInvokeCallback() == null) {
        // Synchronous call: hand over the response and wake the waiting caller.
        pending.putResponse(cmd);
    } else {
        // Asynchronous call: run the registered callback.
        executeInvokeCallback(pending);
    }
}
@Test testInvokeSync 同步调用
/** Synchronous call: blocks up to three seconds for the response and checks its content. */
@Test
public void testInvokeSync() throws InterruptedException, RemotingConnectException,
    RemotingSendRequestException, RemotingTimeoutException {
    // Custom header with two populated fields.
    // NOTE(review): the size-2 assertion on the ext fields below suggests the header's
    // fields travel as ext fields - confirm against RemotingCommand.
    final RequestHeader header = new RequestHeader();
    header.setCount(1);
    header.setMessageTitle("Welcome");

    // Request command with request code 0.
    final RemotingCommand outbound = RemotingCommand.createRequestCommand(0, header);

    // Send to the given address and wait for the reply.
    final long timeoutMillis = 1000 * 3;
    final RemotingCommand reply = remotingClient.invokeSync("localhost:8888", outbound, timeoutMillis);

    assertTrue(reply != null);
    assertThat(reply.getLanguage()).isEqualTo(LanguageCode.JAVA);
    assertThat(reply.getExtFields()).hasSize(2);
}
NettyRemotingClient#invokeSync
/**
 * Sends a request to {@code addr} and blocks until the response arrives or the timeout
 * elapses.
 *
 * @param addr remote address
 * @param request command to send
 * @param timeoutMillis maximum time to wait for the response
 * @return the response command
 * @throws RemotingConnectException if no active channel to {@code addr} is available
 * @throws RemotingSendRequestException if sending fails; the channel is closed first
 * @throws RemotingTimeoutException if no response arrives in time; the channel is closed
 *         only when the client is configured to do so
 */
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
    throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    // Obtain a channel for the address.
    final Channel channel = this.getAndCreateChannel(addr);

    final boolean usable = channel != null && channel.isActive();
    if (!usable) {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }

    try {
        // Hook before the request is sent.
        if (this.rpcHook != null) {
            this.rpcHook.doBeforeRequest(addr, request);
        }
        // The actual send-and-wait lives in the parent class.
        final RemotingCommand reply = this.invokeSyncImpl(channel, request, timeoutMillis);
        // Hook after the response is received.
        if (this.rpcHook != null) {
            this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, reply);
        }
        return reply;
    } catch (RemotingSendRequestException sendFailure) {
        log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
        this.closeChannel(addr, channel);
        throw sendFailure;
    } catch (RemotingTimeoutException timeout) {
        // Closing the channel on timeout is opt-in.
        if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
            this.closeChannel(addr, channel);
            log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
        }
        log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
        throw timeout;
    }
}
NettyRemotingAbstract#invokeSyncImpl
/**
 * Writes the request to the channel and blocks until its response arrives, the send fails,
 * or the timeout elapses. The pending future is always removed from the response table
 * before returning.
 *
 * @param channel channel to write to
 * @param request command to send
 * @param timeoutMillis maximum time to wait for the response
 * @return the response command, never {@code null}
 * @throws RemotingTimeoutException if the request was sent but no response arrived in time
 * @throws RemotingSendRequestException if the request could not be sent
 */
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
    throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    final int opaque = request.getOpaque();
    try {
        // Register a future under the request id so the response path can find it.
        final ResponseFuture pending = new ResponseFuture(opaque, timeoutMillis, null, null);
        this.responseTable.put(opaque, pending);

        final SocketAddress remote = channel.remoteAddress();

        // Send, and record the outcome of the write once it completes.
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                final boolean sent = f.isSuccess();
                pending.setSendRequestOK(sent);
                if (sent) {
                    return;
                }
                // The write failed: drop the future and wake the waiting caller with no response.
                responseTable.remove(opaque);
                pending.setCause(f.cause());
                pending.putResponse(null);
                PLOG.warn("send a request command to channel <" + remote + "> failed.");
            }
        });

        // Block until a response is put or the timeout elapses.
        final RemotingCommand reply = pending.waitResponse(timeoutMillis);
        if (reply != null) {
            return reply;
        }

        // No response: timeout if the send succeeded, send failure otherwise.
        if (pending.isSendRequestOK()) {
            throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(remote), timeoutMillis,
                pending.getCause());
        }
        throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(remote), pending.getCause());
    } finally {
        this.responseTable.remove(opaque);
    }
}
/** Excerpt of the future a caller blocks on while waiting for a response. */
public class ResponseFuture {
    /**
     * Waits on the latch for at most {@code timeoutMillis} milliseconds and then returns
     * whatever response has been stored, which is {@code null} if none was.
     */
    public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
        countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return responseCommand;
    }

    /** Stores the response (possibly {@code null}) and releases the waiting caller. */
    public void putResponse(final RemotingCommand responseCommand) {
        this.responseCommand = responseCommand;
        countDownLatch.countDown();
    }
}
@Test testInvokeOneway 单向调用
/** One-way call: the request is sent and no response is awaited. */
@Test
public void testInvokeOneway() throws InterruptedException, RemotingConnectException,
    RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException {
    RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
    request.setRemark("messi");
    // Encode with an explicit charset so the body does not depend on the platform default.
    request.setBody("a".getBytes(java.nio.charset.StandardCharsets.UTF_8));
    remotingClient.invokeOneway("localhost:8888", request, 1000 * 3);
}
NettyRemotingClient#invokeOneway
/**
 * Sends a request to {@code addr} without waiting for a response.
 *
 * @param addr remote address
 * @param request command to send
 * @param timeoutMillis passed on to the one-way implementation
 * @throws RemotingConnectException if no active channel to {@code addr} is available
 * @throws RemotingSendRequestException if sending fails; the channel is closed first
 */
@Override
public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
    RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    final Channel channel = this.getAndCreateChannel(addr);

    if (channel == null || !channel.isActive()) {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }

    try {
        // Hook before the request is sent.
        if (this.rpcHook != null) {
            this.rpcHook.doBeforeRequest(addr, request);
        }
        this.invokeOnewayImpl(channel, request, timeoutMillis);
    } catch (RemotingSendRequestException sendFailure) {
        log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
        this.closeChannel(addr, channel);
        throw sendFailure;
    }
}
NettyRemotingAbstract#invokeOnewayImpl
/**
 * Sends a request without expecting a response. Concurrency is bounded by the one-way
 * semaphore; the permit is released as soon as the write completes.
 *
 * @param channel channel to write to
 * @param request command to send; it is marked as a one-way RPC
 * @param timeoutMillis maximum time to wait for a semaphore permit
 * @throws RemotingTooMuchRequestException if no permit is available and
 *         {@code timeoutMillis <= 0}
 * @throws RemotingTimeoutException if no permit became available within the timeout
 * @throws RemotingSendRequestException if writing to the channel throws
 */
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
    throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    // Flag the command as one-way; the receiving side checks isOnewayRPC() and writes no
    // response for such requests.
    request.markOnewayRPC();
    // Bound the number of concurrent one-way invocations.
    boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
        try {
            // The permit is released when the write completes, successfully or not.
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    once.release();
                    if (!f.isSuccess()) {
                        PLOG.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                    }
                }
            });
        } catch (Exception e) {
            // The write itself threw: give the permit back and report a send failure.
            once.release();
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        if (timeoutMillis <= 0) {
            throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
        } else {
            // The values reported here belong to the one-way semaphore, so label them as such.
            String info = String.format(
                "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreOnewayValue: %d", //
                timeoutMillis, //
                this.semaphoreOneway.getQueueLength(), //
                this.semaphoreOneway.availablePermits()//
            );
            PLOG.warn(info);
            throw new RemotingTimeoutException(info);
        }
    }
}
@Test testInvokeAsync 异步调用
/**
 * Asynchronous call. The callback only hands the result over to the test thread; all
 * assertions run on the test thread so that a failing assertion actually fails the test
 * instead of being thrown on a callback thread after the latch was already released.
 */
@Test
public void testInvokeAsync() throws InterruptedException, RemotingConnectException,
    RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException {
    final CountDownLatch latch = new CountDownLatch(1);
    final java.util.concurrent.atomic.AtomicReference<ResponseFuture> received =
        new java.util.concurrent.atomic.AtomicReference<ResponseFuture>();
    RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
    request.setRemark("messi");
    // Asynchronous invocation with a custom InvokeCallback.
    remotingClient.invokeAsync("localhost:8888", request, 1000 * 3, new InvokeCallback() {
        @Override
        public void operationComplete(ResponseFuture responseFuture) {
            // Publish the result before releasing the latch so the test thread sees it.
            received.set(responseFuture);
            latch.countDown();
        }
    });
    // Bounded wait (longer than the request timeout) so a lost callback cannot hang the build.
    assertTrue(latch.await(1000 * 10, java.util.concurrent.TimeUnit.MILLISECONDS));
    ResponseFuture responseFuture = received.get();
    assertTrue(responseFuture != null);
    assertTrue(responseFuture.getResponseCommand() != null);
    assertThat(responseFuture.getResponseCommand().getLanguage()).isEqualTo(LanguageCode.JAVA);
    assertThat(responseFuture.getResponseCommand().getExtFields()).hasSize(2);
}
NettyRemotingClient#invokeAsync
/**
 * Sends a request to {@code addr} and returns immediately; the callback is handed to the
 * asynchronous implementation in the parent class.
 *
 * @param addr remote address
 * @param request command to send
 * @param timeoutMillis passed on to the asynchronous implementation
 * @param invokeCallback callback for the outcome
 * @throws RemotingConnectException if no active channel to {@code addr} is available
 * @throws RemotingSendRequestException if sending fails; the channel is closed first
 */
@Override
public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
    throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
    RemotingSendRequestException {
    // Obtain a channel for the address.
    final Channel channel = this.getAndCreateChannel(addr);

    if (channel == null || !channel.isActive()) {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }

    try {
        // Hook before the request is sent.
        if (this.rpcHook != null) {
            this.rpcHook.doBeforeRequest(addr, request);
        }
        this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
    } catch (RemotingSendRequestException sendFailure) {
        log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
        this.closeChannel(addr, channel);
        throw sendFailure;
    }
}
NettyRemotingAbstract#invokeAsyncImpl
// Sends a request asynchronously. Concurrency is bounded by the async semaphore; a response
// future carrying the callback is registered under the request id before the write is issued.
// Throws RemotingTooMuchRequestException when no permit is obtained within timeoutMillis, and
// RemotingSendRequestException when writing to the channel throws.
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
// Request id
final int opaque = request.getOpaque();
// Bound the number of concurrent async invocations
boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
// The release-once wrapper is handed to the future, which is registered under the request id
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once);
this.responseTable.put(opaque, responseFuture);
try {
// Send the command and record the outcome of the write once it completes
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}
// The write failed: complete the future with no response and drop it from the table
responseFuture.putResponse(null);
responseTable.remove(opaque);
try {
// Run the callback; the future is released in the finally block either way
executeInvokeCallback(responseFuture);
} catch (Throwable e) {
PLOG.warn("excute callback in writeAndFlush addListener, and callback throw", e);
} finally {
responseFuture.release();
}
PLOG.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
}
});
} catch (Exception e) {
// The write itself threw: release the future and report a send failure
responseFuture.release();
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
} else {
// No permit within the timeout: log the semaphore state and refuse the request
String info =
String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", //
timeoutMillis, //
this.semaphoreAsync.getQueueLength(), //
this.semaphoreAsync.availablePermits()//
);
PLOG.warn(info);
throw new RemotingTooMuchRequestException(info);
}
}
版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: