Dubbo Protocol Parsing

Everything discussed here is based on Dubbo 2.7.2.

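The previous article analyzed how Dubbo exports a service. Once the Provider has started, the network transport layer uses a framework such as Netty or Mina to receive and process data. This article walks through how that Dubbo protocol data is processed.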

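Protocol format

A Dubbo protocol frame is laid out as: Header (16 bytes) + Body (n bytes)

  • Magic (2 bytes)

    Constant, 0xdabb

  • Req/Res (1 bit)

    Marks the frame as a Request or a Response. 1 → Req, 0 → Res

  • 2 Way (1 bit)

    Only meaningful for a Request; indicates whether the server must return a value for this request. 1 → a return value is required

  • Event (1 bit)

    Marks the frame as an Event, such as a heartbeat. 1 → it is an Event

  • Serialization ID (5 bits)

    Identifies the serialization, for example 2 → Hessian2, 6 → FastJson, 21 → Protobuf (JSON). The IDs defined in the source: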

    byte HESSIAN2_SERIALIZATION_ID = 2;
    byte JAVA_SERIALIZATION_ID = 3;
    byte COMPACTED_JAVA_SERIALIZATION_ID = 4;
    byte FASTJSON_SERIALIZATION_ID = 6;
    byte NATIVE_JAVA_SERIALIZATION_ID = 7;
    byte KRYO_SERIALIZATION_ID = 8;
    byte FST_SERIALIZATION_ID = 9;
    byte NATIVE_HESSIAN_SERIALIZATION_ID = 10;
    byte PROTOSTUFF_SERIALIZATION_ID = 12;
    byte AVRO_SERIALIZATION_ID = 11;
    byte GSON_SERIALIZATION_ID = 16;
    byte PROTOBUF_JSON_SERIALIZATION_ID = 21;
  • Status (1 byte)

    Only meaningful for a Response; carries the status of the response, somewhat like an HTTP status code

    byte OK = 20;
    byte CLIENT_TIMEOUT = 30;
    byte SERVER_TIMEOUT = 31;
    byte CHANNEL_INACTIVE = 35;
    byte BAD_REQUEST = 40;
    byte BAD_RESPONSE = 50;
    byte SERVICE_NOT_FOUND = 60;
    byte SERVICE_ERROR = 70;
    byte SERVER_ERROR = 80;
    byte CLIENT_ERROR = 90;
    byte SERVER_THREADPOOL_EXHAUSTED_ERROR = 100;
  • Request ID (8 bytes)

    Every Request is assigned an ID

  • Data Length (4 bytes)

    Length of the payload

  • Payload (n bytes)

    The RPC data; its length is given by the Data Length field
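
To make the layout above concrete, here is a minimal sketch that pulls the fields out of a 16-byte header. It is not Dubbo's own code: the class name DubboHeaderSketch and its fields are made up for illustration, and the mask values (0x80, 0x40, 0x20, 0x1f) are an assumption derived from the bit order listed above, with the three flag bits in the high bits of byte 2 and the serialization ID in the low 5 bits. Multi-byte fields are read big-endian.

```java
import java.nio.ByteBuffer;

/** Illustrative parser for the 16-byte header described above (hypothetical class). */
final class DubboHeaderSketch {
    static final int HEADER_LENGTH = 16;
    static final short MAGIC = (short) 0xdabb;
    // Assumed masks for the flag byte (byte 2), following the bit order listed above.
    static final int FLAG_REQUEST = 0x80;
    static final int FLAG_TWOWAY = 0x40;
    static final int FLAG_EVENT = 0x20;
    static final int SERIALIZATION_MASK = 0x1f;

    final boolean request;
    final boolean twoWay;
    final boolean event;
    final int serializationId;
    final int status;
    final long requestId;
    final int dataLength;

    private DubboHeaderSketch(int flag, int status, long requestId, int dataLength) {
        this.request = (flag & FLAG_REQUEST) != 0;
        this.twoWay = (flag & FLAG_TWOWAY) != 0;
        this.event = (flag & FLAG_EVENT) != 0;
        this.serializationId = flag & SERIALIZATION_MASK;
        this.status = status;
        this.requestId = requestId;
        this.dataLength = dataLength;
    }

    static DubboHeaderSketch parse(byte[] header) {
        if (header.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("need at least 16 header bytes");
        }
        ByteBuffer buf = ByteBuffer.wrap(header); // ByteBuffer is big-endian by default
        if (buf.getShort() != MAGIC) {            // bytes 0-1
            throw new IllegalArgumentException("bad magic number");
        }
        int flag = buf.get() & 0xff;              // byte 2: Req/Res, 2 Way, Event, Serialization ID
        int status = buf.get() & 0xff;            // byte 3
        long requestId = buf.getLong();           // bytes 4-11
        int dataLength = buf.getInt();            // bytes 12-15
        return new DubboHeaderSketch(flag, status, requestId, dataLength);
    }

    public static void main(String[] args) {
        // A two-way request serialized with Hessian2 (ID 2), request ID 42, 128-byte payload.
        byte[] header = ByteBuffer.allocate(HEADER_LENGTH)
                .putShort(MAGIC)
                .put((byte) (FLAG_REQUEST | FLAG_TWOWAY | 2))
                .put((byte) 0)
                .putLong(42L)
                .putInt(128)
                .array();
        DubboHeaderSketch h = parse(header);
        System.out.println("request=" + h.request + " twoWay=" + h.twoWay + " event=" + h.event
                + " serialization=" + h.serializationId + " id=" + h.requestId
                + " len=" + h.dataLength);
    }
}
```

The offsets used here (flag at byte 2, request ID at byte 4, data length at byte 12) follow directly from the field sizes in the list.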

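Protocol parsing

Two parts of Dubbo are involved in protocol parsing: the Exchanger and the Transporter. The Transporter is centered on ByteBuf and Message; it reads and writes data and handles serialization and deserialization. The Exchanger is centered on Request and Response and processes the data at that level.

This split is only an aid to understanding, not a hard rule.

Here is the decode source: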

//DubboCodec.java
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
int readable = buffer.readableBytes();
byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
buffer.readBytes(header);
return decode(channel, buffer, readable, header);
}

@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
// check magic number.
if (readable > 0 && header[0] != MAGIC_HIGH
|| readable > 1 && header[1] != MAGIC_LOW) {
int length = header.length;
if (header.length < readable) {
header = Bytes.copyOf(header, readable);
buffer.readBytes(header, length, readable - length);
}
for (int i = 1; i < header.length - 1; i++) {
if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
buffer.readerIndex(buffer.readerIndex() - header.length + i);
header = Bytes.copyOf(header, i);
break;
}
}
return super.decode(channel, buffer, readable, header);
}
// too little data has arrived so far; keep waiting
if (readable < HEADER_LENGTH) {
return DecodeResult.NEED_MORE_INPUT;
}

// the data length starts at byte offset 12 of the header
int len = Bytes.bytes2int(header, 12);
// the payload length is limited
checkPayload(channel, len);

int tt = len + HEADER_LENGTH;
if (readable < tt) {
return DecodeResult.NEED_MORE_INPUT;
}

// limit input stream.
ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

try {
return decodeBody(channel, is, header);
} finally {
......
}
}

protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
// proto identifies a serialization
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
// get request id.
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {
//decode response.
Response res = new Response(id);
// decoding a response mirrors decoding a request; omitted
...
...
return res;
} else {
//decode request.
// build the request; the Exchanger will handle it
Request req = new Request(id);
req.setVersion(Version.getProtocolVersion());
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
if ((flag & FLAG_EVENT) != 0) {
req.setEvent(true);
}
try {
Object data;
// look up the deserializer by proto
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
if (req.isHeartbeat()) {
data = decodeHeartbeatData(channel, in);
} else if (req.isEvent()) {
data = decodeEventData(channel, in);
} else {
DecodeableRpcInvocation inv;
if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
inv = new DecodeableRpcInvocation(channel, req, is, proto);
// if decoding is configured to run on the current I/O thread, decode here
// decoding here sets hasDecoded on inv to true, so the later DecodeHandler will not decode again
inv.decode();
} else {
inv = new DecodeableRpcInvocation(channel, req,
new UnsafeByteArrayInputStream(readMessageData(is)), proto);
// decode() is not called here; it runs later, asynchronously, on the thread pool
// calling decode() is exactly what the later DecodeHandler is for
}
data = inv;
}
req.setData(data);
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode request failed: " + t.getMessage(), t);
}
// bad request
req.setBroken(true);
req.setData(t);
}

return req;
}
}
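
The two NEED_MORE_INPUT branches in decode are how half-received frames are handled: nothing is consumed until the whole header and the whole payload have arrived. A minimal sketch of that check on a plain java.nio.ByteBuffer follows. FrameCheckSketch, tryReadFrame and MAX_PAYLOAD are hypothetical names, the magic-number check is left out for brevity, and throwing IllegalStateException for an oversized payload is a simplification of what checkPayload does.

```java
import java.nio.ByteBuffer;
import java.util.Optional;

/** Sketch of the "is a full frame buffered yet?" check (hypothetical class). */
final class FrameCheckSketch {
    static final int HEADER_LENGTH = 16;
    // Assumed limit, mirroring the 8M default of the payload parameter.
    static final int MAX_PAYLOAD = 8 * 1024 * 1024;

    /**
     * Returns the payload if a complete frame is available, otherwise an empty
     * Optional. When the frame is incomplete the buffer position is left untouched,
     * so the caller can simply wait for more bytes and try again.
     */
    static Optional<byte[]> tryReadFrame(ByteBuffer in) {
        if (in.remaining() < HEADER_LENGTH) {
            return Optional.empty();                 // header not complete yet
        }
        int start = in.position();
        int len = in.getInt(start + 12);             // absolute read, position unchanged
        if (len < 0 || len > MAX_PAYLOAD) {
            throw new IllegalStateException("payload length out of range: " + len);
        }
        if (in.remaining() < HEADER_LENGTH + len) {
            return Optional.empty();                 // body not complete yet
        }
        in.position(start + HEADER_LENGTH);          // skip the header
        byte[] payload = new byte[len];
        in.get(payload);                             // consume exactly len bytes
        return Optional.of(payload);
    }

    public static void main(String[] args) {
        byte[] frame = ByteBuffer.allocate(HEADER_LENGTH + 3)
                .putShort((short) 0xdabb).put((byte) 0xC2).put((byte) 0)
                .putLong(1L).putInt(3)
                .put(new byte[] {7, 8, 9})
                .array();
        // Only 17 of the 19 bytes have arrived: no frame yet.
        System.out.println(tryReadFrame(ByteBuffer.wrap(frame, 0, 17)).isPresent());
        // All 19 bytes have arrived: the 3-byte payload is returned.
        System.out.println(tryReadFrame(ByteBuffer.wrap(frame)).isPresent());
    }
}
```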

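Related source

The previous article noted that when a Provider exports a service it has to start a server and wait for Consumers to connect, so let's start from server startup.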

//HeaderExchanger
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
// note that three handlers are involved here
// the innermost handler is defined in DubboProtocol
// HeaderExchangeHandler is used by the Exchanger layer
// DecodeHandler is used by the Transporter
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

// Transporters resolves the implementation via SPI and ends up in the Netty4 NettyTransporter
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
// the handler here is the DecodeHandler created above
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}

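Dubbo has a lot of Handlers, so they deserve a careful explanation, otherwise it is easy to get lost. We can treat DecodeHandler as belonging to the Transporter layer and HeaderExchangeHandler as belonging to the Exchanger layer, and the Exchanger in turn contains the Invoker. Together they carry a message from the Transporter through the Exchanger down to the Invoker inside.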
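
The nesting in HeaderExchanger.bind is a plain decorator chain: each handler wraps the next one and delegates inward. The sketch below only illustrates that shape. The Handler interface and the Layer class are hypothetical stand-ins, not Dubbo's ChannelHandler API, and the layer names are just labels.

```java
import java.util.ArrayList;
import java.util.List;

/** Decorator-chain sketch of the three handlers (hypothetical types). */
final class HandlerChainSketch {
    interface Handler {
        void received(String message, List<String> trace);
    }

    /** Records its own name, then delegates to the handler it wraps (if any). */
    static final class Layer implements Handler {
        private final String name;
        private final Handler next;

        Layer(String name, Handler next) {
            this.name = name;
            this.next = next;
        }

        @Override
        public void received(String message, List<String> trace) {
            trace.add(name);
            if (next != null) {
                next.received(message, trace);
            }
        }
    }

    /** Same nesting order as new DecodeHandler(new HeaderExchangeHandler(handler)). */
    static Handler build() {
        Handler inner = new Layer("DubboProtocol handler", null);
        Handler exchange = new Layer("HeaderExchangeHandler", inner);
        return new Layer("DecodeHandler", exchange);
    }

    static List<String> trace(String message) {
        List<String> visited = new ArrayList<>();
        build().received(message, visited);
        return visited;
    }

    public static void main(String[] args) {
        // Prints the layers in the order a message passes through them.
        System.out.println(trace("hello"));
    }
}
```

The outermost wrapper sees the message first, which is why DecodeHandler can finish decoding before HeaderExchangeHandler looks at the Request.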

One line in the NettyServer constructor above is key:

// the URL's parameters are updated first, adding threadname=DubboServerHandler-xxx:20880; this parameter is used later when the thread pool creates threads
ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))

// a look inside wrap
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}

protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
// the Dispatcher is brought in through SPI here; the default is AllDispatcher
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}

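For efficiency, once the I/O thread (for Netty, the worker EventLoop) has parsed the data, Dubbo hands it to its own thread pool for asynchronous processing. The Dispatcher decides which incoming messages are sent to the thread pool. Through the SPI mechanism the Dispatcher's behavior can be customized. For example, the ExecutionChannelHandler that Dubbo ships dispatches only RPC requests to the thread pool, while other events such as heartbeats are not dispatched and stay on the I/O thread.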
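
The difference between staying on the I/O thread and dispatching to a pool can be sketched in a few lines. This is only an illustration under simplified assumptions: the Handler interface, the direct and all helpers, and the thread name "DubboServerHandler-sketch" are made up here, and a single-thread fixed pool stands in for whatever the ThreadPool SPI would create.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Sketch of "direct"-style versus "all"-style dispatching (hypothetical types). */
final class DispatchSketch {
    interface Handler {
        void received(Object message);
    }

    /** "direct"-style: the message is handled on the calling (I/O) thread. */
    static Handler direct(Handler target) {
        return target;
    }

    /** "all"-style: every message is handed over to the thread pool. */
    static Handler all(Handler target, ExecutorService pool) {
        return message -> pool.execute(() -> target.received(message));
    }

    /** Returns the name of the thread that ended up running the business handler. */
    static String handlingThread(boolean usePool) {
        // Named threads, in the spirit of the threadname URL parameter.
        ExecutorService pool = Executors.newFixedThreadPool(
                1, task -> new Thread(task, "DubboServerHandler-sketch"));
        try {
            CompletableFuture<String> seen = new CompletableFuture<>();
            Handler business = message -> seen.complete(Thread.currentThread().getName());
            Handler wrapped = usePool ? all(business, pool) : direct(business);
            wrapped.received("ping");
            return seen.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("direct -> " + handlingThread(false));
        System.out.println("all    -> " + handlingThread(true));
    }
}
```

With the direct wrapper the business handler runs on whichever thread called received; with the all wrapper it runs on the pool thread, which frees the caller immediately.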

The Dispatchers currently supported:

all=org.apache.dubbo.remoting.transport.dispatcher.all.AllDispatcher
direct=org.apache.dubbo.remoting.transport.dispatcher.direct.DirectDispatcher
message=org.apache.dubbo.remoting.transport.dispatcher.message.MessageOnlyDispatcher
execution=org.apache.dubbo.remoting.transport.dispatcher.execution.ExecutionDispatcher
connection=org.apache.dubbo.remoting.transport.dispatcher.connection.ConnectionOrderedDispatcher

The default, AllDispatcher, looks like this:

public class AllDispatcher implements Dispatcher {

public static final String NAME = "all";

@Override
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
// as the name suggests, this handler processes every event asynchronously, including connected, disconnected, channelRead and so on
// remember that the handler here is DecodeHandler
return new AllChannelHandler(handler, url);
}
}

// the AllChannelHandler constructor
public AllChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}

// into super
public WrappedChannelHandler(ChannelHandler handler, URL url) {
// once more, the handler here is DecodeHandler
this.handler = handler;
this.url = url;
// create the thread pool via SPI
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);

String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
componentKey = CONSUMER_SIDE;
}
// it is not yet clear to me why the executor is stored here
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}

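Taking Netty as the underlying network framework, the Handlers that process the protocol fall into two groups. In the original diagram, the Handlers left of the dashed line are Netty's and take care of codec and (de)serialization; those on the right are Dubbo's own Handlers and process RPC requests. NettyServerHandler is the bridge between Netty and Dubbo. The two SPI extension points select the Dispatcher and the ThreadPool respectively.

Next, the DecodeHandler source: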

public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
decode(message);
}

// as mentioned earlier, DecodeHandler takes care of decoding the data
if (message instanceof Request) {
decode(((Request) message).getData());
}

if (message instanceof Response) {
decode(((Response) message).getResult());
}

handler.received(channel, message);
}

private void decode(Object message) {
if (message instanceof Decodeable) {
try {
((Decodeable) message).decode();
if (log.isDebugEnabled()) {
log.debug("Decode decodeable message " + message.getClass().getName());
}
} catch (Throwable e) {
if (log.isWarnEnabled()) {
log.warn("Call Decodeable.decode failed: " + e.getMessage(), e);
}
} // ~ end of catch
} // ~ end of if
} // ~ end of method decode

// for a Request, message is a DecodeableRpcInvocation
public void decode() throws Exception {
if (!hasDecoded && channel != null && inputStream != null) {
try {
decode(channel, inputStream);
} catch (Throwable e) {
if (log.isWarnEnabled()) {
log.warn("Decode rpc invocation failed: " + e.getMessage(), e);
}
// decoding failed
request.setBroken(true);
// the exception becomes the request data
request.setData(e);
} finally {
hasDecoded = true;
}
}
}
// parse the RPC method name, the arguments and so on
public Object decode(Channel channel, InputStream input) throws IOException {
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
.deserialize(channel.getUrl(), input);

String dubboVersion = in.readUTF();
request.setVersion(dubboVersion);
setAttachment(DUBBO_VERSION_KEY, dubboVersion);

setAttachment(PATH_KEY, in.readUTF());
setAttachment(VERSION_KEY, in.readUTF());

setMethodName(in.readUTF());
try {
Object[] args;
Class<?>[] pts;
String desc = in.readUTF();
if (desc.length() == 0) {
pts = DubboCodec.EMPTY_CLASS_ARRAY;
args = DubboCodec.EMPTY_OBJECT_ARRAY;
} else {
pts = ReflectUtils.desc2classArray(desc);
args = new Object[pts.length];
for (int i = 0; i < args.length; i++) {
try {
args[i] = in.readObject(pts[i]);
} catch (Exception e) {
if (log.isWarnEnabled()) {
log.warn("Decode argument failed: " + e.getMessage(), e);
}
}
}
}
setParameterTypes(pts);

Map<String, String> map = (Map<String, String>) in.readObject(Map.class);
if (map != null && map.size() > 0) {
Map<String, String> attachment = getAttachments();
if (attachment == null) {
attachment = new HashMap<String, String>();
}
attachment.putAll(map);
setAttachments(attachment);
}
//decode argument ,may be callback
for (int i = 0; i < args.length; i++) {
// I have not fully understood this part
args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
}

setArguments(args);

} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read invocation data failed.", e));
} finally {
if (in instanceof Cleanable) {
((Cleanable) in).cleanup();
}
}
return this;
}
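
The hasDecoded flag is what lets both paths coexist: whether decode() first runs on the I/O thread or later in DecodeHandler, the body is parsed at most once, and the flag is set even when parsing fails. A stripped-down sketch of that guard follows. LazyDecodeSketch is a hypothetical class, and turning the bytes into a UTF-8 string stands in for the real deserialization.

```java
import java.nio.charset.StandardCharsets;

/** Sketch of a decode-at-most-once guard (hypothetical class). */
final class LazyDecodeSketch {
    private final byte[] raw;
    private volatile boolean hasDecoded;
    private String methodName;
    private int decodeRuns; // only here so the sketch can show how often decoding ran

    LazyDecodeSketch(byte[] raw) {
        this.raw = raw;
    }

    /** Safe to call from the codec and again from a later handler. */
    void decode() {
        if (!hasDecoded && raw != null) {
            try {
                decodeRuns++;
                // Stand-in for reading the method name, argument types and arguments.
                methodName = new String(raw, StandardCharsets.UTF_8);
            } finally {
                hasDecoded = true; // set even on failure, so a broken body is not retried
            }
        }
    }

    String methodName() {
        return methodName;
    }

    int decodeRuns() {
        return decodeRuns;
    }

    public static void main(String[] args) {
        LazyDecodeSketch inv = new LazyDecodeSketch("sayHello".getBytes(StandardCharsets.UTF_8));
        inv.decode(); // first call, for example on the I/O thread
        inv.decode(); // second call, for example from a later handler: a no-op
        System.out.println(inv.methodName() + " decoded " + inv.decodeRuns() + " time(s)");
    }
}
```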

Next the Exchanger takes over:

//HeaderExchangeHandler.java
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {

if (request.isTwoWay()) {
// an RPC that expects a return value
handleRequest(exchangeChannel, request);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}

void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
if (req.isBroken()) {
// parsing the protocol data may have failed earlier
Object data = req.getData();

String msg;
if (data == null) {
msg = null;
} else if (data instanceof Throwable) {
msg = StringUtils.toString((Throwable) data);
} else {
msg = data.toString();
}
res.setErrorMessage("Fail to decode request due to: " + msg);
// set the status
res.setStatus(Response.BAD_REQUEST);

channel.send(res);
return;
}
// find handler by message class.
Object msg = req.getData();
try {
// calls reply on the inner class defined in DubboProtocol
CompletionStage<Object> future = handler.reply(channel, msg);
future.whenComplete((appResult, t) -> {
try {
if (t == null) {
res.setStatus(Response.OK);
res.setResult(appResult);
} else {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
// send the response back
channel.send(res);
} catch (RemotingException e) {
logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
} finally {
// HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
});
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
channel.send(res);
}
}
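
handleRequest boils down to three outcomes: a broken request becomes BAD_REQUEST, a normally completed reply becomes OK, and a failed reply becomes SERVICE_ERROR. The sketch below reproduces only that mapping. ReplySketch and its Response class are hypothetical, the status values are copied from the Status list earlier, and instead of sending the response from inside the callback it returns the response object, which is only meaningful here because the example futures are already complete.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

/** Sketch of the request-to-response status mapping (hypothetical types). */
final class ReplySketch {
    static final byte OK = 20;
    static final byte BAD_REQUEST = 40;
    static final byte SERVICE_ERROR = 70;

    static final class Response {
        byte status;
        Object result;
        String errorMessage;
    }

    static Response handle(Object data, boolean broken,
                           Function<Object, CompletionStage<Object>> reply) {
        Response res = new Response();
        if (broken) {
            // Decoding failed earlier; data holds the cause.
            res.status = BAD_REQUEST;
            res.errorMessage = "Fail to decode request due to: " + data;
            return res;
        }
        try {
            reply.apply(data).whenComplete((value, error) -> {
                if (error == null) {
                    res.status = OK;
                    res.result = value;
                } else {
                    res.status = SERVICE_ERROR;
                    res.errorMessage = String.valueOf(error);
                }
            });
        } catch (Throwable e) {
            // The reply call itself threw before producing a future.
            res.status = SERVICE_ERROR;
            res.errorMessage = String.valueOf(e);
        }
        return res;
    }

    public static void main(String[] args) {
        Response ok = handle("ping", false, d -> CompletableFuture.<Object>completedFuture("pong"));
        Response bad = handle(new IllegalStateException("garbled body"), true, d -> null);
        System.out.println("ok status=" + ok.status + ", broken status=" + bad.status);
    }
}
```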

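Parameters involved

Name            Default   Purpose
payload         8M        Maximum payload size in the Dubbo protocol
decode.in.io    true      Whether to decode on the I/O thread
dispatcher      all       Which Dispatcher strategy to use
threadpool      fixed     Which kind of thread pool to use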

References

http://dubbo.apache.org/zh-cn/index.html


Copyright © 2019 - 2025 Young All Rights Reserved.
