Dubbo 一次调用过程

本文所有讨论都基于Dubbo 2.7.2

上一篇文章里讲到Consumer通过DubboInvoker将客户端的请求发出去,这篇文章以Netty4为例接着分析从请求发出到收到返回值是一个什么样的过程。

先来一张RPC的数据流程图,后面根据这个图分析会更清晰一些(其中不包括Cluster、LoadBalance这些过程)

Provider和Consumer处理数据的流程差不多,所以这里只分析一下Consumer端的流程

发送请求

DubboInvoker

经过Cluster、LoadBalance等重重考验,终于来到了真正的Invoker里,Dubbo协议会进入DubboInvoker,这个类会将RpcInvocation发出去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);

ExchangeClient currentClient;
//获取一个client,一个client可以同时发送多个请求
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
if (isOneway) {
//不需要返回值的调用
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
//由于这种调用不需要返回值,因此需要确认请求是否发送出去。
//isSent决定是否检查
//是否发出去总得有个时间限制吧,但这里为啥没使用上面method的timeout呢?
//因为从send()方法可看出,若没有传timeout的话会从url上获取timeout,也就是获取了实例级别的timeout而不是方法级别的timeout,为啥?
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
//这里没有获取isSent参数,后面还是会获取的,默认false
CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
//这里相当于回调,记为A
responseFuture.whenComplete((obj, t) -> {
if (t != null) {
asyncRpcResult.completeExceptionally(t);
} else {
asyncRpcResult.complete((AppResponse) obj);
}
});
RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
//由AsyncToSyncInvoker负责同步等待
return asyncRpcResult;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}

HeaderExchangeChannel

这个类是HeaderExchangeClient中负责发送数据的。上面的两个方法send()和request()如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//HeaderExchangeClient
public void send(Object message, boolean sent) throws RemotingException {
//channel就是HeaderExchangeChannel
channel.send(message, sent);
}
public CompletableFuture<Object> request(Object request) throws RemotingException {
return channel.request(request);
}
//下面只分析request()
public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
//这里会把request id与future保存起来,等reponse返回时,根据id去更新future
//这里也启动了timeout check task
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
try {
//这个channel对应了下层的网络框架。
//Netty的话,就是NettyChannel
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}

NettyChannel

这个类是Dubbo上层代码与Netty的粘合剂,里面会直接操作Netty的API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//NettyChannel里的这个属性实际就是Netty定义的Channel
private final Channel channel;

public void send(Object message, boolean sent) throws RemotingException {
super.send(message, sent);

boolean success = true;
int timeout = 0;
try {
//写入Netty
ChannelFuture future = channel.writeAndFlush(message);
if (sent) {
//需要检查是否成功写入
timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
//注意:这里会阻塞
success = future.await(timeout);
}
Throwable cause = future.cause();
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
}

if (!success) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+ "in timeout(" + timeout + "ms) limit");
}
}

从上面的代码可以看出,无论是否设置sent=true,都会检查timeout。只是当sent=true时,会阻塞当前线程;sent=false时利用一个timeout check task线程检查,不会阻塞当前线程。

通常我们会把timeout赋给socket,由操作系统判断是否timeout,如果timeout了,抛出异常,应用程序捕获。但Dubbo自己实现了这个检查。

NettyClientHandler

Netty的写入首先就会碰到NettyClientHandler,看一下这个类的write(),前面的writeAndFlush()会走这个方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
//继续写
super.write(ctx, msg, promise);
final NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
final boolean isRequest = msg instanceof Request;

// We add listeners to make sure our out bound event is correct.
// If our out bound event has an error (in most cases the encoder fails),
// we need to have the request return directly instead of blocking the invoke process.
//添加了监听器
promise.addListener(future -> {
try {
if (future.isSuccess()) {
// if our future is success, mark the future to sent.
// 如上图,这个handler是MultiMessageHandler,这个sent event会从这个handler向上走
handler.sent(channel, msg);
return;
}

Throwable t = future.cause();
if (t != null && isRequest) {
//如果没有写成功,mock一个假的Response,这个Response携带错误信息
Request request = (Request) msg;
Response response = buildErrorResponse(request, t);
handler.received(channel, response);
}
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
});
}

由于这个类也处理inbound事件,我们直接看一下这部分代码

1
2
3
4
5
6
7
8
9
10
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
//只是简单地转发
//这个handler就是MultiMessageHandler,从图中可以清晰的看出来
handler.received(channel, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}

接收请求

从Netty接收到请求后,Dubbo自定义了ChannelHandler处理这些请求,我们直接从比较重要的类开始。

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface ChannelHandler {

void connected(Channel channel) throws RemotingException;

void disconnected(Channel channel) throws RemotingException;

//这个方法在发送请求时才使用
void sent(Channel channel, Object message) throws RemotingException;

void received(Channel channel, Object message) throws RemotingException;

void caught(Channel channel, Throwable exception) throws RemotingException;
}

DispatcherHandler

这个类实际并不存在,只是表示了这里会根据配置初始化一个线程池,并把上游的请求转发到线程池里。默认是AllChannelHandler,Dubbo里现在提供了以下几种Dispatcher,它们的主要区别就是是否要把当前请求放入线程池处理。

1
2
3
4
5
all=org.apache.dubbo.remoting.transport.dispatcher.all.AllDispatcher
direct=org.apache.dubbo.remoting.transport.dispatcher.direct.DirectDispatcher
message=org.apache.dubbo.remoting.transport.dispatcher.message.MessageOnlyDispatcher
execution=org.apache.dubbo.remoting.transport.dispatcher.execution.ExecutionDispatcher
connection=org.apache.dubbo.remoting.transport.dispatcher.connection.ConnectionOrderedDispatcher
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//AllChannelHandler
//可以看出,下面的请求都提交给了线程池
@Override
public void connected(Channel channel) throws RemotingException {
ExecutorService executor = getExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
} catch (Throwable t) {
throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
}
}

@Override
public void disconnected(Channel channel) throws RemotingException {
ExecutorService executor = getExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
} catch (Throwable t) {
throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
}
}

DecodeHandler

这一篇文章讲过Dubbo协议的编解码。其中有一步骤是判断decode是否要在I/O线程(注:这里不一定是I/O线程,也可能是线程池的线程)里完成,如果配置为的话,当时就没有decode。那在哪里decode呢?就是在这个类里。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void received(Channel channel, Object message) throws RemotingException {
//decode的步骤这里就不展开了
if (message instanceof Decodeable) {
decode(message);
}

if (message instanceof Request) {
decode(((Request) message).getData());
}

if (message instanceof Response) {
decode(((Response) message).getResult());
}

handler.received(channel, message);
}

HeaderExchangeHandler

解码后由这个类进行最后的处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
handleRequest(exchangeChannel, request);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
//我们看一下这里
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}

static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}

public static void received(Channel channel, Response response, boolean timeout) {
try {
//还记得前面发送Request时,将id和future保存起来了吗?这里会使用到
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
Timeout t = future.timeoutCheckTask;
if (!timeout) {
// decrease Time
//cancel timeout task
t.cancel();
}
//完成future,前面DubboInvoker调用request()后,注册了一个回调A,这里会调用这个回调。如果上层的业务线程采用的是同步阻塞模式,此时也会返回
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}

线程池

这里需要专门梳理一下Dubbo中线程池的使用。Dubbo中线程池的创建源码如下,这段代码是在创建NettyClientHandlerNettyServerHandler时调用的

1
2
3
4
5
6
7
8
9
10
11
12
13
public WrappedChannelHandler(ChannelHandler handler, URL url) {
this.handler = handler;
this.url = url;
//这里通过SPI机制创建线程池。
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);

String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
componentKey = CONSUMER_SIDE;
}
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}

总结

到这里一次RPC请求就完成了,总体过程并不复杂。这个过程中,Dubbo通过SPI机制提供了很多可以替换的模块,比如底层的Netty网络模块,中间使用到的线程池种类等。

Powered by Hexo and Hexo-theme-hiker

Copyright © 2019 - 2025 Young All Rights Reserved.

访客数 : | 访问量 :