RemotingCommand
消息传输过程中的对数据内容的封装类,结构如下分四部分
1、 消息长度:消息的总长度,int类型,四个字节存储;
2、 序列化类型&&头部长度:int类型,第一个字节表示序列化类型,后面三个字节表示消息头长度;
3、 消息头数据:经过序列化后的消息头数据;
4、 消息主体数据:消息主体的二进制字节数据内容;
序列化类为SerializeType ,类型有两,json消息和RocketMQ自定义消息
public enum SerializeType {
JSON((byte) 0),
ROCKETMQ((byte) 1);
private byte code;
SerializeType(byte code) {
this.code = code;
}
}
public class RemotingCommand {
private int code;// 处理请求消息时根据这个字段来走不同的消息处理类,响应时0成功,其他失败
private LanguageCode language = LanguageCode.JAVA;// 双方实现语言
private int version = 0;// 请求方和应答方程序版本
private int opaque = requestId.getAndIncrement(); // AtomicInteger每次加一,请求id,响应与请求相同
private int flag = 0;// 标志位,判断是否oneway请求、是否是响应command
private String remark;// 返回信息
private HashMap<String, String> extFields;// 自定义字段
private transient CommandCustomHeader customHeader;// 自定义头,不进行序列化
// 序列化类型,参数rocketmq.serialize.type配置
private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
private static SerializeType serializeTypeConfigInThisServer = SerializeType.JSON;
public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type";
public static final String SERIALIZE_TYPE_ENV = "ROCKETMQ_SERIALIZE_TYPE";
static {
final String protocol = System.getProperty(SERIALIZE_TYPE_PROPERTY, System.getenv(SERIALIZE_TYPE_ENV));
if (!isBlank(protocol)) {
try {
serializeTypeConfigInThisServer = SerializeType.valueOf(protocol);
} catch (IllegalArgumentException e) {
throw new RuntimeException("parser specified protocol error. protocol=" + protocol, e);
}
}
}
private transient byte[] body;
protected RemotingCommand() {
}
public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
RemotingCommand cmd = new RemotingCommand();
cmd.setCode(code);
cmd.customHeader = customHeader;
setCmdVersion(cmd);
return cmd;
}
}
这些结构在看服务端和客户端socketChannel的handler代码时可以看出来,下边这两段是rocketmq的netty服务端和客户端的bootstrap代码,从源码看handler中如何处理remotingCommand和消息结构
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)//
// 忽略一些代码
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyClientHandler());
}
});
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(NioServerSocketChannel.class)
// 忽略一些代码
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
new NettyConnetManageHandler(),
new NettyServerHandler());
}
});
NettyEncoder和NettyDecoder分别是对RemotingCommand的编码解码
IdleStateHandler是心跳
NettyConnectManageHandler是对Registered、Active、exceptionCaught等事件的发布和处理
NettyClientHandler和NettyServerHandler就是个channelRead0,调用NettyRemotingAbstract#processMessageReceived正经处理消息,上篇看过
主要看NettyEncoder和NettyDecoder
NettyEncoder
这里一对编解码处理器和netty自带的**LengthFieldPrepender和LengthFieldBasedFrameDecoder**类似,都是编码加上长度、解码根据长度解出消息,粘包拆包
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
@Override
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
throws Exception {
try {
ByteBuffer header = remotingCommand.encodeHeader();
out.writeBytes(header);
byte[] body = remotingCommand.getBody();
if (body != null) {
out.writeBytes(body);
}
} catch (Exception e) {
log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
if (remotingCommand != null) {
log.error(remotingCommand.toString());
}
RemotingUtil.closeChannel(ctx.channel());
}
}
}
//org.apache.rocketmq.remoting.protocol.RemotingCommand#encodeHeader()
public ByteBuffer encodeHeader() {
return encodeHeader(this.body != null ? this.body.length : 0);
}
// 这个返回除过消息体外的消息头bytrBuffer,
// 三部分,1)一个int的总长度 2)一个int的消息类型+头数据长度 3)头数据
public ByteBuffer encodeHeader(final int bodyLength) {
// 1> header length size
int length = 4;// 这个length是总长度
// 2> header data length
byte[] headerData;
// 这里根据command的序列化类型,json的转json,rocketMQ的将各内部变量按顺序组成byte[]
headerData = this.headerEncode();
length += headerData.length;
// 3> body data length
length += bodyLength;
// 分配空间,不包括消息体的byteBuffer
ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);
// length
// 第一部分 一个int的总长度
result.putInt(length);
// header length
// 第二部分 一个int的消息类型+头数据长度
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// header data
// 第三部分 头数据
result.put(headerData);
// 切读模式
result.flip();
return result;
}
private byte[] headerEncode() {
this.makeCustomHeaderToNet();
if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
return RocketMQSerializable.rocketMQProtocolEncode(this);
} else {
return RemotingSerializable.encode(this);
}
}
public static byte[] markProtocolType(int source, SerializeType type) {
byte[] result = new byte[4];
// 四个byte,高8位是消息序列化类型,后24位是消息头数据长度
result[0] = type.getCode();
result[1] = (byte) ((source >> 16) & 0xFF);
result[2] = (byte) ((source >> 8) & 0xFF);
result[3] = (byte) (source & 0xFF);
return result;
}
NettyDecoder
public class NettyDecoder extends LengthFieldBasedFrameDecoder {
private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private static final int FRAME_MAX_LENGTH = //
Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));
public NettyDecoder() {
// 最大处理消息总长度16M,长度段为4个字节
super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
}
@Override
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = null;
try {
// 之前文章看过,详情看上边链接
// 大概就是,根据构造函数中的长度段,解出长度,粘包拆包取出对应长度的消息体
// 解析出的frame是除过第一部分的总长度的RemotingCommand
frame = (ByteBuf) super.decode(ctx, in);
if (null == frame) {
return null;
}
ByteBuffer byteBuffer = frame.nioBuffer();
return RemotingCommand.decode(byteBuffer);
} catch (Exception e) {
log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
RemotingUtil.closeChannel(ctx.channel());
} finally {
if (null != frame) {
frame.release();
}
}
return null;
}
}
//org.apache.rocketmq.remoting.protocol.RemotingCommand#decode(java.nio.ByteBuffer)
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
int length = byteBuffer.limit();// 消息总长
int oriHeaderLen = byteBuffer.getInt();// RemotingCommand的第二部分,序列化类型和消息头长度
// 获取消息头长度,int型后24位
int headerLength = getHeaderLength(oriHeaderLen);
// 获取消息头数据
byte[] headerData = new byte[headerLength];
byteBuffer.get(headerData);
// getProtocolType获取前8位的序列化类型
// headerDecode根据序列化类型json或rocketmq,组装RemotingCommand
// fastJson \ 根据编码时对成员变量的写入顺序,反向组装
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
// 读出body数据体写入RemotingCommand
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength > 0) {
bodyData = new byte[bodyLength];
byteBuffer.get(bodyData);
}
cmd.body = bodyData;
return cmd;
}
public static int getHeaderLength(int length) {
return length & 0xFFFFFF;
}
public static SerializeType getProtocolType(int source) {
return SerializeType.valueOf((byte) ((source >> 24) & 0xFF));
}
版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: