本文所有讨论都基于Dubbo 2.7.2
上一篇分析了Dubbo暴露服务的过程,当Provider服务启动后,网络传输层会使用Netty、Mina等框架接收处理数据,这一篇详细介绍一下这个处理Dubbo协议数据的过程。
协议格式
Dubbo协议格式如下:Header(16 bytes) + Body(n bytes)
Magic (2 byte)
常数=0xdabb
Req/Res (1 bit)
标识当前是一个Request还是一个Response。1 → Req,0 → Res
2 Way (1 bit)
只有是一个Request时才有用,标识是否需要服务器对当前请求返回一个值。1 → 需要返回值
Event (1 bit)
标识当前数据是否是一个Event,例如心跳。1 → 是一个Event
Serialization ID (5 bit)
序列化标识。2 → Hessian2,6 → FastJson,21 → Protobuf
1
2
3
4
5
6
7
8
9
10
11
12byte HESSIAN2_SERIALIZATION_ID = 2;
byte JAVA_SERIALIZATION_ID = 3;
byte COMPACTED_JAVA_SERIALIZATION_ID = 4;
byte FASTJSON_SERIALIZATION_ID = 6;
byte NATIVE_JAVA_SERIALIZATION_ID = 7;
byte KRYO_SERIALIZATION_ID = 8;
byte FST_SERIALIZATION_ID = 9;
byte NATIVE_HESSIAN_SERIALIZATION_ID = 10;
byte PROTOSTUFF_SERIALIZATION_ID = 12;
byte AVRO_SERIALIZATION_ID = 11;
byte GSON_SERIALIZATION_ID = 16;
byte PROTOBUF_JSON_SERIALIZATION_ID = 21;Status (1 byte)
只有是一个Response时才有用,标识当前response的状态,有点像Http Code
1
2
3
4
5
6
7
8
9
10
11byte OK = 20;
byte CLIENT_TIMEOUT = 30;
byte SERVER_TIMEOUT = 31;
byte CHANNEL_INACTIVE = 35;
byte BAD_REQUEST = 40;
byte BAD_RESPONSE = 50;
byte SERVICE_NOT_FOUND = 60;
byte SERVICE_ERROR = 70;
byte SERVER_ERROR = 80;
byte CLIENT_ERROR = 90;
byte SERVER_THREADPOOL_EXHAUSTED_ERROR = 100;Request ID (8 bytes)
每个Request都会分配一个ID
Data Length (4 bytes)
Payload的长度
PayLoad (n bytes)
RPC数据,长度由Data Length字段指明
协议解析
Dubbo里与协议解析相关的两部分是Exchanger和Transporter,Transporter以ByteBuf和Message为中心负责读写数据并序列化和反序列化,Exchanger以Request和Response为核心处理数据。
这样划分只是帮助理解,并不是必须这样。
看一下decode的源码
1 | //DubboCodec.java |
相关源码
上一篇讲到Provider暴露服务时,要启动服务等待Consumer连接,这里就从启动服务开始看吧。
1 | //HeaderExchanger |
Dubbo里的Handler很多,这里有必要详细解释一下,否则很快就晕了。我们可以认为DecodeHandler属于Transporter层,HeaderExchangeHandler属于Exchanger层,Exchanger里面又包含了Invoker,这样就完成了Transporter与Exchanger直到内部Invoker的交互。
上面有一行关键的代码:
1 | //先更新了URL里的parameter,更新后添加了一个参数threadname=DubboServerHandler-xxx:20880,这个参数会在后面线程池创建线程时使用 |
为了提高效率,I/O线程(对于Netty,这里指的是WorkerEventLoop)解析完数据后,Dubbo会创建自己的线程池进行异步处理,Dispatcher
负责将新到来的请求根据条件分发到线程池中,通过SPI机制,可以个性化Dispatcher的行为,比如Dubbo提供的ExecutionChannelHandler
,可以只对RPC请求才分发异步处理,而别的心跳事件不分发,还用I/O线程处理。
目前支持的几种Dispatcher:
1 | all=org.apache.dubbo.remoting.transport.dispatcher.all.AllDispatcher |
1 | public class AllDispatcher implements Dispatcher { |
底层网络框架以Netty为例,处理协议的各个Handler
如下,其中虚线左侧是Netty的Handler,处理codec和(de)serializaion;右侧是Dubbo自定义的Handler,用于处理RPC的请求。NettyServerHandler
是Netty与Dubbo的桥梁。两个SPI分别用于选择合适的Dispatcher和ThreadPool。
接着看一下DecodeHandler的源码
1 | public void received(Channel channel, Object message) throws RemotingException { |
接下来Exchanger开始处理
1 | //HeaderExchangeHandler.java |
涉及到的参数
名字 | 默认值 | 用途 |
---|---|---|
payload | 8M | dubbo协议中,payload的最大值 |
decode.in.io | true | 是否在I/O线程中进行decode |
dispather | all | 选择的Dispatcher策略 |
threadpool | fixed | 选择哪种线程池 |
参考