Dubbo Consumer

本文所有讨论都基于Dubbo 2.7.2

这篇文章介绍了Dubbo如何暴露服务,这篇继续介绍Consumer如何引入服务。Consumer引用服务相比Provider要复杂一下,因为还要有服务路由,负载均衡等模块。

配置

1
2
3
4
5
6
7
<dubbo:application name="dubbo-example-consumer"/>

<!-- 使用zookeeper注册中心引用服务 -->
<dubbo:registry protocol="zookeeper" address="127.0.0.1:2181" />

<!-- 生成远程服务代理,可以和本地bean一样使用demoService -->
<dubbo:reference id="demoService" interface="com.young.dubbo.api.DemoService" check="true" timeout="100000" />
1
2
3
4
5
6
7
8
//发起RPC调用
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] {"consumer.xml"});
context.start();

//这里会返回一个代理
DemoService demoService = (DemoService)context.getBean("demoService");

System.out.println(demoService.sayHello("world"));

Directory

在开始引入服务之前,有必要介绍一下Directory。我们知道Provider将提供的服务保存在注册中心上,Consumer去注册中心获取这些服务,而且不光能获取服务,更重要的是还能感知Provider暴露的服务的变化,比如Provider宕机了,或者暴露的服务参数有变化等。所以Directory就是Consumer端的发现服务和感知服务变化的模块。

除了感知服务的变化,Directory里还包含了服务路由模块,用于筛选符合条件的Invoker。

Cluster

日常使用时,Provider为了避免单点故障会部署多台,Consumer调用时需要挑一台进行调用。Cluster将多个Provider整合为一个Cluster Invoker,当Consumer调用时,由这个Cluster Invoker负责Provider的选择以及调用失败处理等工作,屏蔽了Provider的复杂情况,使Consumer的调用过程统一且简单。

在Cluster Invoker选择Provider时,Dubbo提供了几种容错方式:

  • Failover Cluster - 失败自动切换
  • Failfast Cluster - 快速失败
  • Failsafe Cluster - 失败安全
  • Failback Cluster - 失败自动恢复
  • Forking Cluster - 并行调用多个服务提供者

Cluster唯一的作用就是生成一个Cluster Invoker。

1
2
3
4
5
6
7
8
9
10
public class FailoverCluster implements Cluster {

public final static String NAME = "failover";

@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<T>(directory);
}

}

LoadBalance

负载均衡的概念很容易理解,就是让RPC请求”均匀”地分摊到不同的Provider上。比较常用的负载均衡策略Dubbo里都有提供:

  • RandomLoadBalance: 基于权重随机算法
  • LeastActiveLoadBalance: 基于最少活跃调用数算法
  • ConsistentHashLoadBalance: 基于一致性hash算法
  • RoundRobinLoadBalance: 基于加权轮询算法

引用服务

Consumer进行RPC调用的总体流程是根据<dubbo:reference id=”demoService” interface=”com.young.dubbo.api.DemoService” …./>这个配置生成一个代理,这个代理整合了前面讲到的Cluster,LoadBalance等操作,所以我们就从这个配置对应的实例ReferenceConfig开始。

开始前先引用Dubbo官网的一张图说明引用服务的具体过程:

Provider由ServiceConfig开始暴露服务,Consumer由ReferenceConfig开始引入服务

ReferenceConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//上述例子的getBean()会调用get()
public synchronized T get() {
checkAndUpdateSubConfigs();

if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
if (ref == null) {
//这里会创建代理
init();
}
//最终返回一个代理
return ref;
}

private void init() {
if (initialized) {
return;
}
checkStubAndLocal(interfaceClass);
checkMock(interfaceClass);
Map<String, String> map = new HashMap<String, String>();
//这里会根据xml中的配置以及一些默认配置,组装一个map
......

ref = createProxy(map);

String serviceKey = URL.buildKey(interfaceName, group, version);
ApplicationModel.initConsumerModel(serviceKey, buildConsumerModel(serviceKey, attributes));
initialized = true;
}

上面init里的map

继续进入createProxy()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
private T createProxy(Map<String, String> map) {
if (shouldJvmRefer(map)) {
//调用同一个jvm里的服务
URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
invoker = REF_PROTOCOL.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {
urls.clear(); // reference retry init will add url to urls, lead to OOM
if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
//xml里指定了服务的url配置,这种情况不再需要注册中心了
String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (StringUtils.isEmpty(url.getPath())) {
url = url.setPath(interfaceName);
}
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // assemble URL from register center's configuration
// if protocols not injvm checkRegistry
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())){
checkRegistry();
//根据<dubbo:registry />加载注册中心的配置
List<URL> us = loadRegistries(false);
if (CollectionUtils.isNotEmpty(us)) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
//还记得Provider也有这一步吗?Provider只是添加的export=dubbo://...,这里添加了refer=(key1=value1&key2=value2)
urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
}

if (urls.size() == 1) {
//一个注册中心
//根据SPI机制,这里的REF_PROTOCOL核心为RegistryProtocol,之所以说核心,是因为不要忘了扩展点的自动包装
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
} else {
//多个注册中心
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
// use RegistryAwareCluster only when register's CLUSTER is available
URL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME);
// The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
invoker = CLUSTER.join(new StaticDirectory(u, invokers));
} else { // not a registry url, must be direct invoke.
invoker = CLUSTER.join(new StaticDirectory(invokers));
}
}
}
//配置里的check参数
if (shouldCheck() && !invoker.isAvailable()) {
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
/**
* @since 2.7.0
* ServiceData Store
*/
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
metadataReportService.publishConsumer(consumerURL);
}
//创建代理,这个代理包裹了invoker
return (T) PROXY_FACTORY.getProxy(invoker);
}

RegistryProtocol

接下来由RegistryProtocol负责引用服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
//url的protocol要更新为具体的注册中心的protocol,我们的例子中会更新为zookeeper
url = URLBuilder.from(url)
.setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
.removeParameter(REGISTRY_KEY)
.build();
//获取注册中心相关的实例,负责与注册中心交互
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}

// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
//这里cluster是一个自适应扩展点
return doRefer(cluster, registry, type, url);
}

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
//consumer把自己注册到注册中心。为什么要注册自己呢?监控组件要监控服务状态
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
registry.register(directory.getRegisteredConsumerUrl());
}
//创建路由
directory.buildRouterChain(subscribeUrl);
//订阅zk上若干个目录,下一篇文章会详细分析这个方法
//若是初次订阅,会把zk上相关目录的内容拽下来
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));

//返回了Cluster Invoker
//默认是FailoverCluster
//directory可以看成是zk上暴露的服务
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}

FailoverClusterInvoker

invoker返回后会创建一个DemoService的代理PROXY_FACTORY.getProxy(invoker),默认使用javassist创建代理,我们看一下创建代理的源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class JavassistProxyFactory extends AbstractProxyFactory {

@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
//这里的返回值最终赋给了ReferenceConfig的ref属性,我们的例子调用的方法实际上就是这个代理的方法
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}

.......
}

public class InvokerInvocationHandler implements InvocationHandler {
private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class);

//这个invoker就是Cluster Invoker
private final Invoker<?> invoker;

public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
//开始调用Cluster Invoker
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
}

Dubbo默认使用这个FailoverClusterInvoker,我们就看一下它的invoker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
//AbstractClusterInvoker.java
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();

// binding attachments into invocation.
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}
//通过directory查找invoker list并通过router选择合适的invokers
List<Invoker<T>> invokers = list(invocation);
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
checkInvokers(copyInvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if (i > 0) {
checkWhetherDestroyed();
//这里再次获取一下Invoker列表,防止在重试期间Invoker列表发生变化
//注意:这不能完全避免在我们重试期间invokers发生变化,这里只是尽量去避免
copyInvokers = list(invocation);
// check again
checkInvokers(copyInvokers, invocation);
}
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + methodName
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
throw new RpcException(le.getCode(), "Failed to invoke the method "
+ methodName + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ le.getMessage(), le.getCause() != null ? le.getCause() : le);
}

protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,
List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {

if (CollectionUtils.isEmpty(invokers)) {
return null;
}
String methodName = invocation == null ? StringUtils.EMPTY : invocation.getMethodName();

//我们知道sticky意思是让一个consumer总是调用某个固定机器上的provider提供的服务
//当这个服务不可用时,如何处理呢?
//现在的选择是:若这个stickyInvoker调用失败了,则在重试时,选择一个新的invoker进行调用并把这个新的invoker赋值给stickyInvoker。sticky的那个机器可能会变。
boolean sticky = invokers.get(0).getUrl()
.getMethodParameter(methodName, CLUSTER_STICKY_KEY, DEFAULT_CLUSTER_STICKY);

//ignore overloaded method
if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
stickyInvoker = null;
}
//ignore concurrency problem
if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
if (availablecheck && stickyInvoker.isAvailable()) {
return stickyInvoker;
}
}

Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);

if (sticky) {
//stickyInvoker变成了这个新选的invoker
stickyInvoker = invoker;
}
return invoker;
}

private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,
List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {

if (CollectionUtils.isEmpty(invokers)) {
return null;
}
if (invokers.size() == 1) {
//这里没有availablecheck ?
return invokers.get(0);
}
Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);

//If the `invoker` is in the `selected` or invoker is unavailable && availablecheck is true, reselect.
if ((selected != null && selected.contains(invoker))
|| (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
try {
//reselect比较简单,先选择一个不在selected中的invoker,若所有的invoker都在selected中,则从selected中使用loadbalance选择一个可用的返回
Invoker<T> rInvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
if (rInvoker != null) {
invoker = rInvoker;
} else {
//Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
//走到这说明所有的invoker都试过了且都不可用。这样还要选一个出来吗?不直接返回null ?
int index = invokers.indexOf(invoker);
try {
//Avoid collision
//这个invoker可能是不可用的
invoker = invokers.get((index + 1) % invokers.size());
} catch (Exception e) {
logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
}
}
} catch (Throwable t) {
logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
}
}
return invoker;
}

参考文献

http://dubbo.apache.org/zh-cn/index.html

Powered by Hexo and Hexo-theme-hiker

Copyright © 2019 - 2025 Young All Rights Reserved.

访客数 : | 访问量 :