10、Dubbo源码解析:dubbo服务的完整调用过程

参考文献dubbo官网-dubbo服务的完整调用过程

感受学习业内优秀开源分布式框架的底层rpc实现。

调用过程大致可以分为六个阶段,这里只贴出服务调用各个阶段的调用栈进行备忘,详细源码分析请点击原文链接进行阅读

1服务消费方(dubbo-consumer)发布请求
调用栈

proxy0#sayHello(String)
  —> InvokerInvocationHandler#invoke(Object, Method, Object[])
    —> MockClusterInvoker#invoke(Invocation)
      —> AbstractClusterInvoker#invoke(Invocation)
        —> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance)
          —> Filter#invoke(Invoker, Invocation)  // 包含多个 Filter 调用
            —> ListenerInvokerWrapper#invoke(Invocation) 
              —> AbstractInvoker#invoke(Invocation) 
                —> DubboInvoker#doInvoke(Invocation)
                  —> ReferenceCountExchangeClient#request(Object, int)
                    —> HeaderExchangeClient#request(Object, int)
                      —> HeaderExchangeChannel#request(Object, int)
                        —> AbstractPeer#send(Object)
                          —> AbstractClient#send(Object, boolean)
                            —> NettyChannel#send(Object, boolean)
                              —> NioClientSocketChannel#write(Object)


2请求(request)编码
dubbo数据包
*
请求头

偏移量(Bit) 字段 取值
0 ~ 7 魔数高位 0xda00
8 ~ 15 魔数低位 0xbb
16 数据包类型 0 - Response, 1 - Request
17 调用方式 仅在第16位被设为1的情况下有效,0 - 单向调用,1 - 双向调用
18 事件标识 0 - 当前数据包是请求或响应包,1 - 当前数据包是心跳包
19 ~ 23 序列化器编号 2 - Hessian2Serialization 3 - JavaSerialization 4 - CompactedJavaSerialization 6 - FastJsonSerialization 7 - NativeJavaSerialization 8 - KryoSerialization 9 - FstSerialization
24 ~ 31 状态 20 - OK 30 - CLIENT_TIMEOUT 31 - SERVER_TIMEOUT 40 - BAD_REQUEST 50 - BAD_RESPONSE …
32 ~ 95 请求编号 共8字节,运行时生成
96 ~ 127 消息体长度 运行时计算

调用栈

ExchangeCodec#encode(Channel channel, ChannelBuffer buffer, Object msg)
	->ExchangeCodec#encodeRequest(Channel channel, ChannelBuffer buffer, Request req)
		->DubboCodec#encodeRequestData(Channel channel, ObjectOutput out, Object data, String version)


3服务提供方(dubbo-provider)解码请求
调用栈

ExchangeCodec#decode(Channel channel, ChannelBuffer buffer)
	->ExchangeCodec#decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header)
		->DubboCodec#decodeBody(Channel channel, InputStream is, byte[] header)
			->DecodeableRpcInvocation#decode


4服务提供方调用服务
提供方处理请求的线程模型
解码器将数据包解析成 Request 对象后,NettyHandler 的 messageReceived 方法紧接着会收到这个对象,并将这个对象继续向下传递。这期间该对象会被依次传递给 NettyServer、MultiMessageHandler、HeartbeatHandler 以及 AllChannelHandler。最后由 AllChannelHandler 将该对象封装到 Runnable 实现类对象中,并将 Runnable 放入线程池中执行后续的调用逻辑。

调用栈

NettyHandler#messageReceived(ChannelHandlerContext, MessageEvent)
  —> AbstractPeer#received(Channel, Object)
    —> MultiMessageHandler#received(Channel, Object)
      —> HeartbeatHandler#received(Channel, Object)
        —> AllChannelHandler#received(Channel, Object)
          —> ExecutorService#execute(Runnable)    // 由线程池执行后续的调用逻辑

在不同的子线程里进行实际的服务调用,整个调用栈如下

ChannelEventRunnable#run()
  —> DecodeHandler#received(Channel, Object)
    —> HeaderExchangeHandler#received(Channel, Object)
      —> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request)
        —> DubboProtocol.requestHandler#reply(ExchangeChannel, Object)
          —> Filter#invoke(Invoker, Invocation)
            —> AbstractProxyInvoker#invoke(Invocation)
              —> Wrapper0#invokeMethod(Object, String, Class[], Object[])
                —> DemoServiceImpl#sayHello(String)


5服务提供方返回调用结果(response)
服务提供方调用指定服务后,会将调用结果封装到 Response 对象中,并将该对象返回给服务消费方。服务提供方也是通过 NettyChannel 的 send 方法将 Response 对象返回。

调用栈

ExchangeCodec#(Channel channel, ChannelBuffer buffer, Object msg)
	->ExchangeCodec#encodeResponse(Channel channel, ChannelBuffer buffer, Response res)
		->DubboCodec#encodeResponseData(Channel channel, ObjectOutput out, Object data, String version)


6服务消费方接收调用结果
服务消费方在收到响应数据后,首先要做的事情是对响应数据进行解码,得到 Response 对象。然后再将该对象传递给下一个入站处理器,这个入站处理器就是 NettyHandler。接下来 NettyHandler 会将这个对象继续向下传递,最后 AllChannelHandler 的 received 方法会收到这个对象,并将这个对象派发到线程池中。

响应数据解码-调用栈

DubboCodec#decodeBody(Channel channel, InputStream is, byte[] header)
	->DecodeableRpcResult#DecodeableRpcResult(Channel channel, Response response, InputStream is, Invocation invocation, byte id)
		->DecodeableRpcResult#decode()
			->DecodeableRpcResult#decode(Channel channel, InputStream input)

解码接收到的数据的线程与rpc请求最初发起的线程必定不是同一个线程,所以最后要解决的问题就是如何将调用结果传递给用户线程。dubbo设计了一个类似于 java.util.concurrent.Future的ResponseFuture(具体实现类为DefaultFuture)。使用ReentrantLock进行线程通讯。在用户线程发起请求时会调用condition.awit()对用户线程进行阻塞,在接收到响应结果后反序列化并塞入DefaultFuture的response字段,此时调用condition.signal()唤醒用户线程,用户线程便可拿到序列化后的结果。更具体的实现推荐直接阅读源码,这里只给出调用栈。

序列化结果传递-调用栈

HeaderExchangeHandler#received(Channel channel, Object message)
	->HeaderExchangeHandler#handleResponse(Channel channel, Response response)
		->DefaultFuture#received(Channel channel, Response response)
			->DefaultFuture#doReceived(Response res)

一般情况下,服务消费方会并发调用多个服务,每个用户线程发送请求后,会调用不同 DefaultFuture 对象的 get 方法进行等待。 一段时间后,服务消费方的线程池会收到多个响应对象。这个时候要考虑一个问题,如何将每个响应对象传递给相应的 DefaultFuture 对象,且不出错。答案是通过调用编号。DefaultFuture 被创建时,会要求传入一个 Request 对象。此时 DefaultFuture 可从 Request 对象中获取调用编号,并将 <调用编号, DefaultFuture 对象> 映射关系存入到静态 Map 中,即 FUTURES。线程池中的线程在收到 Response 对象后,会根据 Response 对象中的调用编号到 FUTURES 集合中取出相应的 DefaultFuture 对象,然后再将 Response 对象设置到 DefaultFuture 对象中。最后再唤醒用户线程,这样用户线程即可从 DefaultFuture 对象中获取调用结果了。整个过程大致如下图:
*

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: