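Everything in this article is based on Dubbo 2.7.2.
The previous article described how the Consumer sends the client's request out through DubboInvoker. This article picks up from there and, using Netty 4 as the example, analyzes what happens between sending the request and receiving the return value.
Let's start with a data-flow diagram of an RPC call; following the diagram makes the analysis clearer (it leaves out steps such as Cluster and LoadBalance).
The Provider and the Consumer process data in much the same way, so only the Consumer side is analyzed here.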
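
Sending the request

DubboInvoker

After making it through Cluster, LoadBalance and the rest, the call finally reaches the real Invoker. For the Dubbo protocol that is DubboInvoker, the class that sends the RpcInvocation out.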

    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
                CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
                responseFuture.whenComplete((obj, t) -> {
                    if (t != null) {
                        asyncRpcResult.completeExceptionally(t);
                    } else {
                        asyncRpcResult.complete((AppResponse) obj);
                    }
                });
                RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
                return asyncRpcResult;
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION,
                    "Invoke remote method timeout. method: " + invocation.getMethodName()
                            + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION,
                    "Failed to invoke remote method: " + invocation.getMethodName()
                            + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

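The client selection at the top of doInvoke() is a plain round-robin over the connection array. As a minimal sketch of that idea outside Dubbo: `RoundRobinPicker` is a hypothetical helper, not a Dubbo class, and the use of `Math.floorMod` is my own assumption to keep the slot non-negative once an `int` counter overflows. The listing above does not show how `index` is declared, so Dubbo may handle overflow differently.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Round-robin picker over a fixed array (hypothetical helper, not part of Dubbo). */
final class RoundRobinPicker<T> {
    private final T[] items;
    private final AtomicInteger index = new AtomicInteger();

    RoundRobinPicker(T[] items) {
        if (items.length == 0) {
            throw new IllegalArgumentException("items must not be empty");
        }
        this.items = items.clone();
    }

    T next() {
        // Same shortcut as doInvoke(): with a single entry there is nothing to rotate.
        if (items.length == 1) {
            return items[0];
        }
        // floorMod keeps the result in [0, length) even after the counter wraps to negative values.
        return items[Math.floorMod(index.getAndIncrement(), items.length)];
    }
}
```

Calling `next()` repeatedly on a picker built from `{"a", "b", "c"}` walks through the entries in order and then starts over, which is all the multi-connection branch needs.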
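The actual sending is done inside HeaderExchangeClient, which delegates to its inner HeaderExchangeChannel. The send() and request() methods that doInvoke() calls look like this: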

    public void send(Object message, boolean sent) throws RemotingException {
        channel.send(message, sent);
    }

    public CompletableFuture<Object> request(Object request) throws RemotingException {
        return channel.request(request);
    }

    public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null,
                    "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

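The pattern in request() is worth spelling out: the future is registered before the message is written, and it is cancelled if the write fails, so a response can never arrive for a request that nobody is waiting on. Here is a rough sketch of that pattern using only the JDK. `PendingRequests`, its method names, and the `BiConsumer` standing in for the transport are all hypothetical; this is not how DefaultFuture is actually implemented.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

/** Correlates responses with requests by id (hypothetical sketch, not Dubbo's DefaultFuture). */
final class PendingRequests {
    private final AtomicLong ids = new AtomicLong();
    private final Map<Long, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();

    /** Registers a future under a fresh id, then hands (id, payload) to the transport. */
    CompletableFuture<Object> request(Object payload, BiConsumer<Long, Object> transport) {
        long id = ids.incrementAndGet();
        CompletableFuture<Object> future = new CompletableFuture<>();
        pending.put(id, future);            // register first, send second
        try {
            transport.accept(id, payload);
        } catch (RuntimeException e) {
            pending.remove(id);             // the write failed, so no response will ever come
            future.cancel(false);
            throw e;
        }
        return future;
    }

    /** Called by the receiving side; returns false when nobody is waiting for this id. */
    boolean onResponse(long id, Object result) {
        CompletableFuture<Object> future = pending.remove(id);
        return future != null && future.complete(result);
    }

    int pendingCount() {
        return pending.size();
    }
}
```

A response whose id is no longer in the map (because it already completed or was cancelled) is simply reported as unmatched, which is the situation the "timeout response finally returned" warning later in this article deals with.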

NettyChannel

This class is the glue between Dubbo's upper layers and Netty; it calls the Netty API directly.

    private final Channel channel;

    public void send(Object message, boolean sent) throws RemotingException {
        super.send(message, sent);

        boolean success = true;
        int timeout = 0;
        try {
            ChannelFuture future = channel.writeAndFlush(message);
            if (sent) {
                timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
                success = future.await(timeout);
            }
            Throwable cause = future.cause();
            if (cause != null) {
                throw cause;
            }
        } catch (Throwable e) {
            throw new RemotingException(this, "Failed to send message " + message
                    + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
        }

        if (!success) {
            throw new RemotingException(this, "Failed to send message " + message
                    + " to " + getRemoteAddress() + "in timeout(" + timeout + "ms) limit");
        }
    }

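As the code shows, a timeout is checked whether or not sent=true is set. The difference is that with sent=true the calling thread blocks in future.await(timeout), whereas with sent=false the send returns immediately and a timeout check task thread does the checking, so the calling thread is not blocked. That task is not part of this method; it belongs to the DefaultFuture created in request().
The usual approach is to set the timeout on the socket and let the operating system decide when it has expired, throwing an exception for the application to catch. Dubbo implements this check itself instead.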
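To make the non-blocking variant concrete, here is a small sketch of an application-level timeout check. It is an illustration under my own assumptions: `TimeoutWatch` is a hypothetical class, and a JDK `ScheduledExecutorService` stands in for whatever timer Dubbo uses internally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Application-level timeout check (hypothetical sketch; the timer choice is an assumption). */
final class TimeoutWatch {
    // A single daemon thread plays the role of the timeout check task thread.
    private static final ScheduledExecutorService TIMER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "timeout-check");
                t.setDaemon(true);
                return t;
            });

    /** Fails the future with TimeoutException unless it completes within timeoutMs. */
    static <T> CompletableFuture<T> watch(CompletableFuture<T> future, long timeoutMs) {
        ScheduledFuture<?> check = TIMER.schedule(() -> {
            // No effect if the response has already completed the future.
            future.completeExceptionally(
                    new TimeoutException("no response within " + timeoutMs + " ms"));
        }, timeoutMs, TimeUnit.MILLISECONDS);
        // A response that arrives in time cancels the pending check.
        future.whenComplete((value, error) -> check.cancel(false));
        return future;
    }
}
```

The caller never blocks in `watch()`; it only blocks if it later chooses to wait on the returned future, which mirrors the sent=false path described above.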

NettyClientHandler

A write in Netty first passes through NettyClientHandler. Look at this class's write(): the writeAndFlush() call above goes through this method.

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        super.write(ctx, msg, promise);
        final NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        final boolean isRequest = msg instanceof Request;

        promise.addListener(future -> {
            try {
                if (future.isSuccess()) {
                    handler.sent(channel, msg);
                    return;
                }

                Throwable t = future.cause();
                if (t != null && isRequest) {
                    Request request = (Request) msg;
                    Response response = buildErrorResponse(request, t);
                    handler.received(channel, response);
                }
            } finally {
                NettyChannel.removeChannelIfDisconnected(ctx.channel());
            }
        });
    }

This class also handles inbound events, so let's look at that part of the code right away.

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        try {
            handler.received(channel, msg);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }

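
Receiving the request

Once a request has been received from Netty, Dubbo handles it through its own ChannelHandler abstraction. We go straight to the more important classes.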

    public interface ChannelHandler {

        void connected(Channel channel) throws RemotingException;

        void disconnected(Channel channel) throws RemotingException;

        void sent(Channel channel, Object message) throws RemotingException;

        void received(Channel channel, Object message) throws RemotingException;

        void caught(Channel channel, Throwable exception) throws RemotingException;
    }

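
DispatcherHandler

No class with this name actually exists. It stands for the step where a thread pool is initialized according to the configuration and upstream requests are forwarded to that pool. The default is AllChannelHandler. Dubbo currently ships the following Dispatchers, which differ mainly in whether the current request is handed to the thread pool.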

    all=org.apache.dubbo.remoting.transport.dispatcher.all.AllDispatcher
    direct=org.apache.dubbo.remoting.transport.dispatcher.direct.DirectDispatcher
    message=org.apache.dubbo.remoting.transport.dispatcher.message.MessageOnlyDispatcher
    execution=org.apache.dubbo.remoting.transport.dispatcher.execution.ExecutionDispatcher
    connection=org.apache.dubbo.remoting.transport.dispatcher.connection.ConnectionOrderedDispatcher


    @Override
    public void connected(Channel channel) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel,
                    getClass() + " error when process connected event .", t);
        }
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel,
                    getClass() + " error when process disconnected event .", t);
        }
    }

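The difference between the dispatchers comes down to which thread ends up running the handler. A stripped-down sketch of the two extremes follows; `Dispatch`, `direct` and `pooled` are hypothetical names, and a plain `Consumer` stands in for Dubbo's ChannelHandler.

```java
import java.util.concurrent.Executor;
import java.util.function.Consumer;

/** Two dispatch strategies around a message handler (hypothetical sketch, not Dubbo code). */
final class Dispatch {
    private Dispatch() {
    }

    /** "direct" style: the handler runs on the calling thread, i.e. the I/O thread. */
    static <M> Consumer<M> direct(Consumer<M> handler) {
        return handler;
    }

    /** "all" style: every message is handed to the executor and the caller returns at once. */
    static <M> Consumer<M> pooled(Consumer<M> handler, Executor executor) {
        return message -> executor.execute(() -> handler.accept(message));
    }
}
```

With `pooled`, a slow handler no longer stalls the I/O thread, at the cost of a thread hop per message; with `direct`, there is no hop, but any blocking work in the handler blocks reads on that connection.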
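
DecodeHandler

An earlier article covered encoding and decoding of the Dubbo protocol. One step there decides whether decoding should happen in the I/O thread (note: it is not necessarily the I/O thread; it may also be a thread-pool thread). If that option is configured to "no", nothing is decoded at that point. So where does decoding happen? In this class.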

    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Decodeable) {
            decode(message);
        }

        if (message instanceof Request) {
            decode(((Request) message).getData());
        }

        if (message instanceof Response) {
            decode(((Response) message).getResult());
        }

        handler.received(channel, message);
    }

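The idea of deferring the decode to a later handler can be sketched without any Dubbo types. In the example below, `LazyPayload` and `DecodingHandler` are hypothetical names, and UTF-8 text stands in for the real serialization; the point is only that the bytes are carried along untouched until a handler further down the chain asks for them.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

/** A payload whose bytes are decoded at most once, on whichever thread asks first (hypothetical). */
final class LazyPayload {
    private final byte[] raw;
    private volatile String decoded;   // null until decode() has run

    LazyPayload(byte[] raw) {
        this.raw = raw.clone();
    }

    synchronized void decode() {
        if (decoded == null) {
            decoded = new String(raw, StandardCharsets.UTF_8);
        }
    }

    boolean isDecoded() {
        return decoded != null;
    }

    String value() {
        decode();
        return decoded;
    }
}

/** Decodes lazily carried payloads before passing the message on (hypothetical). */
final class DecodingHandler implements Consumer<Object> {
    private final Consumer<Object> next;

    DecodingHandler(Consumer<Object> next) {
        this.next = next;
    }

    @Override
    public void accept(Object message) {
        if (message instanceof LazyPayload) {
            ((LazyPayload) message).decode();
        }
        next.accept(message);
    }
}
```

If this handler sits behind a thread-pool dispatcher, the decoding cost moves off the I/O thread; if the payload was already decoded earlier, the call is a no-op.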

HeaderExchangeHandler

After decoding, this class does the final processing.

    public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            if (message instanceof Request) {
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    if (request.isTwoWay()) {
                        handleRequest(exchangeChannel, request);
                    } else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                if (isClientSide(channel)) {
                    Exception e = new Exception("Dubbo client can not supported string message: "
                            + message + " in channel: " + channel + ", url: " + channel.getUrl());
                    logger.error(e.getMessage(), e);
                } else {
                    String echo = handler.telnet(channel, (String) message);
                    if (echo != null && echo.length() > 0) {
                        channel.send(echo);
                    }
                }
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

    static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            DefaultFuture.received(channel, response);
        }
    }

    // The method below belongs to DefaultFuture, which handleResponse() delegates to.
    public static void received(Channel channel, Response response, boolean timeout) {
        try {
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                Timeout t = future.timeoutCheckTask;
                if (!timeout) {
                    // The response arrived in time, so the pending timeout check is cancelled.
                    t.cancel();
                }
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at "
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                        + ", response " + response
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                        + " -> " + channel.getRemoteAddress()));
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }

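
Thread pool

Dubbo's use of thread pools deserves a dedicated look. The source that creates the thread pool is shown below; it is called when NettyClientHandler and NettyServerHandler are created.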

    public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class)
                .getAdaptiveExtension().getExecutor(url);

        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
            componentKey = CONSUMER_SIDE;
        }
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    }

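The last two lines of the constructor park the executor in a shared store, keyed first by component (consumer or provider side) and then by port, so that other parts of the framework can find it later. A rough sketch of such a two-level registry follows; `ExecutorRegistry` and its methods are hypothetical and only mirror the shape of the `dataStore.put(componentKey, port, executor)` call, not the real DataStore implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;

/** Two-level lookup: component key, then port (hypothetical sketch, not Dubbo's DataStore). */
final class ExecutorRegistry {
    private final Map<String, Map<String, ExecutorService>> store = new ConcurrentHashMap<>();

    void put(String componentKey, String port, ExecutorService executor) {
        store.computeIfAbsent(componentKey, k -> new ConcurrentHashMap<>()).put(port, executor);
    }

    /** Returns the executor registered for this component and port, or null if there is none. */
    ExecutorService get(String componentKey, String port) {
        Map<String, ExecutorService> byPort = store.get(componentKey);
        return byPort == null ? null : byPort.get(port);
    }

    /** Initiates an orderly shutdown of every registered executor. */
    void shutdownAll() {
        store.values().forEach(byPort -> byPort.values().forEach(ExecutorService::shutdown));
    }
}
```

Keeping the executors in one place like this makes it possible to inspect or shut down every pool from a single point, instead of chasing references through each handler.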
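
Summary

That completes one RPC request; the overall flow is not complicated. Along the way, Dubbo uses its SPI mechanism to make many modules replaceable, such as the underlying Netty transport and the kind of thread pool used in the middle.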