01、RocketMQ源码分析:remoting模块核心类

前段时间学习了netty,读了些源码,今天开始看RocketMQ是如何把netty封装为底层通信类,核心类有哪些

目录

remoting项目目录

RemotingServerTest

@BeforeClass创建服务端

NettyServerConfig

NettyRemotingServer

@BeforeClass创建客户端

NettyClientConfig

NettyRemotingClient

NettyRemotingAbstract

@Test testInvokeSync 同步调用

NettyRemotingAbstract#invokeSyncImpl

@Test testInvokeOneway 单向调用

NettyRemotingAbstract#invokeOnewayImpl

@Test testInvokeAsync 异步调用

NettyRemotingAbstract#invokeAsyncImpl


remoting项目目录

(截了两个重要的包):

***

其中的RemotingCommand是rocketMQ中各种消息的封装类,传送--> RocketMQ源码阅读(二)RemotingCommand、NettyEncoder和NettyDecoder

RemotingServerTest

从项目本身自带的测试类开始读源码,这里有一整套逻辑,便于理解。

总体来看, @BeforeClass创建了服务端和客户端, 三个@Test为三种调用方式示例(同步/异步/单向), @AfterClass关闭服务端和客户端,下边一个一个看

public class RemotingServerTest {
    private static RemotingServer remotingServer;
    private static RemotingClient remotingClient;

    @BeforeClass
    public static void setup() throws InterruptedException {
        remotingServer = createRemotingServer();
        remotingClient = createRemotingClient();
    }

    public static RemotingServer createRemotingServer() throws InterruptedException {
        NettyServerConfig config = new NettyServerConfig();
        RemotingServer remotingServer = new NettyRemotingServer(config);
        remotingServer.registerProcessor(0, new NettyRequestProcessor() {
            @Override
            public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
                request.setRemark("Hi " + ctx.channel().remoteAddress());
                return request;
            }

            @Override
            public boolean rejectRequest() {
                return false;
            }
        }, Executors.newCachedThreadPool());

        remotingServer.start();

        return remotingServer;
    }

    public static RemotingClient createRemotingClient() {
        NettyClientConfig config = new NettyClientConfig();
        RemotingClient client = new NettyRemotingClient(config);
        client.start();
        return client;
    }

    @AfterClass
    public static void destroy() {
        remotingClient.shutdown();
        remotingServer.shutdown();
    }

    @Test
    public void testInvokeSync() throws InterruptedException, RemotingConnectException,
        RemotingSendRequestException, RemotingTimeoutException {
        RequestHeader requestHeader = new RequestHeader();
        requestHeader.setCount(1);
        requestHeader.setMessageTitle("Welcome");
        RemotingCommand request = RemotingCommand.createRequestCommand(0, requestHeader);
        RemotingCommand response = remotingClient.invokeSync("localhost:8888", request, 1000 * 3);
        assertTrue(response != null);
        assertThat(response.getLanguage()).isEqualTo(LanguageCode.JAVA);
        assertThat(response.getExtFields()).hasSize(2);

    }

    @Test
    public void testInvokeOneway() throws InterruptedException, RemotingConnectException,
        RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException {

        RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
        request.setRemark("messi");
        remotingClient.invokeOneway("localhost:8888", request, 1000 * 3);
    }

    @Test
    public void testInvokeAsync() throws InterruptedException, RemotingConnectException,
        RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException {

        final CountDownLatch latch = new CountDownLatch(1);
        RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
        request.setRemark("messi");
        remotingClient.invokeAsync("localhost:8888", request, 1000 * 3, new InvokeCallback() {
            @Override
            public void operationComplete(ResponseFuture responseFuture) {
                latch.countDown();
                assertTrue(responseFuture != null);
                assertThat(responseFuture.getResponseCommand().getLanguage()).isEqualTo(LanguageCode.JAVA);
                assertThat(responseFuture.getResponseCommand().getExtFields()).hasSize(2);
            }
        });
        latch.await();
    }
}

@BeforeClass创建服务端

    public static RemotingServer createRemotingServer() throws InterruptedException {
        // 服务端各种参数配置类,比如group线程数等等
        NettyServerConfig config = new NettyServerConfig();
        
        // netty服务端ServerBootStrap封装类
        RemotingServer remotingServer = new NettyRemotingServer(config);

        // 设置服务端解析RemotingCommand的处理器
        remotingServer.registerProcessor(0, new NettyRequestProcessor() {
            @Override
            public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
                request.setRemark("Hi " + ctx.channel().remoteAddress());
                return request;
            }

            @Override
            public boolean rejectRequest() {
                return false;
            }
        }, Executors.newCachedThreadPool());

        remotingServer.start();

        return remotingServer;
    }

NettyServerConfig

服务端配置

public class NettyServerConfig implements Cloneable {
    private int listenPort = 8888; // 服务端启动监听的端口
    private int serverWorkerThreads = 8;  // 这是执行handler任务的EventExecutorGroup
    private int serverCallbackExecutorThreads = 0;
    private int serverSelectorThreads = 3; // 默认启动的子group(处理客户端读写的group)中的eventloop数量
    private int serverOnewaySemaphoreValue = 256;// 信号量控制并发量,服务端最大同时oneway调用256次
    private int serverAsyncSemaphoreValue = 64;// 最大同时异步64个调用
    private int serverChannelMaxIdleTimeSeconds = 120; // 断线重连的idleHandler参数

    private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;// socket套接字的send缓冲区大小
    private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;// receive缓冲区大小
    private boolean serverPooledByteBufAllocatorEnable = true;// 分配缓冲区的allocator类型,true为池化

    /**
     * make make install
     *
     *
     * ../glibc-2.10.1/configure \ --prefix=/usr \ --with-headers=/usr/include \
     * --host=x86_64-linux-gnu \ --build=x86_64-pc-linux-gnu \ --without-gd
     */
    private boolean useEpollNativeSelector = false;

// 。。。。getter/setter
}

NettyRemotingServer

主要是两个地方,实例化new方法和start启动服务端,new中实例化netty的组件BootStrap和实例化各种执行器组。start来启动ServerBootstrap

类中执行器组有四种,首先是两个EventLoop的group,父group来接收连接请求,子group来处理读写。 还有个defaultEventExecutorGroup来执行handler中各方法,一个publicExecutor在最终的handler种处理消息内容(默认的,可以自定义)

public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {

    private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    // 这是一组netty熟悉的api
    private final ServerBootstrap serverBootstrap;
    private final EventLoopGroup eventLoopGroupSelector;
    private final EventLoopGroup eventLoopGroupBoss;
    private final NettyServerConfig nettyServerConfig;

    // 默认的处理requestCommand内容的执行器线程
    private final ExecutorService publicExecutor;

    // 连接事件监听器,用于心跳的事件监听
    private final ChannelEventListener channelEventListener;

    // 定时处理某些任务
    private final Timer timer = new Timer("ServerHouseKeepingService", true);
    
    // 执行handler方法的执行器,handlerRead\handlerActive等方法
    private DefaultEventExecutorGroup defaultEventExecutorGroup;

    // 一个类似AOP处理的类,publicExecutor处理command前后进行加工
    private RPCHook rpcHook;

    private int port = 0;

    public NettyRemotingServer(final NettyServerConfig nettyServerConfig) {
        this(nettyServerConfig, null);
    }

    public NettyRemotingServer(final NettyServerConfig nettyServerConfig, final ChannelEventListener channelEventListener) {
        // server端和client端的共同父类,将通用的东西提了一下,这里是两种调用方式的信号量大小,在父类设置
        super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());

        this.serverBootstrap = new ServerBootstrap();
        this.nettyServerConfig = nettyServerConfig;
        this.channelEventListener = channelEventListener;

        // 默认的处理requestCommand内容的执行器线程默认4个
        int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
        if (publicThreadNums <= 0) {
            publicThreadNums = 4;
        }

        // publicExecutor是固定大小线程池,默认4
        this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
            }
        });

        // 接收客户端连接的group,只有一个selector线程
        this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet()));
            }
        });

        // 处理客户端读写的selector线程,默认3
        if (RemotingUtil.isLinuxPlatform() //linux下是epoll
            && nettyServerConfig.isUseEpollNativeSelector()) {
            this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        } else {
            this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        }
    }

  @Override 
    // 注册每种RequestCommand对应的消息处理器processor和执行processor的执行器
    public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
        ExecutorService executorThis = executor;
        // 没传执行器,有默认的
        if (null == executor) {
            executorThis = this.publicExecutor;
        }

        // Pair对象啥都没有,只是将两个对象组合起来
        Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);

        // 缓存,在真正处理消息时,根据消息requestCommand对应的code获取处理器处理
        this.processorTable.put(requestCode, pair);
    }

   @Override
    public void start() {
        // 执行handler方法的执行器组,默认8个线程
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {

                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });

        // 正经装配server端
        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024) // 客户端连接请求的队列大小
                .option(ChannelOption.SO_REUSEADDR, true) // 允许重复使用本地地址和端口
                .option(ChannelOption.SO_KEEPALIVE, false) 
                .childOption(ChannelOption.TCP_NODELAY, true)  // true不延迟,禁止使用Nagle算法
                .option(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())  // 这里设置发送的socket缓冲区大小
                .option(ChannelOption.SO_RCVBUF,  nettyServerConfig.getServerSocketRcvBufSize())  // 设置接收的socket缓冲区大小,这两个参数没想通啊,不应该在childOption设置吗?
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) // 绑定端口
                .childHandler(new ChannelInitializer<SocketChannel>() {// 处理客户端消息的初始化处理器
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(
                            defaultEventExecutorGroup,
                            new NettyEncoder(),
                            new NettyDecoder(),
                            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                            new NettyConnetManageHandler(),
                            new NettyServerHandler());
                    }
                });

         // 池化的Allocator
        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        try {
            // 启动
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress)sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }

        // 如果监听心跳事件的监听器存在,启动nettyEvent事件处理线程池分发事件
        // 就是开了个线程,轮询事件队列,调用listenr的对应发放
        if (this.channelEventListener != null) { 
            this.nettyEventExecuter.start();
        }

        // 定时扫描responseTable变量,在父类,每次同步异步调用需要等待响应,这变量存的就是请求id对应的ResponseFuture对象,定时清理超时的
        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Exception e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }
}

@BeforeClass创建客户端

    public static RemotingClient createRemotingClient() {
        NettyClientConfig config = new NettyClientConfig();
        RemotingClient client = new NettyRemotingClient(config);
        client.start();
        return client;
    }

NettyClientConfig

public class NettyClientConfig {
    /**
     * Worker thread number
     */
    private int clientWorkerThreads = 4;// 处理handler方法的线程

    // 处理消息内容requestCommand的线程数,默认当前机器的逻辑处理器数量
    private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();

    //两种调用方式的并发控制信号量,可以用参数设置,默认是65535
    private int clientOnewaySemaphoreValue = NettySystemConfig.CLIENT_ONEWAY_SEMAPHORE_VALUE;
    private int clientAsyncSemaphoreValue = NettySystemConfig.CLIENT_ASYNC_SEMAPHORE_VALUE;

    private int connectTimeoutMillis = 3000;// 连接超时
    private long channelNotActiveInterval = 1000 * 60;// 多长时间不交互为不活跃

    /**
     * IdleStateEvent will be triggered when neither read nor write was performed for
     * the specified period of this time. Specify {@code 0} to disable
     */
    private int clientChannelMaxIdleTimeSeconds = 120;// 120秒没有读写会触发一个IdleStateEvent

    // socket发送和接收缓冲区大小,默认65535
    private int clientSocketSndBufSize = NettySystemConfig.socketSndbufSize;
    private int clientSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
    private boolean clientPooledByteBufAllocatorEnable = false;
    private boolean clientCloseSocketIfTimeout = false;

  //。。。。getter/setter
}

NettyRemotingClient

和服务端一样也是new和start方法

public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {

    private static final long LOCK_TIMEOUT_MILLIS = 3000;

    private final NettyClientConfig nettyClientConfig;
 
    // netty客户端api
    private final Bootstrap bootstrap = new Bootstrap();
    private final EventLoopGroup eventLoopGroupWorker;
    
    // 连接服务端时,用的锁
    private final Lock lockChannelTables = new ReentrantLock();
    // 每连接一个channel,放到这个map中,复用使用
    private final ConcurrentHashMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<>();

    private final Timer timer = new Timer("ClientHouseKeepingService", true);

    private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<>();
    private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<>();
    private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex());
    private final Lock lockNamesrvChannel = new ReentrantLock();

    private final ExecutorService publicExecutor;
    private final ChannelEventListener channelEventListener;
    private DefaultEventExecutorGroup defaultEventExecutorGroup;
    private RPCHook rpcHook;

    public NettyRemotingClient(final NettyClientConfig nettyClientConfig) {
        this(nettyClientConfig, null);
    }

    public NettyRemotingClient(final NettyClientConfig nettyClientConfig, //
                               final ChannelEventListener channelEventListener) {
        // 父类两个信号量设置
        super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());

        // 配置类
        this.nettyClientConfig = nettyClientConfig;
        
        // 心跳事件监听
        this.channelEventListener = channelEventListener;

        // 初始化默认的处理消息内容的执行器
        int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
        if (publicThreadNums <= 0) {
            publicThreadNums = 4;
        }

        this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
            }
        });

        // 单线程selector
        this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
            }
        });
    }

  public void start() {
        // 执行handler方法的执行器,默认4个线程
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(//
            nettyClientConfig.getClientWorkerThreads(), //
            new ThreadFactory() {

                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
                }
            });

        // 配置bootstrap,并没有connect,在调用的时候连接
        Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)//
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(
                        defaultEventExecutorGroup,
                        new NettyEncoder(),
                        new NettyDecoder(),
                        new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                        new NettyConnectManageHandler(),
                        new NettyClientHandler());
                }
            });

        // 同服务端,定时扫描responseTable,删除超时的
        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    NettyRemotingClient.this.scanResponseTable();
                } catch (Exception e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);

        // 有心跳监听,启动事件分发器
        if (this.channelEventListener != null) {
            this.nettyEventExecuter.start();
        }
    }
}

NettyRemotingAbstract

NettyRemotingAbstract类是NettyRemotingServer、NettyRemotingClient的父类,抽象了一些客户端和服务端通用的方法,主要是三种调用方式、解析请求/响应RemotingCommand等

public abstract class NettyRemotingAbstract {

    // 异步和单向调用信号量
    protected final Semaphore semaphoreOneway;
    protected final Semaphore semaphoreAsync;

    // 同步异步调用时记录的响应future
    protected final ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable =
        new ConcurrentHashMap<Integer, ResponseFuture>(256);

    // 注册的各种请求对应的请求处理类和执行器
    protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
        new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);

    // 心跳等事件分发,有监听时启用
    protected final NettyEventExecuter nettyEventExecuter = new NettyEventExecuter();

    // 默认的请求处理
    protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;

    public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) {
        this.semaphoreOneway = new Semaphore(permitsOneway, true);
        this.semaphoreAsync = new Semaphore(permitsAsync, true);
    }

}

processMessageReceived方法

**处理请求和响应的requestCommand和responseCommand,这个方法在server端和client端最后一个handler(****NettyServerHandler/**NettyClientHandler)的channelRead被调用

   public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }

   // 处理请求command
    public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
        // 从注册表获取这个请求对应的处理类和执行器
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        // 没从注册表找到,使用默认的处理器
        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;

        // opaque 是请求id,请求方在同一连接上的不同请求号
        final int opaque = cmd.getOpaque();

        if (pair != null) {
            // 处理器和执行器不为null,new一个任务交给执行器执行,任务就是处理请求requestCommand
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    try {
                        // 上边提到的类似AOP对象,请求处理前后加工
                        RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
                        if (rpcHook != null) {
                            rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                        }

                        // 对应请求处理器处理请求内容
                        final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);

                        if (rpcHook != null) {
                            rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                        }

                        // 不是单向的就得给返回
                        if (!cmd.isOnewayRPC()) {
                            if (response != null) {
                                response.setOpaque(opaque);
                                response.markResponseType();
                                try {
                                    ctx.writeAndFlush(response);
                                } catch (Throwable e) {
                                    PLOG.error("process request over, but response failed", e);
                                    PLOG.error(cmd.toString());
                                    PLOG.error(response.toString());
                                }
                            } else {

                            }
                        }
                    } catch (Throwable e) {
                        if (!"com.aliyun.openservices.ons.api.impl.authority.exception.AuthenticationException"
                            .equals(e.getClass().getCanonicalName())) {
                            PLOG.error("process request exception", e);
                            PLOG.error(cmd.toString());
                        }

                        if (!cmd.isOnewayRPC()) {
                            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, //
                                RemotingHelper.exceptionSimpleDesc(e));
                            response.setOpaque(opaque);
                            ctx.writeAndFlush(response);
                        }
                    }
                }
            };

            // 如果处理器拒绝,创建一个RemotingCommand返回
            if (pair.getObject1().rejectRequest()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[REJECTREQUEST]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
                return;
            }

            try {
                // 封装上边的Runnable,交给执行器执行
                final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                pair.getObject2().submit(requestTask);
            } catch (RejectedExecutionException e) {
                // 线程池满了,任务被拒绝,异常返回
                if (!cmd.isOnewayRPC()) {
                    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                        "[OVERLOAD]system busy, start flow control for a while");
                    response.setOpaque(opaque);
                    ctx.writeAndFlush(response);
                }
            }
        } else {
            // 无对应的请求处理器,异常返回
            String error = " request type " + cmd.getCode() + " not supported";
            final RemotingCommand response =
                RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            PLOG.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
        }
    }

 // 处理响应command
   public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
        final int opaque = cmd.getOpaque();
        
        // 从等待响应的responseFuture的集合中把对应的等待future取出来,设置成功状态
        final ResponseFuture responseFuture = responseTable.get(opaque);
        if (responseFuture != null) {
            responseFuture.setResponseCommand(cmd);

            // 释放内部一个信号量
            responseFuture.release();

            // 集合中移除
            responseTable.remove(opaque);

            // 如果是异步调用,执行回调方法
            if (responseFuture.getInvokeCallback() != null) {
                executeInvokeCallback(responseFuture);
            } else {
             // 同步调用,同步底层使用CountDownLatch实现,这里就是countDownLatch.countDown();
                responseFuture.putResponse(cmd);
            }
        } else {
            PLOG.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            PLOG.warn(cmd.toString());
        }
    }

@Test testInvokeSync 同步调用

    @Test
    public void testInvokeSync() throws InterruptedException, RemotingConnectException,
        RemotingSendRequestException, RemotingTimeoutException {
        // new一个header对象,但是序列化的时候不传这个,好像没用,就是在客户端表示一下
        RequestHeader requestHeader = new RequestHeader();
        requestHeader.setCount(1);
        requestHeader.setMessageTitle("Welcome");

        // 创建一个请求的RemotingCommand
        RemotingCommand request = RemotingCommand.createRequestCommand(0, requestHeader);
        // 连接参数地址并调用,超时3秒
        RemotingCommand response = remotingClient.invokeSync("localhost:8888", request, 1000 * 3);
        assertTrue(response != null);
        assertThat(response.getLanguage()).isEqualTo(LanguageCode.JAVA);
        assertThat(response.getExtFields()).hasSize(2);
    }

NettyRemotingClient#invokeSync

    @Override
    public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {

        // channel复用,bootstrap.connect连接返回的channel都会放到一个map中,供下次使用
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            try {
                // aop处理requestCommand
                if (this.rpcHook != null) {
                    this.rpcHook.doBeforeRequest(addr, request);
                }
                // 正经调用方法,在父类
                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis);
                if (this.rpcHook != null) {
                    this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
                }
                return response;
            } catch (RemotingSendRequestException e) {
                log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            } catch (RemotingTimeoutException e) {
                // 超时是否关闭channel,默认不关
                if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                    this.closeChannel(addr, channel);
                    log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
                }
                log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }

NettyRemotingAbstract#invokeSyncImpl

    public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        final int opaque = request.getOpaque();

        try {// 为这个请求id新建一个future,等待请求的响应
            final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
            // 放入集合,key为请求id
            this.responseTable.put(opaque, responseFuture);

            // 
            final SocketAddress addr = channel.remoteAddress();
            // 发送并设置监听回调
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    // 设置成功失败
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    } else {
                        responseFuture.setSendRequestOK(false);
                    }

                    // 集合中移除响应future
                    responseTable.remove(opaque);
                    responseFuture.setCause(f.cause());

                    // 响应完成,同步阻塞结束,底层CountDownLatch实现
                    responseFuture.putResponse(null);
                    PLOG.warn("send a request command to channel <" + addr + "> failed.");
                }
            });

            // 同步阻塞等待响应,CountDownLatch实现
            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            if (null == responseCommand) {
                if (responseFuture.isSendRequestOK()) {
                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                        responseFuture.getCause());
                } else {
                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                }
            }

            return responseCommand;
        } finally {
            this.responseTable.remove(opaque);
        }
    }

public class ResponseFuture {
    public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
        this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return this.responseCommand;
    }

    public void putResponse(final RemotingCommand responseCommand) {
        this.responseCommand = responseCommand;
        this.countDownLatch.countDown();
    }
}

@Test testInvokeOneway 单向调用

   @Test
    public void testInvokeOneway() throws InterruptedException, RemotingConnectException,
        RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException {

        RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
        request.setRemark("messi");
        request.setBody("a".getBytes());
        remotingClient.invokeOneway("localhost:8888", request, 1000 * 3);
    }

NettyRemotingClient#invokeOneway

   @Override
    public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
        RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            try {
                if (this.rpcHook != null) {
                    this.rpcHook.doBeforeRequest(addr, request);
                }
                this.invokeOnewayImpl(channel, request, timeoutMillis);
            } catch (RemotingSendRequestException e) {
                log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }

NettyRemotingAbstract#invokeOnewayImpl

    public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        // 这一步设置的标志位暂不知道是啥
        request.markOnewayRPC();

        // 单向调用信号量控制,最大调用的并发数
        boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
            try {
                // 调用后接收到响应后,释放信号量
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        once.release();
                        if (!f.isSuccess()) {
                            PLOG.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                        }
                    }
                });
            } catch (Exception e) {
                // 释放信号量
                once.release();
                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
            }
        } else {
            if (timeoutMillis <= 0) {
                throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
            } else {
                String info = String.format(
                    "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", //
                    timeoutMillis, //
                    this.semaphoreOneway.getQueueLength(), //
                    this.semaphoreOneway.availablePermits()//
                );
                PLOG.warn(info);
                throw new RemotingTimeoutException(info);
            }
        }
    }

@Test testInvokeAsync 异步调用

    @Test
    public void testInvokeAsync() throws InterruptedException, RemotingConnectException,
        RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException {

        final CountDownLatch latch = new CountDownLatch(1);
        RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
        request.setRemark("messi");

        // 异步调用,设置自定义InvokeCallback回调
        remotingClient.invokeAsync("localhost:8888", request, 1000 * 3, new InvokeCallback() {
            @Override
            public void operationComplete(ResponseFuture responseFuture) {
                latch.countDown();
                assertTrue(responseFuture != null);
                assertThat(responseFuture.getResponseCommand().getLanguage()).isEqualTo(LanguageCode.JAVA);
                assertThat(responseFuture.getResponseCommand().getExtFields()).hasSize(2);
            }
        });

        // 测试方法,等待响应后再结束
        latch.await();
    }

NettyRemotingClient#invokeAsync

    @Override
    public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
        throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException {
        // 获取channel或connect到服务端获取一个channel
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            try {
                if (this.rpcHook != null) {
                    this.rpcHook.doBeforeRequest(addr, request);
                }
                this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
            } catch (RemotingSendRequestException e) {
                log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }

NettyRemotingAbstract#invokeAsyncImpl

   public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        
        // 获取请求id
        final int opaque = request.getOpaque();
        // 并发信号量
        boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);

            final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once);
            this.responseTable.put(opaque, responseFuture);
            try {
                // 发送command,设置回调
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        if (f.isSuccess()) {
                            responseFuture.setSendRequestOK(true);
                            return;
                        } else {
                            responseFuture.setSendRequestOK(false);
                        }

                        // 设置结束标志,集合中移除此future
                        responseFuture.putResponse(null);
                        responseTable.remove(opaque);
                        try {
                            // 执行回调,释放信号量
                            executeInvokeCallback(responseFuture);
                        } catch (Throwable e) {
                            PLOG.warn("excute callback in writeAndFlush addListener, and callback throw", e);
                        } finally {
                            responseFuture.release();
                        }

                        PLOG.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                    }
                });
            } catch (Exception e) {
                responseFuture.release();

                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
            }
        } else {
            String info =
                String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", //
                    timeoutMillis, //
                    this.semaphoreAsync.getQueueLength(), //
                    this.semaphoreAsync.availablePermits()//
                );
            PLOG.warn(info);
            throw new RemotingTooMuchRequestException(info);
        }
    }

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: