Dubbo Consumer(2)

本文所有讨论都基于Dubbo 2.7.2

上一篇文章分析了Consumer引用服务的过程,其中通过Cluster,LoadBalance等最后确定了一个可以调用的Invoker,那么这个Invoker具体怎么创建的呢?这篇文章就来分析一下。

从前文知道,Directory负责从注册中心获取服务,并监控服务的变化。所以我们就从Directory里开始看起。

订阅服务

前面的文章讲到在Consumer初始化时会创建Directory,里面会调用subscribe(),这个方法就是Directory与注册中心交互的开始。

1
2
3
4
5
6
7
8
9
10
11
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
......
//这里订阅了providers目录
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));

Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
public void subscribe(URL url) {
//此时url= consumer://.....
setConsumerUrl(url);
CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
//订阅
registry.subscribe(url, this);
}

//ZookeeperRegistry的父类FailbackRegistry.java
public void subscribe(URL url, NotifyListener listener) {
//此时的listener就是RegistryDirectory
super.subscribe(url, listener);
removeFailedSubscribed(url, listener);
try {
// Sending a subscription request to the server side
doSubscribe(url, listener);
} catch (Exception e) {

......

// Record a failed registration request to a failed list, retry regularly
addFailedSubscribed(url, listener);
}
}

public void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (ANY_VALUE.equals(url.getServiceInterface())) {
......
} else {
List<URL> urls = new ArrayList<>();
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
zkListener = listeners.get(listener);
}
//若path已存在,没有影响
zkClient.create(path, false);
//注册child监听器并且返回了child内容
//providers目录下的内容就是Provider暴露的服务
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
//String样式的url转换为Url
//这里还进行了一步Match操作,并不是zk上当前目录下所有的url都可以转化为Invoker,每个url还需要进行一次匹配,匹配条件就是consumer的(group, version, classifier)与provider的匹配,匹配的才能继续
//可以参考 UrlUtils.isMatch()方法
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}

//notify()走入了下面的方法里
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
......
// keep every provider's category.
Map<String, List<URL>> result = new HashMap<>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
//listener就是RegistryDirectory
listener.notify(categoryList);
// We will update our cache file after each notification.
// When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
saveProperties(url);
}
}

public synchronized void notify(List<URL> urls) {
Map<String, List<URL>> categoryUrls = urls.stream()
.filter(Objects::nonNull)
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.groupingBy(url -> {
if (UrlUtils.isConfigurator(url)) {
return CONFIGURATORS_CATEGORY;
} else if (UrlUtils.isRoute(url)) {
return ROUTERS_CATEGORY;
} else if (UrlUtils.isProvider(url)) {
return PROVIDERS_CATEGORY;
}
return "";
}));

List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);

List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
toRouters(routerURLs).ifPresent(this::addRouters);

// providers
List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
refreshOverrideAndInvoker(providerURLs);
}

private void refreshOverrideAndInvoker(List<URL> urls) {
overrideDirectoryUrl();
//首次调用就是把url转化为Invokers;否则,就是用url刷新Invokers
refreshInvoker(urls);
}

private void refreshInvoker(List<URL> invokerUrls) {
Assert.notNull(invokerUrls, "invokerUrls should not be null");

if (invokerUrls.size() == 1
&& invokerUrls.get(0) != null
&& EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
//不可访问
this.forbidden = true; // Forbid to access
this.invokers = Collections.emptyList();
routerChain.setInvokers(this.invokers);
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls == Collections.<URL>emptyList()) {
invokerUrls = new ArrayList<>();
}
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<>();
this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
}
if (invokerUrls.isEmpty()) {
return;
}
//这里会根据协议(此时是dubbo://...)创建具体的Invoker
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map

/**
* If the calculation is wrong, it is not processed.
*
* 1. The protocol configured by the client is inconsistent with the protocol of the server.
* eg: consumer protocol = dubbo, provider only has other protocol services(rest).
* 2. The registration center is not robust and pushes illegal specification data.
*
*/
if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls
.toString()));
return;
}

List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
// pre-route and build cache, notice that route cache should build on original Invoker list.
// toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.
routerChain.setInvokers(newInvokers);
this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
this.urlInvokerMap = newUrlInvokerMap;

try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
//这个方法的核心如下
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {

......
for (URL providerUrl : urls) {
......
try {
boolean enabled = true;
//provider可以通过disabled=true禁用一个服务
if (url.hasParameter(DISABLED_KEY)) {
enabled = !url.getParameter(DISABLED_KEY, false);
} else {
enabled = url.getParameter(ENABLED_KEY, true);
}
if (enabled) {
//根据SPI,dubbo://... 会使用DubboProtocol
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
}
......

}

连接服务器

从zk上获取一个服务后,接下来的工作就是创建到这个服务器的连接了。

上面进入了DubboProtocol的refer()

这里面URL已经是dubbo://…这样的了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
//Consumer的异步转同步是在这里做的
return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}

public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);

//create rpc invoker.
//终于看见了真正的Invoker
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);

return invoker;
}

private ExchangeClient[] getClients(URL url) {
// whether to share connection
boolean useShareConnect = false;
//这个配置若=0,即没有指定connection的数目,则使用已创建的clients,已创建的数目由shareconnections = N指定
//若!=0, 即显式指定了connection数目,则创建新的clients
int connections = url.getParameter(CONNECTIONS_KEY, 0);
List<ReferenceCountExchangeClient> shareClients = null;
// if not configured, connection is shared, otherwise, one connection for one service
if (connections == 0) {
useShareConnect = true;

/**
* The xml configuration should have a higher priority than properties.
*/
String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
shareClients = getSharedClient(url, connections);
}

ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (useShareConnect) {
clients[i] = shareClients.get(i);

} else {
clients[i] = initClient(url);
}
}

return clients;
}

//看一下单个client怎么创建的
private ExchangeClient initClient(URL url) {

//默认netty
String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));

//codec
url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
// enable heartbeat by default
url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));

// BIO is not allowed since it has severe performance issue.
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}

ExchangeClient client;
try {
// connection should be lazy
if (url.getParameter(LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);

} else {
//这里和Provider创建server的过程基本一样了。
client = Exchangers.connect(url, requestHandler);
}

} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}

return client;
}

到这里一个Consumer就启动可以调用了。Provider会创建一系列的Handler用于处理数据,这些Handler同样适用于Consumer。

关于Client

因为后面还会涉及到线程池的讲解,这里有必要专门讨论一下Client的数量。

刚开始我是从连接池的角度理解这些clients的,就像经常使用的HttpClients,其使用方法就像下面这段代码。

1
2
3
4
5
6
7
8
9
Connection connection = null;
try {
connection = ConnectionPool.get();
if(connection != null) {
.......
}
} finally {
ConnectionPool.release(connection);
}

后来发现不是这样的,Dubbo在使用client时就是轮询获取的,并没有get、release这些操作。现在明白了原因:Dubbo里真正进行I/O的是底层的网络框架,比如Netty,这里的client只是将我们的请求转发给了网络框架,自身并不负责真正的I/O,因此这些client是可以同时被多个线程使用的。 虽然这些client不直接I/O,但它们和I/O线程是一一对应的,当创建一个client时,就会创建一个底层的I/O线程,从而创建一个TCP连接。

从前面的分析知道,Consumer创建的client是以URL为粒度进行创建的。一个URL就代表了Provider暴露的一个接口服务(目前版本的Dubbo是这样的)。因此对于一个URL而言,Consumer至少会创建一个client,具体的一个URL创建几个clients则由URL里的参数connectionsshareconnections决定。

注意:2.7.5中Dubbo可以以应用为粒度暴露

我们从TCP连接的角度看,一个client实际代表了一个tcp连接,比如Provider提供了两个服务,都通过dubbo协议暴露在28808端口,一个NameService,其上配置了connections=2,另一个UserService,其配置了connections=3,那么一个Consumer里创建的clients数量就等于2 + 3 = 5。


参考文献

http://dubbo.apache.org/zh-cn/index.html

Powered by Hexo and Hexo-theme-hiker

Copyright © 2019 - 2025 Young All Rights Reserved.

访客数 : | 访问量 :