Dubbo SPI

本文所有讨论都基于Dubbo 2.7.2

SPI是JDK引入的一种加载扩展点的机制,我们接触到最多的可能就是使用DriverManager加载数据库的Driver,DriverManager内部会使用ServiceLoader读取jar包下的META-INF/services/java.sql.Driver文件,在读的过程中,每一行都代表一个具体的Driver类,读到后使用反射进行实例化。

这个文件内容如下:

1
2
com.mysql.jdbc.Driver
com.mysql.fabric.jdbc.FabricMySQLDriver

Dubbo的SPI就是基于上述SPI机制扩展而来。功能更强,使用更灵活。

Dubbo的SPI文件格式如下,是一种key=value的样式。

1
2
3
4
filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=org.apache.dubbo.rpc.support.MockProtocol
dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol

例子

Dubbo中Protocol的services文件org.apache.dubbo.rpc.Protocol,这里面都是Protocol的实现类,可以通过指定的key加载对应的扩展。

1
2
3
4
5
6
7
8
9
10
11
12
#前三个Protocol有些特殊
filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper
qos=org.apache.dubbo.qos.protocol.QosProtocolWrapper

mock=org.apache.dubbo.rpc.support.MockProtocol
dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol
injvm=org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol
rmi=org.apache.dubbo.rpc.protocol.rmi.RmiProtocol
hessian=org.apache.dubbo.rpc.protocol.hessian.HessianProtocol
http=org.apache.dubbo.rpc.protocol.http.HttpProtocol
....

现在尝试加载里面key=dubbo对应的扩展org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol

1
2
3
4
5
//这里指定了key="dubbo"
Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("dubbo");

//按道理这里应该返回DubboProtocol的实例,但实际值为org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper@ae45eb6,原因后面会分析
System.out.println(protocol);

接下来看一下getExtension()的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public T getExtension(String name) {
if (StringUtils.isEmpty(name)) {
throw new IllegalArgumentException("Extension name == null");
}
if ("true".equals(name)) {
return getDefaultExtension();
}
//可以先忽略Holder的用途
Holder<Object> holder = getOrCreateHolder(name);
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
//创建extension
instance = createExtension(name);
holder.set(instance);
}
}
}
return (T) instance;
}


private T createExtension(String name) {
//getExtensionClasses()会读services文件加载class
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
//调用set方法,设置一些属性
//到这里name对应的clazz已经实例化完毕了
//这里如果直接返回,最后得到的就是我们想加载的那个实例,我们的例子里就是DubboProtocol
injectExtension(instance);

//还记得前面看到的三个特殊的Protocol吗?这里就是使用这三个特殊的Protocol对instance进行包装增强,由于cachedWrapperClasses是一个Set,所以包装类的层次与它们在services文件中出现的顺序可能不一致
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
type + ") couldn't be instantiated: " + t.getMessage(), t);
}
}

扩展点自动包装

前面的例子里,获取dubbo的扩展点,返回的是org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper,这就是dubbo引入的扩展点包装机制。

扩展点包装机制有点像AOP机制,也可以理解为一种装饰器模式,就是实际返回的扩展点是经过包装增强的。dubbo中用于包装的类一般是Wrapper结尾,当然并不是通过类名称来判断是否是一个包装类

services文件中包装类的位置不重要,首先包装类之间的顺序不重要,因为都要放入一个set中;其次,包装类和真正的扩展点的顺序也不重要,读文件时,会把所有的包装类都找到

下面是在读取services文件过程中调用的一个方法,里面有对包装类的处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name) throws NoSuchMethodException {
if (!type.isAssignableFrom(clazz)) {
throw new IllegalStateException("Error occurred when loading extension class (interface: " +
type + ", class line: " + clazz.getName() + "), class "
+ clazz.getName() + " is not subtype of interface.");
}
if (clazz.isAnnotationPresent(Adaptive.class)) {
//缓存Adaptive的class
cacheAdaptiveClass(clazz);
} else if (isWrapperClass(clazz)) {
//缓存wrapper class
//这里会把clazz缓存在cachedWrapperClasses中
//可否还记得前面实例化dubbo对应的DubboProtocol后,接着使用了cachedWrapperClasses对其进行包装
cacheWrapperClass(clazz);
} else {
clazz.getConstructor();
if (StringUtils.isEmpty(name)) {
name = findAnnotationName(clazz);
if (name.length() == 0) {
throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
}
}

String[] names = NAME_SEPARATOR.split(name);
if (ArrayUtils.isNotEmpty(names)) {
cacheActivateClass(clazz, names[0]);
for (String n : names) {
cacheName(clazz, n);
saveInExtensionClass(extensionClasses, clazz, name);
}
}
}
}

private boolean isWrapperClass(Class<?> clazz) {
try {
//我们的例子中type就是Protocol.class
//一个类只要存在一个以Protocol为参数的构造方法,它就是一个包装类
clazz.getConstructor(type);
return true;
} catch (NoSuchMethodException e) {
return false;
}
}

根据上面的代码,我们自己可以实现一个Protocol扩展点的包装类,并构造一个SPI的services文件放入classpath的META-INF/dubbo下,当获取一个具体的protocol时,就会引入这个包装类。

1
2
3
4
5
6
7
public class MyWrapper implements Protocol {
private Protocol protocol

public MyWrapper(Protocol protocol) {
this.protocol = protocol;
}
}

那么此时获取到的Protocol如下,可以看到真正的DubboProtocol被包装在最内层,dubbo正是通过这样的方式对扩展点进行了包装增强。目前这种机制无法禁用,若想禁用,需要删掉services文件里所有的wrapper。

扩展点自适应

有些场景下,我们知道要获取的extension的key是什么。但也有很多情况,需要根据运行时的具体参数决定加载哪个extension,因此引入了自适应扩展点。当使用这种扩展点时,需要传递给它具体的参数,然后从参数中解析出需要的key,最后才是具体extension的加载过程。

dubbo 中所有的参数都会格式化为URL的样式,当加载自适应扩展点时,通常都会传入URL,根据URL中的某个key加载extension,具体根据哪个key则有@Adaptive指定

其中比较特殊的是,当@Adaptive的参数是”protocol”时,像这样@Adaptive("protocol"),则根据url里的protocol去加载扩展点,而不是去找key=protocol的parameter

URL的样式都很熟悉,@Adaptive会根据下面标黄的值查找扩展点。

dubbo://127.0.0.1:20880/xxxx?key1=value1&key2=value2

例子

先通过一个例子,感受一下扩展点自适应的使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
//定义了一个接口
@SPI("Default")
public interface AdaptiveProtocol {

@Adaptive("method1")
String method1(URL url);

@Adaptive("method2")
String method2(URL url);

@Adaptive
String method3(URL url);

//@Adaptive
int port();
}

//有三个实现类
public class AProtocol implements AdaptiveProtocol {

@Override
public String method1(URL url) {
return "A method1";
}

@Override
public String method2(URL url) {
return "A method2";
}

@Override
public String method3(URL url) {
return "A method3";
}

@Override
public int port() {
return 0;
}
}

public class BProtocol implements AdaptiveProtocol {

@Override
public String method1(URL url) {
return "B method1";
}

@Override
public String method2(URL url) {
return "B method2";
}

@Override
public String method3(URL url) {
return "B method3";
}

@Override
public int port() {
return 1;
}
}

public class DefaultProtocol implements AdaptiveProtocol {

@Override
public String method1(URL url) {
return "Default method1";
}

@Override
public String method2(URL url) {
return "Default method2";
}

@Override
public String method3(URL url) {
return "Default method3";
}

@Override
public int port() {
return 2;
}
}
1
2
3
4
# services文件
A=com.young.dubbo.spi.AProtocol
B=com.young.dubbo.spi.BProtocol
Default=com.young.dubbo.spi.DefaultProtocol
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args){
Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("dubbo");
//打印 20880
System.out.println(protocol.getDefaultPort());


AdaptiveProtocol extension = ExtensionLoader.getExtensionLoader(AdaptiveProtocol.class)
.getAdaptiveExtension();

HashMap<String, String> map = new HashMap<>(2);
map.put("method1", "A");
map.put("method2", "B");
URL url = new URL("http", "127.0.0.1", 8080, map);
//url为http://127.0.0.1:8080?method1=A&method2=B,自适应机制会使用后面的key=value
//打印 A method1
System.out.println(extension.method1(url));

//打印 B method2
System.out.println(extension.method2(url));

//打印 Default method3
System.out.println(extension.method3(url));
//System.out.println(extension.port());
}

上面的例子中,获取的AdaptiveProtocol的extension,它的三个方法实际上调用了三个不同实现类的方法。前面说过,自适应扩展点会根据URL的参数决定具体调用过程,从上面的结果可以发现这个过程不是实例级别,而是方法级别的。换句话说,自适应扩展点并不是简单的根据配置加载某一个实现类(比如DefaultProtocol,然后调用DefaultProtocol的所有方法),而是在运行时根据URL参数来决定加载某个实现类并调用,这就导致每个方法都可能去调用不同的实现类

接下来看一下getAdaptiveExtension()的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public T getAdaptiveExtension() {
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
if (createAdaptiveInstanceError == null) {
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
if (instance == null) {
try {
//创建adaptive extension
instance = createAdaptiveExtension();
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
createAdaptiveInstanceError = t;
throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
}
}
}
} else {
throw new IllegalStateException("Failed to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
}
}

return (T) instance;
}

//createAdaptiveExtension()里调用了getAdaptiveExtensionClass()
private Class<?> getAdaptiveExtensionClass() {
//如果是第一次调用,这里会读services文件
getExtensionClasses();
//cachedAdaptiveClass保存的是被@Adaptive修饰的类,只能有一个
//为啥只能有一个,不太懂
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}

private Class<?> createAdaptiveExtensionClass() {
//这里会通过代码,拼凑出一个类文件
String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
ClassLoader classLoader = findClassLoader();
org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
return compiler.compile(code, classLoader);
}

上面的代码会给每个接口(我们的例子就是AdaptiveProtocol)生成一个代理实现类,也就是自适应扩展点,其名字以$Adaptive结尾,看了这个类的源码,应该就明白自适应扩展点的精髓了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import org.apache.dubbo.common.extension.ExtensionLoader;

public class AdaptiveProtocol$Adaptive implements com.young.dubbo.spi.AdaptiveProtocol {

public int port() {
throw new UnsupportedOperationException(
"The method public abstract int com.young.dubbo.spi.AdaptiveProtocol.port() of interface com.young.dubbo.spi.AdaptiveProtocol is not adaptive method!");
}

public java.lang.String method1(org.apache.dubbo.common.URL arg0) {
if (arg0 == null) {
throw new IllegalArgumentException("url == null");
}
org.apache.dubbo.common.URL url = arg0;
//关键点,这里会去url里查找指定key的value,这个value就是services文件里的key,它对应的实现类就是我们要找的
//url里如果没有这个参数,默认使用@SPI中指定的值
String extName = url.getParameter("method1", "Default");
if (extName == null) {
throw new IllegalStateException(
"Failed to get extension (com.young.dubbo.spi.AdaptiveProtocol) name from url (" + url.toString()
+ ") use keys([method1])");
}
//这里加载了真正的extension
com.young.dubbo.spi.AdaptiveProtocol extension = (com.young.dubbo.spi.AdaptiveProtocol) ExtensionLoader
.getExtensionLoader(com.young.dubbo.spi.AdaptiveProtocol.class).getExtension(extName);
return extension.method1(arg0);
}

public java.lang.String method2(org.apache.dubbo.common.URL arg0) {
if (arg0 == null) {
throw new IllegalArgumentException("url == null");
}
org.apache.dubbo.common.URL url = arg0;
String extName = url.getParameter("method2", "Default");
if (extName == null) {
throw new IllegalStateException(
"Failed to get extension (com.young.dubbo.spi.AdaptiveProtocol) name from url (" + url.toString()
+ ") use keys([method2])");
}
com.young.dubbo.spi.AdaptiveProtocol extension = (com.young.dubbo.spi.AdaptiveProtocol) ExtensionLoader
.getExtensionLoader(com.young.dubbo.spi.AdaptiveProtocol.class).getExtension(extName);
return extension.method2(arg0);
}

public java.lang.String method3(org.apache.dubbo.common.URL arg0) {
if (arg0 == null) {
throw new IllegalArgumentException("url == null");
}
org.apache.dubbo.common.URL url = arg0;
//这个key有点特殊,当@Adaptive没有指定任何key时,dubbo会根据接口名按照驼峰命名法将其拆开组装成一个key
String extName = url.getParameter("adaptive.protocol", "Default");
if (extName == null) {
throw new IllegalStateException(
"Failed to get extension (com.young.dubbo.spi.AdaptiveProtocol) name from url (" + url.toString()
+ ") use keys([adaptive.protocol])");
}
com.young.dubbo.spi.AdaptiveProtocol extension = (com.young.dubbo.spi.AdaptiveProtocol) ExtensionLoader
.getExtensionLoader(com.young.dubbo.spi.AdaptiveProtocol.class).getExtension(extName);
return extension.method3(arg0);
}
}

还有一点需要注意,扩展点自适应机制需要依赖于URL传递参数,并根据@Adaptive指定参数的key获取具体的扩展点,因此对于参数不是URL或是其子类的方法,比如port()不能使用@Adaptive修饰,而且不能调用(上面生成的自适应扩展点里port()直接抛异常了)。

若注释是这样:@Adaptive("protocol"),则在Xxx$Adaptive类中,不是生成url.getParameter(“aaa”,”bbb”),而是extName = getProtocol(“….”)。

由于自适应扩展点内部还是会查找真正的Extension,这个Extension依然会被包装,所以上面的method1、method2等方法的内部逻辑还是会先调用Extension的Wrapper,再调用真正的Extension方法。

总结

Dubbo的SPI机制在JDK的SPI机制基础上,引入了扩展点自动包装扩展点自适应的功能,增强了Dubbo自身的可扩展性,为使用者也提供了个性化定制的能力。在Dubbo的代码中,SPI机制贯穿始终,理解了它的SPI机制,对理解Dubbo的运行机理很有帮助。


参考文献

http://dubbo.apache.org/zh-cn/docs/dev/SPI.html

Powered by Hexo and Hexo-theme-hiker

Copyright © 2019 - 2025 Young All Rights Reserved.

访客数 : | 访问量 :