02、RocketMQ源码分析:RemotingCommand、NettyEncoder和NettyDecoder 

RemotingCommand

消息传输过程中的对数据内容的封装类,结构如下分四部分

1、 消息长度:消息的总长度,int类型,四个字节存储;
2、 序列化类型&&头部长度:int类型,第一个字节表示序列化类型,后面三个字节表示消息头长度;
3、 消息头数据:经过序列化后的消息头数据;
4、 消息主体数据:消息主体的二进制字节数据内容;

序列化类为SerializeType ,类型有两,json消息和RocketMQ自定义消息

public enum SerializeType {
    JSON((byte) 0),
    ROCKETMQ((byte) 1);

    private byte code;

    SerializeType(byte code) {
        this.code = code;
    }
}
public class RemotingCommand {

    private int code;// 处理请求消息时根据这个字段来走不同的消息处理类,响应时0成功,其他失败
    private LanguageCode language = LanguageCode.JAVA;// 双方实现语言
    private int version = 0;// 请求方和应答方程序版本
    private int opaque = requestId.getAndIncrement(); // AtomicInteger每次加一,请求id,响应与请求相同
    private int flag = 0;// 标志位,判断是否oneway请求、是否是响应command
    private String remark;// 返回信息
    private HashMap<String, String> extFields;// 自定义字段
    private transient CommandCustomHeader customHeader;// 自定义头,不进行序列化

    // 序列化类型,参数rocketmq.serialize.type配置
    private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
    private static SerializeType serializeTypeConfigInThisServer = SerializeType.JSON;
    public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type";
    public static final String SERIALIZE_TYPE_ENV = "ROCKETMQ_SERIALIZE_TYPE";

    static {
        final String protocol = System.getProperty(SERIALIZE_TYPE_PROPERTY, System.getenv(SERIALIZE_TYPE_ENV));
        if (!isBlank(protocol)) {
            try {
                serializeTypeConfigInThisServer = SerializeType.valueOf(protocol);
            } catch (IllegalArgumentException e) {
                throw new RuntimeException("parser specified protocol error. protocol=" + protocol, e);
            }
        }
    }

    private transient byte[] body;

    protected RemotingCommand() {
    }

    public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
        RemotingCommand cmd = new RemotingCommand();
        cmd.setCode(code);
        cmd.customHeader = customHeader;
        setCmdVersion(cmd);
        return cmd;
    }
}

这些结构在看服务端和客户端socketChannel的handler代码时可以看出来,下边这两段是rocketmq的netty服务端和客户端的bootstrap代码,从源码看handler中如何处理remotingCommand和消息结构

       Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)//
       // 忽略一些代码
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(
                        defaultEventExecutorGroup,
                        new NettyEncoder(),
                        new NettyDecoder(),
                        new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                        new NettyConnectManageHandler(),
                        new NettyClientHandler());
                }
            });
       ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(NioServerSocketChannel.class)
               // 忽略一些代码
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(
                            defaultEventExecutorGroup,
                            new NettyEncoder(),
                            new NettyDecoder(),
                            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                            new NettyConnetManageHandler(),
                            new NettyServerHandler());
                    }
                });

NettyEncoder和NettyDecoder分别是对RemotingCommand的编码解码

IdleStateHandler是心跳

NettyConnectManageHandler是对Registered、Active、exceptionCaught等事件的发布和处理

NettyClientHandler和NettyServerHandler就是个channelRead0,调用NettyRemotingAbstract#processMessageReceived正经处理消息,上篇看过

主要看NettyEncoder和NettyDecoder

NettyEncoder

这里一对编解码处理器和netty自带的**LengthFieldPrepender和LengthFieldBasedFrameDecoder**类似,都是编码加上长度、解码根据长度解出消息,粘包拆包

public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
    private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    @Override
    public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
        throws Exception {
        try {
            ByteBuffer header = remotingCommand.encodeHeader();
            out.writeBytes(header);
            byte[] body = remotingCommand.getBody();
            if (body != null) {
                out.writeBytes(body);
            }
        } catch (Exception e) {
            log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            if (remotingCommand != null) {
                log.error(remotingCommand.toString());
            }
            RemotingUtil.closeChannel(ctx.channel());
        }
    }
}

//org.apache.rocketmq.remoting.protocol.RemotingCommand#encodeHeader()
   public ByteBuffer encodeHeader() {
        return encodeHeader(this.body != null ? this.body.length : 0);
    }

// 这个返回除过消息体外的消息头bytrBuffer,
// 三部分,1)一个int的总长度  2)一个int的消息类型+头数据长度  3)头数据
    public ByteBuffer encodeHeader(final int bodyLength) {
        // 1> header length size
        int length = 4;// 这个length是总长度

        // 2> header data length
        byte[] headerData;
        // 这里根据command的序列化类型,json的转json,rocketMQ的将各内部变量按顺序组成byte[]
        headerData = this.headerEncode();

        length += headerData.length;

        // 3> body data length
        length += bodyLength;

        // 分配空间,不包括消息体的byteBuffer
        ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);

        // length
        // 第一部分 一个int的总长度
        result.putInt(length);

        // header length
        // 第二部分 一个int的消息类型+头数据长度
        result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

        // header data
        // 第三部分 头数据
        result.put(headerData);

        // 切读模式
        result.flip();

        return result;
    }

   private byte[] headerEncode() {
        this.makeCustomHeaderToNet();
        if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
            return RocketMQSerializable.rocketMQProtocolEncode(this);
        } else {
            return RemotingSerializable.encode(this);
        }
    }

    public static byte[] markProtocolType(int source, SerializeType type) {
        byte[] result = new byte[4];

        // 四个byte,高8位是消息序列化类型,后24位是消息头数据长度
        result[0] = type.getCode();
        result[1] = (byte) ((source >> 16) & 0xFF);
        result[2] = (byte) ((source >> 8) & 0xFF);
        result[3] = (byte) (source & 0xFF);
        return result;
    }

NettyDecoder

public class NettyDecoder extends LengthFieldBasedFrameDecoder {
    private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
    private static final int FRAME_MAX_LENGTH = //
        Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));

    public NettyDecoder() {
        // 最大处理消息总长度16M,长度段为4个字节
        super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
    }

    @Override
    public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf frame = null;
        try {
            // 之前文章看过,详情看上边链接
            // 大概就是,根据构造函数中的长度段,解出长度,粘包拆包取出对应长度的消息体
            // 解析出的frame是除过第一部分的总长度的RemotingCommand
            frame = (ByteBuf) super.decode(ctx, in);
            if (null == frame) {
                return null;
            }

            ByteBuffer byteBuffer = frame.nioBuffer();

            return RemotingCommand.decode(byteBuffer);
        } catch (Exception e) {
            log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            RemotingUtil.closeChannel(ctx.channel());
        } finally {
            if (null != frame) {
                frame.release();
            }
        }

        return null;
    }
}
//org.apache.rocketmq.remoting.protocol.RemotingCommand#decode(java.nio.ByteBuffer)
   public static RemotingCommand decode(final ByteBuffer byteBuffer) {
        int length = byteBuffer.limit();// 消息总长
        int oriHeaderLen = byteBuffer.getInt();// RemotingCommand的第二部分,序列化类型和消息头长度
        // 获取消息头长度,int型后24位
        int headerLength = getHeaderLength(oriHeaderLen);

        // 获取消息头数据
        byte[] headerData = new byte[headerLength];
        byteBuffer.get(headerData);

        // getProtocolType获取前8位的序列化类型
        // headerDecode根据序列化类型json或rocketmq,组装RemotingCommand
        // fastJson \ 根据编码时对成员变量的写入顺序,反向组装
        RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

        // 读出body数据体写入RemotingCommand
        int bodyLength = length - 4 - headerLength;
        byte[] bodyData = null;
        if (bodyLength > 0) {
            bodyData = new byte[bodyLength];
            byteBuffer.get(bodyData);
        }
        cmd.body = bodyData;

        return cmd;
    }

    public static int getHeaderLength(int length) {
        return length & 0xFFFFFF;
    }

   public static SerializeType getProtocolType(int source) {
        return SerializeType.valueOf((byte) ((source >> 24) & 0xFF));
    }

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: