I. Overall service export flow
II. ServiceConfig.export()
```java
public class ServiceConfig<T> extends ServiceConfigBase<T> {

    private static final ProxyFactory PROXY_FACTORY =
            ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        // Excerpt: only the export-related statements are shown.
        // Step 1: wrap ref in an Invoker; the provider URL travels inside the registry URL under EXPORT_KEY.
        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass,
                registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
        // Step 2: wrap the Invoker once more together with this ServiceConfig.
        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
        // Step 3: export through the adaptive Protocol and remember the Exporter.
        Exporter<?> exporter = protocol.export(wrapperInvoker);
        exporters.add(exporter);
    }
}
```
III. ProxyFactory.getInvoker() wraps ref

1. The ProxyFactory interface

```java
@SPI("javassist")
public interface ProxyFactory {

    @Adaptive({PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker) throws RpcException;

    @Adaptive({PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;

    @Adaptive({PROXY_KEY})
    <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}
```
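As the interface definition shows, the adaptive extension reads the PROXY_KEY parameter of the URL, that is, the "proxy" parameter, and uses its value as the extension name when looking up the extension instance. The default extension name is "javassist".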
2. The adaptive class of ProxyFactory

The source of the generated ProxyFactory adaptive class, captured by setting a breakpoint, is as follows:
```java
package org.apache.dubbo.rpc;

import org.apache.dubbo.common.extension.ExtensionLoader;

public class ProxyFactory$Adaptive implements org.apache.dubbo.rpc.ProxyFactory {

    public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
        if (arg0 == null)
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
        org.apache.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("proxy", "javassist");
        if (extName == null)
            throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
        org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getProxy(arg0);
    }

    public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0, boolean arg1) throws org.apache.dubbo.rpc.RpcException {
        if (arg0 == null)
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
        org.apache.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("proxy", "javassist");
        if (extName == null)
            throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
        org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getProxy(arg0, arg1);
    }

    public org.apache.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, org.apache.dubbo.common.URL arg2) throws org.apache.dubbo.rpc.RpcException {
        if (arg2 == null)
            throw new IllegalArgumentException("url == null");
        org.apache.dubbo.common.URL url = arg2;
        String extName = url.getParameter("proxy", "javassist");
        if (extName == null)
            throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
        org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getInvoker(arg0, arg1, arg2);
    }
}
```
By default the call chain is:
ProxyFactory$Adaptive.getInvoker() -> JavassistProxyFactory.getInvoker()
JavassistProxyFactory wraps ref in an AbstractProxyInvoker object.
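The idea of wrapping ref behind a uniform invoke entry point can be illustrated with a minimal sketch. It is not Dubbo's implementation: SimpleInvoker, GreetingService, and GreetingServiceImpl are hypothetical names introduced here, and the sketch dispatches through plain reflection, whereas JavassistProxyFactory relies on a generated wrapper class.

```java
import java.lang.reflect.Method;

class RefInvokerSketch {

    // Wraps the service implementation (ref) behind a uniform invoke() entry point.
    static SimpleInvoker getInvoker(Object ref, Class<?> type) {
        return (methodName, parameterTypes, arguments) -> {
            try {
                // Look the method up on the service interface, then call it on ref.
                Method method = type.getMethod(methodName, parameterTypes);
                return method.invoke(ref, arguments);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Failed to invoke " + methodName, e);
            }
        };
    }

    public static void main(String[] cliArgs) {
        SimpleInvoker invoker = getInvoker(new GreetingServiceImpl(), GreetingService.class);
        Object result = invoker.invoke("sayHello", new Class<?>[] {String.class}, new Object[] {"dubbo"});
        System.out.println(result);
    }
}

// Hypothetical, simplified stand-in for Dubbo's Invoker (not the real interface).
interface SimpleInvoker {
    Object invoke(String methodName, Class<?>[] parameterTypes, Object[] arguments);
}

// Example service used only by this sketch.
interface GreetingService {
    String sayHello(String name);
}

class GreetingServiceImpl implements GreetingService {
    @Override
    public String sayHello(String name) {
        return "Hello " + name;
    }
}
```

The caller only needs a method name and arguments; which object actually serves the call stays hidden behind the invoker, which is what lets the later export steps keep wrapping it.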
IV. ServiceConfig wraps the ProxyFactory result again

```java
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
```
V. protocol.export(wrapperInvoker)

protocol is the adaptive Protocol class. It inspects the URL to find the extension of the matching type and calls that extension's export.
1. RegistryProtocol.export()

For the URL "registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?…", protocol resolves to the RegistryProtocol extension.
NOTE: The surrounding ProtocolListenerWrapper, ProtocolFilterWrapper, and QosProtocolWrapper are wrapper classes.
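RegistryProtocol rewrites the URL to "dubbo://ip:port/interface-name?…" and then calls the adaptive protocol class again, which this time resolves to the DubboProtocol extension and runs its export. The call relationship is shown in the figure above.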
```java
package org.apache.dubbo.registry.integration;

public class RegistryProtocol implements Protocol {

    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
        String key = getCacheKey(originInvoker);

        return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
            Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
            return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
        });
    }
}
```
As the source above shows, RegistryProtocol wraps the invoker once more (in an InvokerDelegate that carries the provider URL) before DubboProtocol#export runs.
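The two-stage dispatch described in this section can be sketched with plain strings. This is an illustration only: SimpleProtocol, EXTENSIONS, and LOG are hypothetical names, URLs are handled as raw strings rather than Dubbo URL objects, and the parameter name export is assumed to be the value behind EXPORT_KEY.

```java
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TwoStageExportSketch {

    // Hypothetical stand-in for a Protocol extension.
    interface SimpleProtocol {
        void export(String url);
    }

    static final Map<String, SimpleProtocol> EXTENSIONS = new HashMap<>();
    static final List<String> LOG = new ArrayList<>();

    static {
        // "dubbo" stands in for DubboProtocol: it performs the actual export.
        EXTENSIONS.put("dubbo", url -> LOG.add("dubbo export " + url));
        // "registry" stands in for RegistryProtocol: it extracts the provider URL
        // and goes through the adaptive dispatch a second time.
        EXTENSIONS.put("registry", url -> {
            LOG.add("registry export");
            adaptiveExport(providerUrlOf(url));
        });
    }

    // Mimics the adaptive class: choose the extension by the URL's protocol part.
    static void adaptiveExport(String url) {
        String scheme = url.substring(0, url.indexOf("://"));
        SimpleProtocol extension = EXTENSIONS.get(scheme);
        if (extension == null) {
            throw new IllegalStateException("No protocol extension for " + scheme);
        }
        extension.export(url);
    }

    // Reads and decodes the "export" parameter (assumed to be the only one named so).
    static String providerUrlOf(String registryUrl) {
        String marker = "export=";
        int start = registryUrl.indexOf(marker);
        if (start < 0) {
            throw new IllegalArgumentException("No export parameter in " + registryUrl);
        }
        start += marker.length();
        int end = registryUrl.indexOf('&', start);
        String encoded = end < 0 ? registryUrl.substring(start) : registryUrl.substring(start, end);
        return URLDecoder.decode(encoded, StandardCharsets.UTF_8);
    }

    public static void main(String[] cliArgs) {
        String providerUrl = "dubbo://127.0.0.1:20880/org.apache.dubbo.demo.DemoService?side=provider";
        String registryUrl = "registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?export="
                + URLEncoder.encode(providerUrl, StandardCharsets.UTF_8);
        adaptiveExport(registryUrl);
        LOG.forEach(System.out::println);
    }
}
```

The same entry point is used twice; only the protocol part of the URL changes between the first call and the second.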
2. DubboProtocol.export()

Before DubboProtocol#export is reached, the call passes through the logic of the wrapper classes ProtocolListenerWrapper, ProtocolFilterWrapper, and QosProtocolWrapper.
1) ProtocolFilterWrapper

This wrapper class wraps the Invoker with Filters, producing a Filter chain.
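A filter chain of this kind can be built by folding the filter list from back to front, so that the first filter ends up outermost. The sketch below assumes simplified, hypothetical SimpleInvoker and SimpleFilter interfaces; it shows the general technique rather than the exact code in ProtocolFilterWrapper.

```java
import java.util.Arrays;
import java.util.List;

class FilterChainSketch {

    // Hypothetical, simplified stand-ins for Invoker and Filter.
    interface SimpleInvoker {
        String invoke(String request);
    }

    interface SimpleFilter {
        String invoke(SimpleInvoker next, String request);
    }

    // Walks the list backwards so that filters.get(0) becomes the outermost layer.
    static SimpleInvoker buildChain(SimpleInvoker target, List<SimpleFilter> filters) {
        SimpleInvoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            SimpleFilter filter = filters.get(i);
            SimpleInvoker next = last;
            last = request -> filter.invoke(next, request);
        }
        return last;
    }

    public static void main(String[] cliArgs) {
        SimpleInvoker target = request -> "result(" + request + ")";
        SimpleFilter first = (next, request) -> next.invoke(request + ">A");
        SimpleFilter second = (next, request) -> next.invoke(request + ">B");
        SimpleInvoker chain = buildChain(target, Arrays.asList(first, second));
        System.out.println(chain.invoke("req"));
    }
}
```

Each filter receives the next invoker and decides whether and how to call it, which is why a single filter can short-circuit the whole chain.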
2) DubboProtocol#export

- DubboProtocol#export creates a DubboExporter, calls openServer(), and then returns.
- The result then passes through the wrapper class ProtocolListenerWrapper#export, which wraps it in a ListenerExporterWrapper.
- RegistryProtocol#doLocalExport wraps it again in an ExporterChangeableWrapper.
- RegistryProtocol#export wraps it once more in a DestroyableExporter.
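The resulting nesting of exporters can be pictured with a small decorator sketch. SimpleExporter, BaseExporter, and WrapperExporter are hypothetical types introduced for illustration; only the layer names and their order come from the steps listed above.

```java
class ExporterLayersSketch {

    // Hypothetical, simplified stand-in for Exporter.
    interface SimpleExporter {
        String describe();

        void unexport();
    }

    // Innermost exporter, standing in for DubboExporter.
    static final class BaseExporter implements SimpleExporter {
        boolean unexported;

        @Override
        public String describe() {
            return "DubboExporter";
        }

        @Override
        public void unexport() {
            unexported = true;
        }
    }

    // Generic decorator: adds one named layer and delegates everything inward.
    static final class WrapperExporter implements SimpleExporter {
        private final String name;
        private final SimpleExporter delegate;

        WrapperExporter(String name, SimpleExporter delegate) {
            this.name = name;
            this.delegate = delegate;
        }

        @Override
        public String describe() {
            return name + "(" + delegate.describe() + ")";
        }

        @Override
        public void unexport() {
            delegate.unexport();
        }
    }

    // Applies the layers in the order the export path adds them.
    static SimpleExporter wrapLikeExportPath(SimpleExporter base) {
        SimpleExporter exporter = base;
        String[] layers = {"ListenerExporterWrapper", "ExporterChangeableWrapper", "DestroyableExporter"};
        for (String layer : layers) {
            exporter = new WrapperExporter(layer, exporter);
        }
        return exporter;
    }

    public static void main(String[] cliArgs) {
        System.out.println(wrapLikeExportPath(new BaseExporter()).describe());
    }
}
```

Because every layer delegates inward, calling unexport on the outermost exporter eventually reaches the innermost one.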
VI. DubboProtocol.openServer

DubboProtocol.openServer calls createServer, which in turn calls Exchangers.bind(url, requestHandler).
Here, requestHandler receives each request, uses the request URL to look up the matching Exporter in exporterMap, obtains the Invoker from that Exporter, and invokes it.
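The lookup performed by requestHandler can be sketched as a map from a service key to an exporter. The key format path:port and the types SimpleInvoker and SimpleExporter are assumptions made for this illustration; Dubbo's real service key carries more parts.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RequestDispatchSketch {

    // Hypothetical, simplified stand-ins for Invoker and Exporter.
    interface SimpleInvoker {
        Object invoke(Object[] arguments);
    }

    static final class SimpleExporter {
        final SimpleInvoker invoker;

        SimpleExporter(SimpleInvoker invoker) {
            this.invoker = invoker;
        }
    }

    static final Map<String, SimpleExporter> EXPORTER_MAP = new ConcurrentHashMap<>();

    // Assumed key format for this sketch only.
    static String serviceKey(String path, int port) {
        return path + ":" + port;
    }

    // Export side: register the exporter under its service key.
    static void export(String path, int port, SimpleInvoker invoker) {
        EXPORTER_MAP.put(serviceKey(path, port), new SimpleExporter(invoker));
    }

    // Request side: find the exporter, take its invoker, and invoke it.
    static Object reply(String path, int port, Object[] arguments) {
        SimpleExporter exporter = EXPORTER_MAP.get(serviceKey(path, port));
        if (exporter == null) {
            throw new IllegalStateException("No exported service for " + serviceKey(path, port));
        }
        return exporter.invoker.invoke(arguments);
    }

    public static void main(String[] cliArgs) {
        export("org.apache.dubbo.demo.DemoService", 20880, arguments -> "Hello " + arguments[0]);
        System.out.println(reply("org.apache.dubbo.demo.DemoService", 20880, new Object[] {"dubbo"}));
    }
}
```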
```java
public class Exchangers {

    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).bind(url, handler);
    }

    public static Exchanger getExchanger(URL url) {
        String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
        return getExchanger(type);
    }

    public static Exchanger getExchanger(String type) {
        return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
    }
}
```
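Exchangers.bind reads the exchanger parameter (Constants.EXCHANGER_KEY) from the URL, with header as the default value (Constants.DEFAULT_EXCHANGER). It therefore normally resolves to HeaderExchanger, the extension named "header", and calls its bind.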
```java
public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
}
```
HeaderExchanger wraps requestHandler in a HeaderExchangeHandler and then in a DecodeHandler, and then calls Transporters to continue the bind.
```java
public class Transporters {

    public static RemotingServer bind(String url, ChannelHandler... handler) throws RemotingException {
        return bind(URL.valueOf(url), handler);
    }

    public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().bind(url, handler);
    }

    public static Transporter getTransporter() {
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
    }
}
```
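Through the adaptive Transporter class, Transporters reads the server parameter from the URL. Its default is netty, which selects NettyTransporter, and NettyTransporter in turn calls the NettyServer constructor.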
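The name resolution step can be sketched as a parameter lookup with a default. The sketch assumes the generated Transporter adaptive class follows the same pattern as the ProxyFactory$Adaptive source shown earlier; getParameter here is a simplified string-based helper written for this example, not Dubbo's URL class.

```java
class TransporterSelectionSketch {

    // Simplified query-string lookup: returns defaultValue when the key is absent or empty.
    static String getParameter(String url, String key, String defaultValue) {
        int queryStart = url.indexOf('?');
        if (queryStart >= 0) {
            for (String pair : url.substring(queryStart + 1).split("&")) {
                int eq = pair.indexOf('=');
                if (eq > 0 && pair.substring(0, eq).equals(key)) {
                    String value = pair.substring(eq + 1);
                    return value.isEmpty() ? defaultValue : value;
                }
            }
        }
        return defaultValue;
    }

    // Extension name used for the server side: the "server" parameter, else "netty".
    static String transporterName(String url) {
        return getParameter(url, "server", "netty");
    }

    public static void main(String[] cliArgs) {
        System.out.println(transporterName("dubbo://127.0.0.1:20880/demo.Service?side=provider"));
        System.out.println(transporterName("dubbo://127.0.0.1:20880/demo.Service?side=provider&server=mina"));
    }
}
```

The value mina in the second call is only an example of an override; which extension names are actually available depends on the transport modules on the classpath.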
```java
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
}
```
Inside the constructor, handler is wrapped further through ChannelHandlers.wrap:
```java
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
            .getAdaptiveExtension().dispatch(handler, url)));
}
```
In the end, the following structure is produced.
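The order in which the handler layers see an incoming message can be sketched by nesting simple handlers in the same order as the source excerpts. SimpleHandler and layer are hypothetical helpers, and the label "Dispatcher" stands for whatever handler the Dispatcher extension returns.

```java
import java.util.ArrayList;
import java.util.List;

class HandlerChainSketch {

    // Hypothetical, simplified stand-in for ChannelHandler.
    interface SimpleHandler {
        void received(String message, List<String> trace);
    }

    // One layer: records its name, then passes the message inward (if there is an inner layer).
    static SimpleHandler layer(String name, SimpleHandler next) {
        return (message, trace) -> {
            trace.add(name);
            if (next != null) {
                next.received(message, trace);
            }
        };
    }

    static SimpleHandler build() {
        SimpleHandler requestHandler = layer("requestHandler", null);
        // Wrapping done in HeaderExchanger.bind.
        SimpleHandler exchange = layer("DecodeHandler", layer("HeaderExchangeHandler", requestHandler));
        // Wrapping done in wrapInternal: dispatcher, heartbeat, multi-message.
        SimpleHandler dispatched = layer("Dispatcher", exchange);
        return layer("MultiMessageHandler", layer("HeartbeatHandler", dispatched));
    }

    public static void main(String[] cliArgs) {
        List<String> trace = new ArrayList<>();
        build().received("ping", trace);
        System.out.println(String.join(" -> ", trace));
    }
}
```

The layer that was added last is the first one to receive a message, so the request handler defined in DubboProtocol sits at the very end of the chain.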
VII. Final structure of the published service
The node registered in ZooKeeper looks like this:
/dubbo/org.apache.dubbo.demo.DemoService/providers/dubbo%3A%2F%2F192.168.199.172%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-api-provider%26default%3Dtrue%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%2CsayHelloAsync%26pid%3D21327%26release%3D%26side%3Dprovider%26timestamp%3D1621957958046
After URL decoding:
/dubbo/org.apache.dubbo.demo.DemoService/providers/dubbo:
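The decoding step can be reproduced with the JDK's URLDecoder. The sketch below is only an illustration: decodeProviderNode is a hypothetical helper, and the sample input is a shortened prefix of the registered node shown above, not the full node.

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;

class ProviderNodeDecodeSketch {

    // Keeps the ZooKeeper parent path as-is and decodes only the provider URL segment.
    static String decodeProviderNode(String nodePath) {
        String marker = "/providers/";
        int index = nodePath.indexOf(marker);
        if (index < 0) {
            throw new IllegalArgumentException("Not a provider node: " + nodePath);
        }
        String parent = nodePath.substring(0, index + marker.length());
        String encodedUrl = nodePath.substring(index + marker.length());
        return parent + URLDecoder.decode(encodedUrl, StandardCharsets.UTF_8);
    }

    public static void main(String[] cliArgs) {
        // Shortened prefix of the node from the text above.
        String node = "/dubbo/org.apache.dubbo.demo.DemoService/providers/"
                + "dubbo%3A%2F%2F192.168.199.172%3A20880%2Forg.apache.dubbo.demo.DemoService"
                + "%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-api-provider";
        System.out.println(decodeProviderNode(node));
    }
}
```

The provider URL is stored percent-encoded because it has to fit into a single ZooKeeper path segment, where the slashes of the raw URL would otherwise be read as path separators.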