Dubbo Service Export: Source Code Analysis

I. Overall Service Export Flow

/dev-guide/images/dubbo-export.jpg

II. ServiceConfig.export()

public class ServiceConfig<T> extends ServiceConfigBase<T> {
    private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        // Key code only; the rest of the method is omitted.

        // Use the ProxyFactory extension point to create an Invoker that wraps ref.
        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));

        // Wrap the Invoker from the previous step in a DelegateProviderMetaDataInvoker.
        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
        // Use the Protocol extension point to run the export logic.
        Exporter<?> exporter = protocol.export(wrapperInvoker);

        exporters.add(exporter);
    }
}

III. ProxyFactory.getInvoker() Wraps ref

1. The ProxyFactory Interface

@SPI("javassist")
public interface ProxyFactory {
    // PROXY_KEY is "proxy"
    @Adaptive({PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker) throws RpcException;

    @Adaptive({PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;

    @Adaptive({PROXY_KEY})
    <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}

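As the interface definition shows, the extension instance is chosen by the PROXY_KEY parameter of the URL, that is, the "proxy" parameter, whose value names the extension to load. The default extension name is "javassist".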

2. The Adaptive Class for ProxyFactory

The source of the generated ProxyFactory adaptive class, captured by setting a breakpoint, is as follows:

package org.apache.dubbo.rpc;

import org.apache.dubbo.common.extension.ExtensionLoader;

public class ProxyFactory$Adaptive implements org.apache.dubbo.rpc.ProxyFactory {
    public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
        org.apache.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("proxy", "javassist");
        if (extName == null)
            throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
        org.apache.dubbo.rpc.ProxyFactory extension =
                (org.apache.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getProxy(arg0);
    }

    public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0, boolean arg1) throws org.apache.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
        org.apache.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("proxy", "javassist");
        if (extName == null)
            throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
        org.apache.dubbo.rpc.ProxyFactory extension =
                (org.apache.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getProxy(arg0, arg1);
    }

    public org.apache.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1,
                                                   org.apache.dubbo.common.URL arg2) throws org.apache.dubbo.rpc.RpcException {
        if (arg2 == null) throw new IllegalArgumentException("url == null");
        org.apache.dubbo.common.URL url = arg2;
        String extName = url.getParameter("proxy", "javassist");
        if (extName == null)
            throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
        org.apache.dubbo.rpc.ProxyFactory extension =
                (org.apache.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getInvoker(arg0, arg1, arg2);
    }
}

By default the call chain is:

ProxyFactory$Adaptive.getInvoker() -> JavassistProxyFactory.getInvoker()

JavassistProxyFactory wraps ref in an AbstractProxyInvoker object.
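To make the idea of wrapping ref in an invoker concrete, here is a minimal, self-contained sketch. It is not Dubbo code: SimpleInvoker, GreetingService, GreetingServiceImpl and RefInvokerSketch are hypothetical names, and the sketch dispatches with plain reflection, whereas JavassistProxyFactory relies on a generated wrapper class. It only illustrates how an arbitrary service object can be hidden behind a single invoke entry point.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

/** Hypothetical, simplified stand-in for Dubbo's Invoker. */
interface SimpleInvoker {
    Object invoke(String methodName, Class<?>[] parameterTypes, Object[] arguments);
}

/** Hypothetical service interface, used only in this sketch. */
interface GreetingService {
    String sayHello(String name);
}

/** Hypothetical service implementation; it plays the role of ref. */
class GreetingServiceImpl implements GreetingService {
    @Override
    public String sayHello(String name) {
        return "Hello " + name;
    }
}

final class RefInvokerSketch {
    /** Wraps ref so that callers only supply a method name, parameter types and arguments. */
    static <T> SimpleInvoker wrap(T ref, Class<T> type) {
        return (methodName, parameterTypes, arguments) -> {
            try {
                // Assumption: reflection replaces the generated wrapper class used by Dubbo.
                Method method = type.getMethod(methodName, parameterTypes);
                return method.invoke(ref, arguments);
            } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
                throw new IllegalStateException("Failed to invoke " + methodName, e);
            }
        };
    }

    public static void main(String[] args) {
        SimpleInvoker invoker = wrap(new GreetingServiceImpl(), GreetingService.class);
        Object result = invoker.invoke("sayHello", new Class<?>[] {String.class}, new Object[] {"Dubbo"});
        System.out.println(result); // prints "Hello Dubbo"
    }
}
```

Everything downstream of this point (protocols, filters, network handlers) only needs the invoker, never the concrete service class.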

image-20210516232619487

IV. ServiceConfig Wraps the ProxyFactory Result Again

DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

image-20210516232958512

V. protocol.export(wrapperInvoker)

protocol is the adaptive Protocol class. It inspects the URL to find the extension of the matching type and delegates export to it.

1. RegistryProtocol.export()

Given the URL "registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?…", protocol resolves to the RegistryProtocol extension.

NOTE: The outer ProtocolListenerWrapper, ProtocolFilterWrapper and QosProtocolWrapper are wrapper classes.

image-20210517000543761

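RegistryProtocol rewrites the URL to "dubbo://ip:port/<interface name>?…" and then calls the adaptive protocol class again, which this time resolves to the DubboProtocol extension and runs its export. The call relationships are shown in the figure above.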

package org.apache.dubbo.registry.integration;

public class RegistryProtocol implements Protocol {
    // At this point providerUrl has the form "dubbo://ip:port/<interface name>?..."
    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
        String key = getCacheKey(originInvoker);

        return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
            Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
            return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
        });
    }
}

As the source above shows, RegistryProtocol wraps the invoker once more before DubboProtocol#export runs.
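The provider URL reaches RegistryProtocol inside the registry URL: ServiceConfig attaches it, encoded, under EXPORT_KEY. The sketch below illustrates that round trip using only JDK classes. It assumes EXPORT_KEY has the value "export"; ExportParameterSketch, its method names and the application=demo parameter are hypothetical, and Dubbo's own URL class does this work in the real code.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

final class ExportParameterSketch {
    // Parameter name that carries the provider URL; assumed to be "export".
    static final String EXPORT_KEY = "export";

    /** Appends export=<encoded provider URL> to the registry URL. */
    static String addExportParameter(String registryUrl, String providerUrl) {
        String separator = registryUrl.contains("?") ? "&" : "?";
        return registryUrl + separator + EXPORT_KEY + "=" + encode(providerUrl);
    }

    /** Reads the export parameter back and decodes it into the provider URL. */
    static String extractProviderUrl(String registryUrl) {
        int queryStart = registryUrl.indexOf('?');
        String query = queryStart < 0 ? "" : registryUrl.substring(queryStart + 1);
        for (String pair : query.split("&")) {
            if (pair.startsWith(EXPORT_KEY + "=")) {
                return decode(pair.substring(EXPORT_KEY.length() + 1));
            }
        }
        throw new IllegalArgumentException("No " + EXPORT_KEY + " parameter in " + registryUrl);
    }

    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, StandardCharsets.UTF_8.name());
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    private static String decode(String value) {
        try {
            return URLDecoder.decode(value, StandardCharsets.UTF_8.name());
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    public static void main(String[] args) {
        String registry = "registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=demo";
        String provider = "dubbo://192.168.199.172:20880/org.apache.dubbo.demo.DemoService?anyhost=true&side=provider";
        String combined = addExportParameter(registry, provider);
        System.out.println(combined);
        System.out.println(extractProviderUrl(combined));
    }
}
```

Encoding matters here because the provider URL contains its own "?", "&" and "=" characters, which would otherwise be confused with the registry URL's query string.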

image-20210516235557344

2. DubboProtocol.export()

Before DubboProtocol#export is reached, the call passes through the wrapper classes ProtocolListenerWrapper, ProtocolFilterWrapper and QosProtocolWrapper.

1) ProtocolFilterWrapper

This wrapper class wraps the Invoker with Filters, producing a Filter chain.
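A filter chain of this kind can be sketched in a few lines. The code below is an illustration only: ChainInvoker, ChainFilter and FilterChainSketch are hypothetical, simplified types, not Dubbo's Invoker and Filter. It builds the chain from the last filter to the first, so that the first filter in the list ends up outermost and runs first.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified invoker: takes a request, returns a result. */
interface ChainInvoker {
    String invoke(String request);
}

/** Hypothetical, simplified filter: may act before and after calling next. */
interface ChainFilter {
    String invoke(ChainInvoker next, String request);
}

final class FilterChainSketch {
    /** Wraps target back to front so that filters.get(0) is the outermost layer. */
    static ChainInvoker buildChain(ChainInvoker target, List<ChainFilter> filters) {
        ChainInvoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            ChainFilter filter = filters.get(i);
            ChainInvoker next = last;
            last = request -> filter.invoke(next, request);
        }
        return last;
    }

    /** A filter that prefixes the result with its own name, to make the order visible. */
    static ChainFilter tagging(String name) {
        return (next, request) -> name + ">" + next.invoke(request);
    }

    public static void main(String[] args) {
        ChainInvoker service = request -> "service(" + request + ")";
        ChainInvoker chain = buildChain(service, Arrays.asList(tagging("first"), tagging("second")));
        System.out.println(chain.invoke("ping")); // prints "first>second>service(ping)"
    }
}
```

The caller still sees a single invoker, so the rest of the export path does not need to know how many filters are configured.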

image-20210517001936404

2) DubboProtocol#export

- DubboProtocol#export creates a DubboExporter, calls openServer(), and returns.

- The wrapper class ProtocolListenerWrapper#export then wraps the result in a ListenerExporterWrapper.

- RegistryProtocol#doLocalExport wraps that in an ExporterChangeableWrapper.

- RegistryProtocol#export finally wraps it in a DestroyableExporter.

image-20210517003347876

VI. DubboProtocol.openServer

DubboProtocol.openServer calls createServer, which in turn calls Exchangers.bind(url, requestHandler).

Here requestHandler receives each request, looks up the matching Exporter in exporterMap based on the request URL, obtains the Invoker from that Exporter, and invokes it.
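The lookup performed by requestHandler can be sketched as a map from a service key to an invoker. This is a simplified illustration, not DubboProtocol's code: ExporterLookupSketch and its methods are hypothetical, the invoker is reduced to a Function, and the key is assumed to consist only of the service path and the port (the real key carries more information).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class ExporterLookupSketch {
    // Service key -> invoker; a Function stands in for the Exporter/Invoker pair.
    private final Map<String, Function<String, String>> exporterMap = new ConcurrentHashMap<>();

    /** Assumption: the key is just "path:port". */
    static String serviceKey(String path, int port) {
        return path + ":" + port;
    }

    /** Called at export time: remember which invoker serves which key. */
    void export(String path, int port, Function<String, String> invoker) {
        exporterMap.put(serviceKey(path, port), invoker);
    }

    /** Called per request: find the invoker for the requested service and call it. */
    String handleRequest(String path, int port, String argument) {
        String key = serviceKey(path, port);
        Function<String, String> invoker = exporterMap.get(key);
        if (invoker == null) {
            throw new IllegalStateException("No exported service for key " + key);
        }
        return invoker.apply(argument);
    }

    public static void main(String[] args) {
        ExporterLookupSketch protocol = new ExporterLookupSketch();
        protocol.export("org.apache.dubbo.demo.DemoService", 20880, name -> "Hello " + name);
        System.out.println(protocol.handleRequest("org.apache.dubbo.demo.DemoService", 20880, "Dubbo"));
    }
}
```

Because the map is filled during export and read during request handling, one server port can serve several exported services.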

public class Exchangers {
    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).bind(url, handler);
    }

    public static Exchanger getExchanger(URL url) {
        // Constants.DEFAULT_EXCHANGER is "header"
        String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
        return getExchanger(type);
    }

    public static Exchanger getExchanger(String type) {
        return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
    }
}

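Exchangers.bind reads the exchanger parameter (Constants.EXCHANGER_KEY) from the URL, with header as the default value. It therefore normally resolves to the "header" extension, HeaderExchanger, and calls its bind method.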

public class HeaderExchanger implements Exchanger {
    public static final String NAME = "header";

    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
}

HeaderExchanger wraps requestHandler in a HeaderExchangeHandler and then in a DecodeHandler, and passes the result to Transporters for the next stage of bind.

public class Transporters {
    public static RemotingServer bind(String url, ChannelHandler... handler) throws RemotingException {
        return bind(URL.valueOf(url), handler);
    }

    public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().bind(url, handler);
    }

    public static Transporter getTransporter() {
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
    }
}

Transporters uses the adaptive Transporter class to read the server parameter from the URL. The default is netty, i.e. NettyTransporter, which in turn calls the NettyServer constructor.

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
    // the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
    super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
}

Inside the constructor, ChannelHandlers.wrap wraps the handler further.

protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
            .getAdaptiveExtension().dispatch(handler, url)));
}

In the end, the following structure is produced.

image-20210525234719381
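The nesting order implied by HeaderExchanger.bind and wrapInternal can be traced with a small sketch. It is an illustration only: TraceHandler and HandlerChainSketch are hypothetical, and "DispatcherHandler" is a placeholder name for whatever handler the Dispatcher extension returns. Each layer records its name and passes the message on to the layer it wraps.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified handler: records itself in trace, then delegates. */
interface TraceHandler {
    void received(String message, List<String> trace);
}

final class HandlerChainSketch {
    /** Creates a layer called name that wraps next (null for the innermost layer). */
    static TraceHandler named(String name, TraceHandler next) {
        return (message, trace) -> {
            trace.add(name);
            if (next != null) {
                next.received(message, trace);
            }
        };
    }

    /** Rebuilds the wrapping order described in the text, innermost layer first. */
    static TraceHandler build() {
        TraceHandler requestHandler = named("requestHandler", null);
        // HeaderExchanger.bind: DecodeHandler(HeaderExchangeHandler(requestHandler))
        TraceHandler exchange = named("DecodeHandler", named("HeaderExchangeHandler", requestHandler));
        // wrapInternal: MultiMessageHandler(HeartbeatHandler(dispatch(handler)))
        TraceHandler dispatched = named("DispatcherHandler", exchange);
        return named("MultiMessageHandler", named("HeartbeatHandler", dispatched));
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        build().received("ping", trace);
        System.out.println(String.join(" -> ", trace));
    }
}
```

Reading the trace from left to right gives the path an incoming message takes from the network layer down to requestHandler.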

VII. Final Structure of the Exported Service

image-20210525235040075

The node registered in ZooKeeper is:

/dubbo/org.apache.dubbo.demo.DemoService/providers/dubbo%3A%2F%2F192.168.199.172%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-api-provider%26default%3Dtrue%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%2CsayHelloAsync%26pid%3D21327%26release%3D%26side%3Dprovider%26timestamp%3D1621957958046

URL Decode:

/dubbo/org.apache.dubbo.demo.DemoService/providers/dubbo://192.168.199.172:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=21327&release=&side=provider&timestamp=1621957958046
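The decoding step can be reproduced with the JDK alone. The sketch below is a hypothetical helper (ProviderNodeSketch is not part of Dubbo) that takes a provider node path, isolates the last path segment, and URL-decodes it. The sample path is a shortened version of the node shown above, keeping only two of its query parameters.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;

final class ProviderNodeSketch {
    /** Returns the decoded provider URL stored as the last segment of a ZooKeeper path. */
    static String decodeProviderUrl(String zkPath) {
        // The encoded URL contains no '/', so the last '/' marks where the node name starts.
        String nodeName = zkPath.substring(zkPath.lastIndexOf('/') + 1);
        try {
            return URLDecoder.decode(nodeName, StandardCharsets.UTF_8.name());
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    public static void main(String[] args) {
        String zkPath = "/dubbo/org.apache.dubbo.demo.DemoService/providers/"
                + "dubbo%3A%2F%2F192.168.199.172%3A20880%2Forg.apache.dubbo.demo.DemoService"
                + "%3Fanyhost%3Dtrue%26side%3Dprovider";
        System.out.println(decodeProviderUrl(zkPath));
    }
}
```

The node name has to be encoded in the first place because "/" is the path separator in ZooKeeper, and a raw provider URL contains several of them.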

image-20210525235413740

