Dubbo学习笔记之SPI机制

Dubbo学习笔记,在 Dubbo 中,SPI 贯穿在整个 Dubbo,有必要好好了解一下

SPI机制

SPI 全称为 Service Provider Interface,是一种服务发现机制。SPI 的本质是将接口实现类的全限定名配置在文件中,并由服务加载器读取配置文件,加载实现类。这样可以在运行时,动态为接口替换实现类。正因此特性,我们可以很容易的通过 SPI 机制为我们的程序提供拓展功能。

java 自带的有SPI机制,java SPI介绍

java SPI 的缺点

  1. JDK 标准的 SPI 会一次性加载实例化扩展点的所有实现,如果你在 META-INF/services 下的文件里面加了 N 个实现类,那么 JDK 启动的时候都会一次性全部加载。那么如果有的扩展点实现初始化很耗时或者如果有些实现类并没有用到,那么会很浪费资源
  2. 如果扩展点加载失败,会导致调用方报错,而且这个错误很难定位到是这个原因

Dubbo 优化后的 SPI 机制

Dubbo 的 SPI 扩展机制,有两个规则

  1. 需要在 resource 目录下配置 META-INF/dubbo 或者 META-INF/dubbo/internal 或者 META-INF/services,并基于 SPI 接口去创建一个文件
  2. 文件名称和接口名称保持一致,文件内容和 SPI 有差异,内容是 KEY 对应 Value,key是服务名,value的实现服务的类名全路径,例如 dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol

Dubbo 针对的扩展点非常多,可以针对协议、拦截、集群、路由、负载均衡、序列化、容器 几乎里面用到的所有功能,都可以实现自己的扩展,当我们在为一个接口查找具体实现类时,可以指定扩展名来选择相应的扩展实现。例如,这里指定扩展名为 dubbo,Dubbo SPI 就知道我们要使用:org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol 这个扩展实现类,只实例化这一个扩展实现即可,无须实例化 SPI 配置文件中的其他扩展实现类。
使用 KV 格式的 SPI 配置文件的另一个好处是:让我们更容易定位到问题。假设我们使用的一个扩展实现类所在的 jar 包没有引入到项目中,那么 Dubbo SPI 在抛出异常的时候,会携带该扩展名信息,而不是简单地提示扩展实现类无法加载。这些更加准确的异常信息降低了排查问题的难度,提高了排查问题的效率。

Dubbo 扩展点简单示例

扩展协议扩展点

  1. 创建如下文件,放在classpath下的 META-INF/dubbo 文件夹中。文件名和 Dubbo 提供的协议扩展点接口保持一致
    文件内容

    1
    myProtocol=com.dxyusn.dubbo.study.protocol.MyProtocol
  2. 创建 MyProtocol 协议类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    import org.apache.dubbo.common.URL;
    import org.apache.dubbo.rpc.Exporter;
    import org.apache.dubbo.rpc.Invoker;
    import org.apache.dubbo.rpc.Protocol;
    import org.apache.dubbo.rpc.RpcException;

    public class MyProtocol implements Protocol {
    @Override
    public int getDefaultPort() {
    return 8888;
    }

    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    return null;
    }

    @Override
    public <T> Invoker<T> refer(Class<T> aClass, URL url) throws RpcException {
    return null;
    }

    @Override
    public void destroy() {

    }
    }
  3. 在调用处执行如下代码

    1
    2
    3
    4
    5
    6
    7
    public class Main {

    public static void main(String[] args) {
    Protocol protocol= ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("myProtocol");
    System.out.print(protocol.getDefaultPort());
    }
    }
  4. 输出结果,可以看到运行结果,是执行的自定义的协议扩展点。
    运行输出结果如下

    1
    8888

Dubbo 的扩展点原理实现

dubbo 的扩展点,就是通过指定目录下配置一个对应接口的实现类,然后程序会进行查找和解析,找到对应的扩展点。

@SPI 注解

Dubbo 中某个接口被 @SPI注解修饰时,就表示该接口是扩展接口,org.apache.dubbo.rpc.Protocol 接口就是一个扩展接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@SPI("dubbo")
public interface Protocol {

int getDefaultPort(); // 默认端口

// 将一个Invoker发布出去,export()方法实现需要是幂等的,
// 即同一个服务暴露多次和暴露一次的效果是相同的
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

// 引用一个Invoker,refer()方法会根据参数返回一个Invoker对象,
// Consumer端可以通过这个Invoker请求到Provider端的服务
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

// 销毁export()方法以及refer()方法使用到的Invoker对象,释放
// 当前Protocol对象底层占用的资源
void destroy();

// 返回当前Protocol底层的全部ProtocolServer
default List<ProtocolServer> getServers() {
return Collections.emptyList();
}

}

@SPI 注解的 value 值指定了默认的扩展名称,例如,在通过 Dubbo SPI 加载 Protocol 接口实现时,如果没有明确指定扩展名,则默认会将 @SPI 注解的 value 值作为扩展名,即加载 dubbo 这个扩展名对应的 org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol 这个扩展实现类

dubbo是如何解析并进行存储使用扩展点的
ExtensionLoader 位于 dubbo-common 模块中的 extension 包中,功能类似于 JDK SPI 中的 java.util.ServiceLoader。Dubbo SPI 的核心逻辑几乎都封装在 ExtensionLoader 之中(其中就包括 @SPI 注解的处理逻辑),其使用方式如下所示:

1
Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("dubbo");

ExtensionLoader 中三个核心的静态字段。

  • strategies(LoadingStrategy[]类型): LoadingStrategy 接口有三个实现(通过 JDK SPI 方式加载的),如下图所示,分别对应前面介绍的三个 Dubbo SPI 配置文件所在的目录,且都继承了 Prioritized 这个优先级接口,默认优先级是DubboInternalLoadingStrategy > DubboLoadingStrategy > ServicesLoadingStrateg

  • EXTENSION_LOADERS(ConcurrentMap<Class, ExtensionLoader>类型):Dubbo 中一个扩展接口对应一个 ExtensionLoader 实例,该集合缓存了全部 ExtensionLoader 实例,其中的 Key 为扩展接口,Value 为加载其扩展实现的 ExtensionLoader 实例。

  • EXTENSION_INSTANCES(ConcurrentMap<Class<?>, Object>类型):该集合缓存了扩展实现类与其实例对象的映射关系。

ExtensionLoader 的实例字段

  • type(Class<?>类型):当前 ExtensionLoader 实例负责加载扩展接口。
  • cachedDefaultName(String类型):记录了 type 这个扩展接口上 @SPI 注解的 value 值,也就是默认扩展名。
  • cachedNames(ConcurrentMap<Class<?>, String>类型):缓存了该 ExtensionLoader 加载的扩展实现类与扩展名之间的映射关系。
  • cachedClasses(Holder<Map<String, Class<?>>>类型):缓存了该 ExtensionLoader 加载的扩展名与扩展实现类之间的映射关系。cachedNames 集合的反向关系缓存。
  • cachedInstances(ConcurrentMap<String, Holder>类型):缓存了该 ExtensionLoader 加载的扩展名与扩展实现对象之间的映射关系。

    dubbo扩展点可分为三类,静态扩展点,自适应扩展点,激活扩展点

    静态扩展点

    查看 ExtensionLoader.getExtensionLoader(Protocol.class) 的源码
    ExtensionLoader.getExtensionLoader() 方法会根据扩展接口从 EXTENSION_LOADERS 缓存中查找相应的 ExtensionLoader 实例,核心实现如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
    if (type == null) {
    throw new IllegalArgumentException("Extension type == null");
    }
    if (!type.isInterface()) {
    throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!");
    }
    if (!withExtensionAnnotation(type)) {
    throw new IllegalArgumentException("Extension type (" + type +
    ") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!");
    }

    ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    if (loader == null) {
    // 实例化 ExtensionLoader ,并放入Map中
    EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
    loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    }
    return loader;
    }

    实例化 ExtensionLoader

    1
    2
    3
    4
    private ExtensionLoader(Class<?> type) {
    this.type = type;
    objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
    }

    如果当前的 type==ExtensionFactory.class,那么 objectFactory=null, 否则会创建一个自适应扩展点给到 objectFacotry,objectFactory 在这里赋值了,并且是返回一个 AdaptiveExtension()。

    下面看一下ExtensionLoader的getExtension方法
    得到接口对应的 ExtensionLoader 对象之后会调用其 getExtension() 方法,根据传入的扩展名称从 cachedInstances 缓存中查找扩展实现的实例,最终将其实例化后返回:

    getExtension

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public T getExtension(String name) {
    if (StringUtils.isEmpty(name)) {
    throw new IllegalArgumentException("Extension name == null");
    }
    if ("true".equals(name)) {
    return getDefaultExtension();
    }
    // getOrCreateHolder()方法中封装了查找cachedInstances缓存的逻辑
    final Holder<Object> holder = getOrCreateHolder(name);
    Object instance = holder.get();
    if (instance == null) { // double-check防止并发问题
    synchronized (holder) {
    instance = holder.get();
    if (instance == null) {
    // 根据扩展名从SPI配置文件中查找对应的扩展实现类
    instance = createExtension(name);
    holder.set(instance);
    }
    }
    }
    return (T) instance;
    }

    这个方法就是根据一个名字来获得一个对应类的实例,name 就是我们传的参数 myProtocol,而返回的实现类应该就是 MyProtocol。

    下面看一下 createExtension(name)

    createExtension

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    private T createExtension(String name) {
    // 从配置文件中加载所有的拓展类,可得到“配置项名称”到“配置类”的映射关系表
    Class<?> clazz = getExtensionClasses().get(name);
    if (clazz == null) {
    throw findException(name);
    }
    try {
    T instance = (T) EXTENSION_INSTANCES.get(clazz);
    if (instance == null) {
    // 通过反射创建实例
    EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
    instance = (T) EXTENSION_INSTANCES.get(clazz);
    }
    // 向实例中注入依赖
    injectExtension(instance);
    Set<Class<?>> wrapperClasses = cachedWrapperClasses;
    if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
    // 循环创建 Wrapper 实例
    for (Class<?> wrapperClass : wrapperClasses) {
    // 将当前 instance 作为参数传给 Wrapper 的构造方法,并通过反射创建 Wrapper 实例。
    // 然后向 Wrapper 实例中注入依赖,最后将 Wrapper 实例再次赋值给 instance 变量
    instance = injectExtension(
    (T) wrapperClass.getConstructor(type).newInstance(instance));
    }
    }
    return instance;
    } catch (Throwable t) {
    throw new IllegalStateException("...");
    }
    }

    createExtension 方法的逻辑稍复杂一下,包含了如下的步骤:
    通过 getExtensionClasses 获取所有的拓展类
    通过反射创建拓展对象
    向拓展对象中注入依赖
    将拓展对象包裹在相应的 Wrapper 对象中
    以上步骤中,第一个步骤是加载拓展类的关键,第三和第四个步骤是 Dubbo IOC 与 AOP 的具体实现。

    getExtensionClasses

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    private Map<String, Class<?>> getExtensionClasses() {
    // 从缓存中获取已加载的拓展类
    Map<String, Class<?>> classes = cachedClasses.get();
    // 双重检查
    if (classes == null) {
    synchronized (cachedClasses) {
    classes = cachedClasses.get();
    if (classes == null) {
    // 加载拓展类
    classes = loadExtensionClasses();
    cachedClasses.set(classes);
    }
    }
    }
    return classes;
    }

    这个方法,会查找指定目录 /META-INF/dubbo || /META-INF/services || /META-INF/services/internal 下对应的 spi接口 的 properties 文件,然后扫描这个文件下的所有配置信息。然后保存到一个 HashMap 中(classes),key=name(对应 protocol 文件中配置的 myProtocol), value=对应配置的类的实例

    loadExtensionClasses

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
        private Map<String, Class<?>> loadExtensionClasses() {
    // 对 SPI 注解进行解析
    cacheDefaultExtensionName();

    // 加载指定文件夹下的配置文件
    Map<String, Class<?>> extensionClasses = new HashMap<>();
    loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName());
    loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
    loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName());
    loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
    loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName());
    loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
    return extensionClasses;
    }
    private void cacheDefaultExtensionName() {
    // 获取 SPI 注解,这里的 type 变量是在调用 getExtensionLoader 方法时传入的
    final SPI defaultAnnotation = type.getAnnotation(SPI.class);
    if (defaultAnnotation != null) {
    String value = defaultAnnotation.value();
    if ((value = value.trim()).length() > 0) {
    // 对 SPI 注解内容进行切分
    String[] names = NAME_SEPARATOR.split(value);
    // 检测 SPI 注解内容是否合法,不合法则抛出异常
    if (names.length > 1) {
    throw new IllegalStateException("More than 1 default extension name on extension " + type.getName()
    + ": " + Arrays.toString(names));
    }
    // 设置默认名称,参考 getDefaultExtension 方法
    if (names.length == 1) {
    cachedDefaultName = names[0];
    }
    }
    }
    }

    loadExtensionClasses 方法总共做了两件事情,一是对 SPI 注解进行解析,二是调用 loadDirectory 方法加载指定文件夹配置文件。
    extensionClasses存的就是文件中对应的配置信息
    加载的过程,在源码中找到的加载目录如下
    SERVICES_DIRECTORY = “META-INF/services/“;
    DUBBO_DIRECTORY = “META-INF/dubbo/“;
    DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + “internal/“;

    loadDirectory

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir) {
    // fileName = 文件夹路径 + type 全限定名
    String fileName = dir + type.getName();
    try {
    Enumeration<java.net.URL> urls;
    ClassLoader classLoader = findClassLoader();
    // 根据文件名加载所有的同名文件
    if (classLoader != null) {
    urls = classLoader.getResources(fileName);
    } else {
    urls = ClassLoader.getSystemResources(fileName);
    }
    if (urls != null) {
    while (urls.hasMoreElements()) {
    java.net.URL resourceURL = urls.nextElement();
    // 加载资源
    loadResource(extensionClasses, classLoader, resourceURL);
    }
    }
    } catch (Throwable t) {
    logger.error("...");
    }
    }

    loadDirectory 方法先通过 classLoader 获取所有资源链接,然后再通过 loadResource 方法加载资源。

    loadResource

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    private void loadResource(Map<String, Class<?>> extensionClasses, 
    ClassLoader classLoader, java.net.URL resourceURL) {
    try {
    BufferedReader reader = new BufferedReader(
    new InputStreamReader(resourceURL.openStream(), "utf-8"));
    try {
    String line;
    // 按行读取配置内容
    while ((line = reader.readLine()) != null) {
    // 定位 # 字符
    final int ci = line.indexOf('#');
    if (ci >= 0) {
    // 截取 # 之前的字符串,# 之后的内容为注释,需要忽略
    line = line.substring(0, ci);
    }
    line = line.trim();
    if (line.length() > 0) {
    try {
    String name = null;
    int i = line.indexOf('=');
    if (i > 0) {
    // 以等于号 = 为界,截取键与值
    name = line.substring(0, i).trim();
    line = line.substring(i + 1).trim();
    }
    if (line.length() > 0) {
    // 加载类,并通过 loadClass 方法对类进行缓存
    loadClass(extensionClasses, resourceURL,
    Class.forName(line, true, classLoader), name);
    }
    } catch (Throwable t) {
    IllegalStateException e = new IllegalStateException("Failed to load extension class...");
    }
    }
    }
    } finally {
    reader.close();
    }
    } catch (Throwable t) {
    logger.error("Exception when load extension class...");
    }
    }

    loadResource 方法用于读取和解析配置文件,并通过反射加载类,最后调用 loadClass 方法进行其他操作。loadClass 方法用于主要用于操作缓存,该方法的逻辑如下

    loadClass

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name) throws NoSuchMethodException {
    if (!type.isAssignableFrom(clazz)) {
    throw new IllegalStateException("Error occurred when loading extension class (interface: " +
    type + ", class line: " + clazz.getName() + "), class "
    + clazz.getName() + " is not subtype of interface.");
    }
    // 检测目标类上是否有 Adaptive 注解
    if (clazz.isAnnotationPresent(Adaptive.class)) {
    // 设置 cachedAdaptiveClass缓存
    cacheAdaptiveClass(clazz);
    // 检测 clazz 是否是 Wrapper 类型
    } else if (isWrapperClass(clazz)) {
    // 存储 clazz 到 cachedWrapperClasses 缓存中
    cacheWrapperClass(clazz);

    // 程序进入此分支,表明 clazz 是一个普通的拓展类
    } else {
    // 检测 clazz 是否有默认的构造方法,如果没有,则抛出异常
    clazz.getConstructor();
    if (StringUtils.isEmpty(name)) {
    name = findAnnotationName(clazz);
    if (name.length() == 0) {
    throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
    }
    }
    // 切分 name
    String[] names = NAME_SEPARATOR.split(name);
    if (ArrayUtils.isNotEmpty(names)) {
    // 如果类上有 Activate 注解,则使用 names 数组的第一个元素作为键,
    // 存储 name 到 Activate 注解对象的映射关系
    cacheActivateClass(clazz, names[0]);
    for (String n : names) {
    // 存储 Class 到名称的映射关系
    cacheName(clazz, n);
    // 存储名称到 Class 的映射关系
    saveInExtensionClass(extensionClasses, clazz, n);
    }
    }
    }
    }

    loadClass 方法操作了不同的缓存,比如 cachedAdaptiveClass、cachedWrapperClasses 和 cachedNames 等等。除此之外,该方法没有其他什么逻辑了。
    到此,关于缓存类加载的过程就分析完了。

    Dubbo IOC

    Dubbo IOC 是通过 setter 方法注入依赖。Dubbo 首先会通过反射获取到实例的所有方法,然后再遍历方法列表,检测方法名是否具有 setter 方法特征。若有,则通过 ObjectFactory 获取依赖对象,最后通过反射调用 setter 方法将依赖设置到目标对象中。整个过程对应的代码如下:

    injectExtension

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    private T injectExtension(T instance) {
    try {
    if (objectFactory != null) {
    // 遍历目标类的所有方法
    for (Method method : instance.getClass().getMethods()) {
    // 检测方法是否以 set 开头,且方法仅有一个参数,且方法访问级别为 public
    if (isSetter(method)) {
    /**
    * Check {@link DisableInject} to see if we need auto injection for this property
    */
    if (method.getAnnotation(DisableInject.class) != null) {
    continue;
    }
    // 获取 setter 方法参数类型
    Class<?> pt = method.getParameterTypes()[0];
    if (ReflectUtils.isPrimitives(pt)) {
    continue;
    }
    try {
    // 获取属性名,比如 setName 方法对应属性名 name
    String property = getSetterProperty(method);
    // 从 ObjectFactory 中获取依赖对象
    Object object = objectFactory.getExtension(pt, property);
    if (object != null) {
    // 通过反射调用 setter 方法设置依赖
    method.invoke(instance, object);
    }
    } catch (Exception e) {
    logger.error("Failed to inject via method " + method.getName()
    + " of interface " + type.getName() + ": " + e.getMessage(), e);
    }
    }
    }
    }
    } catch (Exception e) {
    logger.error(e.getMessage(), e);
    }
    return instance;
    }

    这个方法是用来实现依赖注入的,如果被加载的实例中,有成员属性本身也是一个扩展点,则会通过 set 方法进行注入
    在上面代码中,objectFactory 变量的类型为 AdaptiveExtensionFactory,AdaptiveExtensionFactory 内部维护了一个 ExtensionFactory 列表,用于存储其他类型的 ExtensionFactory。Dubbo 目前提供了两种 ExtensionFactory,分别是 SpiExtensionFactory 和 SpringExtensionFactory。前者用于创建自适应的拓展,后者是用于从 Spring 的 IOC 容器中获取所需的拓展。
    Dubbo IOC 目前仅支持 setter 方式注入

    Adaptive 自适应扩展点

    什么叫自适应扩展点呢?举一个例子

    1
    2
    Compiler compiler=ExtensionLoader.getExtensionLoader(Compiler.class).getAdaptiveExtension(); 
    System.out.println(compiler.getClass());

    传入一个 Compiler 接口,它会返回一个AdaptiveCompiler,这个就叫自适应。

    Adaptive 注解

    该注解的定义如下:

    1
    2
    3
    4
    5
    6
    @Documented
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE, ElementType.METHOD})
    public @interface Adaptive {
    String[] value() default {};
    }

    Adaptive 可注解在类或方法上。当 Adaptive 注解在类上时,Dubbo 不会为该类生成代理类。注解在方法(接口方法)上时,Dubbo 则会为该方法生成代理逻辑。Adaptive 注解在类上的情况很少,在 Dubbo 中,仅有两个类被 Adaptive 注解了,分别是 AdaptiveCompiler 和 AdaptiveExtensionFactory。此种情况,表示拓展的加载逻辑由人工编码完成。更多时候,Adaptive 是注解在接口方法上的,表示拓展的加载逻辑需由框架自动生成。Adaptive 注解的地方不同,相应的处理逻辑也是不同的。


    AdaptiveExtensionFactory 不实现任何具体的功能,而是用来适配 ExtensionFactory 的 SpiExtensionFactory 和 SpringExtensionFactory 这两种实现。AdaptiveExtensionFactory 会根据运行时的一些状态来选择具体调用 ExtensionFactory 的哪个实现。

    注解在类上时,处理逻辑比较简单。注解在接口方法上时,处理逻辑较为复杂,重点分析此块逻辑。

    getAdaptiveExtension 获取自适应拓展

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    public T getAdaptiveExtension() {
    // 从缓存中获取自适应拓展
    Object instance = cachedAdaptiveInstance.get();
    if (instance == null) { // 缓存未命中
    if (createAdaptiveInstanceError == null) {
    synchronized (cachedAdaptiveInstance) {
    instance = cachedAdaptiveInstance.get();
    if (instance == null) {
    try {
    // 创建自适应拓展
    instance = createAdaptiveExtension();
    // 设置自适应拓展到缓存中
    cachedAdaptiveInstance.set(instance);
    } catch (Throwable t) {
    createAdaptiveInstanceError = t;
    throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
    }
    }
    }
    } else {
    throw new IllegalStateException("Failed to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
    }
    }

    return (T) instance;
    }

    getAdaptiveExtension 方法首先会检查缓存,缓存未命中,则调用 createAdaptiveExtension 方法创建自适应拓展。

    createAdaptiveExtension

    1
    2
    3
    4
    5
    6
    7
    8
    private T createAdaptiveExtension() {  
    try {
    return injectExtension((T) getAdaptiveExtensionClass().newInstance());
    } catch (Exception e) {
    throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage
    (), e);
    }
    }

    createAdaptiveExtension 方法的代码比较少,但却包含了三个逻辑,分别如下:
    1.调用 getAdaptiveExtensionClass 方法获取自适应拓展 Class 对象
    2.通过反射进行实例化
    3.调用 injectExtension 方法向拓展实例中注入依赖

    前两个逻辑比较好理解,第三个逻辑用于向自适应拓展对象中注入依赖。
    Dubbo 中有两种类型的自适应拓展,一种是手工编码的,一种是自动生成的。手工编码的自适应拓展中可能存在着一些依赖,而自动生成的 Adaptive 拓展则不会依赖其他类。这里调用 injectExtension 方法的目的是为手工编码的自适应拓展注入依赖.
    关于 injectExtension 方法,前文已经分析过了。接下来,分析 getAdaptiveExtensionClass 方法的逻辑。

    getAdaptiveExtensionClass

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    private Class<?> getAdaptiveExtensionClass() {
    // 通过 SPI 获取所有的拓展类
    getExtensionClasses();
    // 检查缓存,若缓存不为空,则直接返回缓存
    if (cachedAdaptiveClass != null) {
    return cachedAdaptiveClass;
    }
    // 创建自适应拓展类
    return cachedAdaptiveClass = createAdaptiveExtensionClass();
    }

    getAdaptiveExtensionClass 方法同样包含了三个逻辑,如下:
    1.调用 getExtensionClasses 获取所有的拓展类
    2.检查缓存,若缓存不为空,则返回缓存
    3.若缓存为空,则调用 createAdaptiveExtensionClass 创建自适应拓展类

    getExtensionClasses 这个方法用于获取某个接口的所有实现类。比如该方法可以获取 Protocol 接口的 DubboProtocol、HttpProtocol、InjvmProtocol 等实现类。在获取实现类的过程中,如果某个实现类被 Adaptive 注解修饰了,那么该类就会被赋值给 cachedAdaptiveClass 变量。此时,上面步骤中的第二步条件成立(缓存不为空),直接返回 cachedAdaptiveClass 即可。如果所有的实现类均未被 Adaptive 注解修饰,那么执行第三步逻辑,创建自适应拓展类。

    createAdaptiveExtensionClass

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    private Class<?> createAdaptiveExtensionClass() {
    // 构建自适应拓展代码
    String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
    ClassLoader classLoader = findClassLoader();
    // 获取编译器实现类
    org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.
    common.compiler.Compiler.class).getAdaptiveExtension();
    // 编译代码,生成 Class
    return compiler.compile(code, classLoader);
    }

    createAdaptiveExtensionClass 方法用于生成自适应拓展类,该方法首先会生成自适应拓展类的源码,然后通过 Compiler 实例(Dubbo 默认使用 javassist 作为编译器)编译源码,得到代理类 Class 实例。
    这个方法中动态生成字节码,然后进行动态加载。那么这个时候返回的 class 如果加载的是 Protocol.class,应该是 Protocol$Adaptive ,这个 cachedDefaultName 实际上就是扩展点接口的@SPI 注解对应的名字,如果此时加载的是 Protocol.class,那么cachedDefaultName=dubbo

    Protocol$Adaptive

    动态生成的代理类,可以通过 debug 拿到的代理类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {  
    public void destroy() {
    throw new Unsup tion("The method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of inter
    face org.apache.dubbo.rpc.Protocol is not adaptive method!");
    }
    public int getDefaultPort() {
    throw new UnsupportedOperationException("The method public abstract int org.apache.dubbo.rpc.Protocol.
    getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
    }
    public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws o
    rg.apache.dubbo.rpc.RpcException {
    if (arg1 == null) throw new IllegalArgumentException("url == null");
    org.apache.dubbo.common.URL url = arg1;
    String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
    if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Pro
    tocol) name from url (" + url.toString() + ") use keys([protocol])");
    org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionL
    oader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
    return extension.refer(arg0, arg1);
    }
    public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.
    RpcException {
    if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");

    if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument g
    etUrl() == null");
    org.apache.dubbo.common.URL url = arg0.getUrl();
    String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
    if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Pro
    tocol) name from url (" + url.toString() + ") use keys([protocol])");
    org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionL
    oader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
    return extension.export(arg0);
    }

    在这个动态生成的类中,它的默认实现是 dubbo
    String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() )

    关于 objectFactory

    在 injectExtension 这个方法中,发现入口出的代码首先判断了 objectFactory 这个对象是否为空。这个是在哪里初始化的呢?
    实际上在获得 ExtensionLoader 的时候,就对 objectFactory 进行了初始化。

    1
    objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());

    通过 ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension()去获得一个自适应的扩展点,进入 ExtensionFactory 这个接口中,可以看到它是一个扩展点,并且有一个自己实现的自适应扩展点 AdaptiveExtensionFactory;

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    @Adaptive
    public class AdaptiveExtensionFactory implements ExtensionFactory {

    private final List<ExtensionFactory> factories;

    public AdaptiveExtensionFactory() {
    ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class);
    List<ExtensionFactory> list = new ArrayList<ExtensionFactory>();
    for (String name : loader.getSupportedExtensions()) {
    list.add(loader.getExtension(name));
    }
    factories = Collections.unmodifiableList(list);
    }

    @Override
    public <T> T getExtension(Class<T> type, String name) {
    for (ExtensionFactory factory : factories) {
    T extension = factory.getExtension(type, name);
    if (extension != null) {
    return extension;
    }
    }
    return null;
    }

    }

    @Adaptive 加载到类上表示这是一个自定义的适配器类,表示我们再调用 getAdaptiveExtension 方法的时候,不需要走上面这么复杂的过程。会直接加载到 AdaptiveExtensionFactory。然后在 getAdaptiveExtensionClass()方法处有判断

    除了自定义的自适应适配器类以外,还有两个实现类,一个是 SpiExtensionFactory,一个是 SpringExtensionFactory,AdaptiveExtensionFactory 轮询这 2 个,获取到一个就返回。

    Activate 自动激活扩展点

    自动激活扩展点,有点类似 springboot 用到的 conditional,根据条件进行自动激活。但是这里设计的初衷是,对于一个类会加载多个扩展点的实现,这个时候可以通过自动激活扩展点进行动态加载, 从而简化配置我们的配置工作 @Activate 提供了一些配置来允许我们配置加载条件,比如 group 过滤,比如 key 过滤。

    @Activate 注解标注在扩展实现类上,有 group、value 以及 order 三个属性。

    • group 属性:修饰的实现类是在 Provider 端被激活还是在 Consumer 端被激活。
    • value 属性:修饰的实现类只在 URL 参数中出现指定的 key 时才会被激活。
    • order 属性:用来确定扩展实现类的排序。

    举个例子,看 org.apache.dubbo.Filter 这个类,它有非常多的实现,比如说 CacheFilter,这个缓存过滤器,配置信息如下

    1
    2
    3
    4
    @Activate(group = {CONSUMER, PROVIDER}, value = CACHE_KEY)  
    public class CacheFilter implements Filter {
    ...
    }

    group 表示客户端和和服务端都会加载,value 表示 url 中有 cache_key 的时候

    源码

    我们先来看 loadClass() 方法对 @Activate 的扫描,其中会将包含 @Activate 注解的实现类缓存到 cachedActivates 这个实例字段(Map<String, Object>类型,Key为扩展名,Value为 @Activate 注解):

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name,
    boolean overridden) throws NoSuchMethodException {
    if (!type.isAssignableFrom(clazz)) {
    throw new IllegalStateException("Error occurred when loading extension class (interface: " +
    type + ", class line: " + clazz.getName() + "), class "
    + clazz.getName() + " is not subtype of interface.");
    }

    if (clazz.isAnnotationPresent(Adaptive.class)) {
    // 缓存到cachedAdaptiveClass字段
    cacheAdaptiveClass(clazz, overridden);
    } else if (isWrapperClass(clazz)) { // --1
    // 1.在 isWrapperClass() 方法中,会判断该扩展实现类是否包含拷贝构造函数
    // (即构造函数只有一个参数且为扩展接口类型),如果包含,则为 Wrapper 类,这就是判断 Wrapper 类的标准。

    // 2.将 Wrapper 类记录到 cachedWrapperClasses(Set<Class<?>>类型)这个实例字段中进行缓存。
    cacheWrapperClass(clazz); // ---2
    } else {
    clazz.getConstructor(); // 扩展实现类必须有无参构造函数
    if (StringUtils.isEmpty(name)) {
    name = findAnnotationName(clazz);
    if (name.length() == 0) {
    throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
    }
    }

    String[] names = NAME_SEPARATOR.split(name);
    if (ArrayUtils.isNotEmpty(names)) {
    // 将包含@Activate注解的实现类缓存到cachedActivates集合中
    cacheActivateClass(clazz, names[0]);
    for (String n : names) {
    cacheName(clazz, n);
    saveInExtensionClass(extensionClasses, clazz, n, overridden);
    }
    }
    }
    }

    使用 cachedActivates 这个集合的地方是 getActivateExtension() 方法。首先来关注 getActivateExtension() 方法的参数:url 中包含了配置信息,values 是配置中指定的扩展名,group 为 Provider 或 Consumer。下面是 getActivateExtension() 方法的核心逻辑:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    public List<T> getActivateExtension(URL url, String[] values, String group) {
    List<T> activateExtensions = new ArrayList<>();
    // values就是扩展名
    List<String> names = values == null ? new ArrayList<>(0) : asList(values);
    if (!names.contains(REMOVE_VALUE_PREFIX + DEFAULT_KEY)) { // ---1
    getExtensionClasses(); // 触发cachedActivates等缓存字段的加载
    for (Map.Entry<String, Object> entry : cachedActivates.entrySet()) {
    String name = entry.getKey(); // 扩展名
    Object activate = entry.getValue(); // @Activate注解

    String[] activateGroup, activateValue;

    if (activate instanceof Activate) { // @Activate注解中的配置
    activateGroup = ((Activate) activate).group();
    activateValue = ((Activate) activate).value();
    } else if (activate instanceof com.alibaba.dubbo.common.extension.Activate) {
    activateGroup = ((com.alibaba.dubbo.common.extension.Activate) activate).group();
    activateValue = ((com.alibaba.dubbo.common.extension.Activate) activate).value();
    } else {
    continue;
    }
    if (isMatchGroup(group, activateGroup) // 匹配group
    && !names.contains(name) // 匹配扩展名
    && !names.contains(REMOVE_VALUE_PREFIX + name) // 如果包含"-"表示不激活该扩展实现
    && isActive(activateValue, url)) // 检测URL中是否出现了指定的Key
    {
    activateExtensions.add(getExtension(name)); // 加载扩展实现
    }
    }
    // 排序 --- 2
    activateExtensions.sort(ActivateComparator.COMPARATOR);
    }
    List<T> loadedExtensions = new ArrayList<>();
    for (int i = 0; i < names.size(); i++) { // ---3
    String name = names.get(i);
    // 通过"-"开头的配置明确指定不激活的扩展实现,直接就忽略了
    if (!name.startsWith(REMOVE_VALUE_PREFIX)
    && !names.contains(REMOVE_VALUE_PREFIX + name)) {
    if (DEFAULT_KEY.equals(name)) {
    if (!loadedExtensions.isEmpty()) {
    // 按照顺序,将自定义的扩展添加到默认扩展集合前面
    activateExtensions.addAll(0, loadedExtensions);
    loadedExtensions.clear();
    }
    } else {
    loadedExtensions.add(getExtension(name));
    }
    }
    }
    if (!loadedExtensions.isEmpty()) {
    // 按照顺序,将自定义的扩展添加到默认扩展集合后面
    activateExtensions.addAll(loadedExtensions);
    }
    return activateExtensions;
    }

    首先,获取默认激活的扩展集合。默认激活的扩展实现类有几个条件:①在 cachedActivates 集合中存在;②@Activate 注解指定的 group 属性与当前 group 匹配;③扩展名没有出现在 values 中(即未在配置中明确指定,也未在配置中明确指定删除);④URL 中出现了 @Activate 注解中指定的 Key。

    然后,按照 @Activate 注解中的 order 属性对默认激活的扩展集合进行排序。

    最后,按序添加自定义扩展实现类的对象。

    参考

    Dubbo SPI
    Dubbo 扩展点

    打赏

    请我喝杯咖啡吧~

    支付宝
    微信