Dubbo Source Analysis 3: Service Publishing

1. Overview of service publishing

Dubbo's service export process starts when the Spring container publishes its refresh event. ServiceBean, the bean behind the dubbo:service tag, receives the ContextRefreshedEvent in onApplicationEvent(ContextRefreshedEvent event) and then runs the service export logic. The whole process can be roughly divided into three parts:

The first part is the preparatory work, which mainly checks parameters and assembles URLs.

The second part exports the service, which consists of two steps: exporting the service locally (within the JVM) and exporting it remotely.

The third part registers the service with the registry for service discovery, which includes registering with and subscribing to ZooKeeper.

This article focuses on the entire publishing process, with some details omitted, such as configuration checks and URL assembly.

2. Source environment description

This analysis is based on Dubbo 2.6.4 and uses the official dubbo-demo project, with the registry changed to ZooKeeper.

Interface and implementation class code:

public interface DemoService {
    String sayHello(String name);
}
public class DemoServiceImpl implements DemoService {
    @Override
    public String sayHello(String name) {
        System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] Hello " + name + ", request from consumer: " + RpcContext.getContext().getRemoteAddress());
        return "Hello " + name + ", response from provider: " + RpcContext.getContext().getLocalAddress();
    }
}

3. Source Code Analysis

The entry point for service publishing is ServiceBean#onApplicationEvent, shown below:

Code block: ServiceBean#onApplicationEvent

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // Export only if isDelay() holds (export was left to the context refresh) and the service has been neither exported nor unexported
        if (isDelay() && !isExported() && !isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + getInterface());
            }
            export();
        }
    }

3.1 Preparatory Work for Service Publishing

3.1.1 Overview

The preparatory work consists of two parts: configuration checking and URL assembly. Before exporting a service, Dubbo checks that the user's configuration is reasonable and fills in defaults where the user has not supplied a value. Once the configuration checks are complete, the URL is assembled from that configuration. URLs play an important role in Dubbo: they are the carrier of configuration, and every extension point obtains its configuration through a URL.
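To make the idea of a URL as a configuration carrier concrete, here is a minimal sketch. It uses java.net.URI from the JDK rather than Dubbo's own URL class, and the class name UrlConfigSketch and its parameters helper are hypothetical names chosen for illustration. It only shows that protocol, address, path, and arbitrary key/value settings can all travel in one string.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; Dubbo uses its own URL class.
class UrlConfigSketch {

    // Splits the raw query string into ordered key/value pairs.
    static Map<String, String> parameters(URI uri) {
        Map<String, String> params = new LinkedHashMap<>();
        String query = uri.getRawQuery();
        if (query == null || query.isEmpty()) {
            return params;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq < 0) {
                params.put(pair, "");
            } else {
                params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return params;
    }

    public static void main(String[] args) {
        URI uri = URI.create("dubbo://192.168.43.174:20880/com.alibaba.dubbo.demo.DemoService"
                + "?application=demo-provider&generic=false&side=provider");
        Map<String, String> params = parameters(uri);
        System.out.println("protocol    = " + uri.getScheme());
        System.out.println("address     = " + uri.getHost() + ":" + uri.getPort());
        System.out.println("path        = " + uri.getPath());
        System.out.println("application = " + params.get("application"));
    }
}
```

Any component that receives such a string can recover the settings it cares about without a shared configuration object, which is the property the export code relies on.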

Code block: ServiceConfig#doExport

protected synchronized void doExport() {
    if (unexported) {
        throw new IllegalStateException("Already unexported!");
    }
    if (exported) {
        return;
    }
    exported = true;
    // Check that interfaceName is valid
    if (interfaceName == null || interfaceName.length() == 0) {
        throw new IllegalStateException("interface not allow null!");
    }
    // Check whether provider is null; if so, create one and initialize it from system properties
    checkDefault();

    // The following if statements check whether core configuration objects such as provider and application are null.
    // If one is null, try to obtain an instance from another configuration object.
    if (provider != null) {
        if (application == null) {
            application = provider.getApplication();
        }
        if (module == null) {
            module = provider.getModule();
        }
        if (registries == null) {...}
        if (monitor == null) {...}
        if (protocols == null) {...}
    }
    if (module != null) {
        if (registries == null) {
            registries = module.getRegistries();
        }
        if (monitor == null) {...}
    }
    if (application != null) {
        if (registries == null) {
            registries = application.getRegistries();
        }
        if (monitor == null) {...}
    }

    // Check whether ref is a generic service (GenericService)
    if (ref instanceof GenericService) {
        // Set interfaceClass to GenericService.class
        interfaceClass = GenericService.class;
        if (StringUtils.isEmpty(generic)) {
            // Set generic = "true"
            generic = Boolean.TRUE.toString();
        }

    // ref is not a GenericService
    } else {
        try {
            interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                    .getContextClassLoader());
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        // Validate interfaceClass and the required fields of the <dubbo:method> tags
        checkInterfaceAndMethods(interfaceClass, methods);
        // Check that ref is valid
        checkRef();
        // Set generic = "false"
        generic = Boolean.FALSE.toString();
    }

    // local and stub are functionally equivalent; both configure a local stub
    if (local != null) {
        if ("true".equals(local)) {
            local = interfaceName + "Local";
        }
        Class<?> localClass;
        try {
            // Get local stub class
            localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        // Check that the local stub class is assignable to the interface class; if not, throw an exception telling the user that the stub class type is invalid
        if (!interfaceClass.isAssignableFrom(localClass)) {
            throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName);
        }
    }

    if (stub != null) {
        // This code is essentially the same as the local branch above, so it is omitted
    }

    // Check whether these configuration objects are null; create them if so, and throw an exception if they cannot be created
    checkApplication();
    checkRegistry();
    checkProtocol();
    appendProperties(this);
    checkStubAndMock(interfaceClass);
    if (path == null || path.length() == 0) {
        path = interfaceName;
    }

    // Export Service
    doExportUrls();

    // ProviderModel represents a service provider and stores information about it,
    // such as the service configuration and the service instance. Each exported service corresponds to one ProviderModel.
    // ApplicationModel holds all ProviderModels.
    ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
    ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
}

3.1.2 A brief summary of the logic of configuration checking:

  1. Check that the interface attribute of the dubbo:service tag is valid, and throw an exception if it is not

  2. Check whether core configuration objects such as ProviderConfig and ApplicationConfig are null, and if so try to obtain the corresponding instances from other configuration objects

  3. Detect and handle generic services versus ordinary service classes

  4. Detect the local stub configuration and handle it accordingly

  5. Check configuration objects such as ApplicationConfig and RegistryConfig, try to create them if they are null, and throw an exception if they cannot be created
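Step 2 of the summary, the fallback from one configuration object to another, can be sketched in a few lines. This is a simplified illustration and not Dubbo's code: registries are modelled as plain lists of strings, the class ConfigFallbackSketch and the method resolveRegistries are hypothetical names, and the final exception stands in for the more elaborate checkRegistry step.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the provider -> module -> application fallback.
class ConfigFallbackSketch {

    // Returns the service's own registries if set; otherwise falls back to
    // the provider, then the module, then the application configuration.
    static List<String> resolveRegistries(List<String> own, List<String> fromProvider,
                                          List<String> fromModule, List<String> fromApplication) {
        List<String> registries = own;
        if (registries == null) {
            registries = fromProvider;
        }
        if (registries == null) {
            registries = fromModule;
        }
        if (registries == null) {
            registries = fromApplication;
        }
        if (registries == null) {
            // Simplification: the real code first tries to build a default configuration.
            throw new IllegalStateException("No registry configured");
        }
        return registries;
    }

    public static void main(String[] args) {
        List<String> fromApplication = Arrays.asList("zookeeper://127.0.0.1:2181");
        System.out.println(resolveRegistries(null, null, null, fromApplication));
    }
}
```

The order of the checks matters: a value set closer to the service wins over one inherited from a broader scope.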

3.2 Service Exposure

Next, step into the doExportUrls() method:

    private void doExportUrls() {
        // Load the registry URLs
        List<URL> registryURLs = loadRegistries(true);
        // Traverse protocols and export services under each protocol
        for (ProtocolConfig protocolConfig : protocols) {
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

Code block: ServiceConfig#doExportUrlsFor1Protocol

    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        /***
        The code is fairly long, so the URL-assembly part is omitted.
        Once the configuration check is complete, the next step is to assemble the URL from the configuration and some additional information.
        The URL is the carrier of Dubbo configuration; through it, configuration is passed between Dubbo's modules.
        ***/
        //...
        String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
        Integer port = this.findConfigedPorts(protocolConfig, name, map);
        URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
        /*** An example of a URL assembled here:
        dubbo://192.168.43.174:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.43.174&bind.port=20880&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=8564&qos.port=22222&side=provider&timestamp=1578456375449
        ***/

        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }

        // Next comes the code that actually exposes the service
        String scope = url.getParameter(Constants.SCOPE_KEY);
        // don't export when none is configured
        if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

            // export to local if the config is not remote (export to remote only when config is remote)
            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                // Expose the service locally
                exportLocal(url);
            }
            // export to remote if the config is not local (export to local only when config is local)
            if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                if (registryURLs != null && !registryURLs.isEmpty()) {
                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }

                        // For providers, this is used to enable custom proxy to generate invoker
                        String proxy = url.getParameter(Constants.PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
                        }

                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                        // Expose the service remotely
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            }
        }
        this.urls.add(url);
    }

The code above decides how to export the service based on the scope parameter in the URL:

  • scope = none: do not export the service (note that this is the string "none", not null)
  • scope != remote: export locally
  • scope != local: export remotely

In our example scope is null, so the service is exposed both locally and remotely.
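The scope rules above reduce to two boolean checks. The following sketch restates them outside of Dubbo; the class ScopeSketch and its method names are hypothetical, and the literal strings stand in for the Constants.SCOPE_* values used in the real code.

```java
// Hypothetical restatement of the scope checks in doExportUrlsFor1Protocol.
class ScopeSketch {

    // Local export happens unless scope is "none" or "remote".
    static boolean exportsLocal(String scope) {
        return !"none".equalsIgnoreCase(scope) && !"remote".equalsIgnoreCase(scope);
    }

    // Remote export happens unless scope is "none" or "local".
    static boolean exportsRemote(String scope) {
        return !"none".equalsIgnoreCase(scope) && !"local".equalsIgnoreCase(scope);
    }

    public static void main(String[] args) {
        String[] scopes = {null, "none", "local", "remote"};
        for (String scope : scopes) {
            System.out.println("scope=" + scope
                    + " local=" + exportsLocal(scope)
                    + " remote=" + exportsRemote(scope));
        }
    }
}
```

Because the comparisons are written as "not equal to", an unset scope passes both checks, which is why the default is to export in both places.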

3.2.1 Exposing services locally

Next, step into the ServiceConfig#exportLocal(URL url) method:

private void exportLocal(URL url) {
    // If the protocol header of the URL equals injvm, it means that the URL has been exported locally and does not need to be exported again
    if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
        URL local = URL.valueOf(url.toFullString())
            .setProtocol(Constants.LOCAL_PROTOCOL)    // Set protocol header to injvm
            .setHost(LOCALHOST)
            .setPort(0);
        ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));
        // Create an Invoker and export the service; at runtime, protocol dispatches to InjvmProtocol's export method
        Exporter<?> exporter = protocol.export(
            proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
        exporters.add(exporter);
    }
}

Here you can see that exposing a service produces an Exporter object, which is stored and is associated with an Invoker object. What are these two?

Introducing Invoker and Exporter

Invoker is the entity domain and the core model of Dubbo. Other models depend on it or are converted into it. It represents an executable entity on which invoke can be called, and it may be a local implementation, a remote implementation, or a cluster implementation.

That is the official description, but it does not make clear what an Invoker actually does or what it is for.

Take the DemoService from the beginning as an example. Its sayHello(String name) method may be used from elsewhere, either locally or remotely, and in both cases the call goes through the corresponding Invoker's invoke() method. Calling the Invoker ultimately results in a call to DemoService.sayHello().

public interface Exporter<T> {
    Invoker<T> getInvoker();
    void unexport();
}

An Exporter gives access to its Invoker. Exporters are cached so that, when a call arrives later, the corresponding Invoker can be looked up and the local or remote method invoked. How the Invoker itself is created is not analyzed here.
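The relationship between a service object, its Invoker, and its Exporter can be sketched with much-simplified stand-ins. Everything below is hypothetical: SimpleInvoker and SimpleExporter are not Dubbo's interfaces, Greeter stands in for DemoService, and plain reflection is used as one possible way to implement invoke.

```java
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, much-simplified model of Invoker and Exporter.
class InvokerSketch {

    // Stand-in for the demo's DemoService interface.
    interface Greeter {
        String sayHello(String name);
    }

    // One uniform entry point for calling any method of a service.
    interface SimpleInvoker {
        Object invoke(String methodName, Class<?>[] parameterTypes, Object[] arguments);
    }

    // Handle for an exported service: exposes its invoker and allows unexporting.
    interface SimpleExporter {
        SimpleInvoker getInvoker();
        void unexport();
    }

    // Exported services, keyed by a service key.
    static final Map<String, SimpleExporter> EXPORTERS = new ConcurrentHashMap<>();

    // Wraps a service object so that its methods can be called by name via reflection.
    static SimpleInvoker invokerFor(Object target, Class<?> serviceInterface) {
        return (methodName, parameterTypes, arguments) -> {
            try {
                Method method = serviceInterface.getMethod(methodName, parameterTypes);
                return method.invoke(target, arguments);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Invocation failed: " + methodName, e);
            }
        };
    }

    // Stores the invoker under the service key and returns a handle to it.
    static SimpleExporter export(String serviceKey, SimpleInvoker invoker) {
        SimpleExporter exporter = new SimpleExporter() {
            @Override
            public SimpleInvoker getInvoker() {
                return invoker;
            }

            @Override
            public void unexport() {
                EXPORTERS.remove(serviceKey);
            }
        };
        EXPORTERS.put(serviceKey, exporter);
        return exporter;
    }

    // Exports a Greeter, then calls it the way an incoming request would.
    static Object demo() {
        Greeter service = name -> "Hello " + name;
        export("demo.Greeter", invokerFor(service, Greeter.class));
        SimpleInvoker invoker = EXPORTERS.get("demo.Greeter").getInvoker();
        return invoker.invoke("sayHello", new Class<?>[] {String.class}, new Object[] {"world"});
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The caller in demo() never touches the Greeter directly. It only needs the service key and the method description, which is what makes the same calling convention usable for local and remote requests.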

Let's continue with this code:

Exporter<?> exporter = protocol.export(
            proxyFactory.getInvoker(ref, (Class) interfaceClass, local));

Here protocol is the adaptive extension class Protocol$Adaptive, which is generated dynamically at runtime. Its generated source is as follows:

package com.alibaba.dubbo.rpc;

import com.alibaba.dubbo.common.extension.ExtensionLoader;

public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
    public void destroy() {
        throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public int getDefaultPort() {
        throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        // extName is "injvm" when execution reaches this point
        
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }

    public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg1 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg1;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }
}

protocol.export obtains the Protocol extension instance named by the URL's protocol, in this case InjvmProtocol, and then calls InjvmProtocol#export (below), which returns an InjvmExporter.

InjvmProtocol#export

    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
    }

This completes the analysis of local service exposure.
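The dispatch performed by the adaptive class, choosing an implementation from the protocol of the URL, can be illustrated with a small sketch. It is an assumption-laden simplification: a plain map replaces ExtensionLoader, java.net.URI replaces Dubbo's URL, and SimpleProtocol and AdaptiveDispatchSketch are hypothetical names.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of adaptive dispatch: pick an implementation by URL protocol.
class AdaptiveDispatchSketch {

    interface SimpleProtocol {
        String export(URI url);
    }

    // Stand-in for the extension registry that ExtensionLoader maintains.
    static final Map<String, SimpleProtocol> EXTENSIONS = new HashMap<>();

    static {
        EXTENSIONS.put("injvm", url -> "InjvmExporter for " + url.getPath());
        EXTENSIONS.put("dubbo", url -> "DubboExporter for " + url.getPath());
    }

    // The adaptive instance holds no export logic of its own; it only delegates.
    static final SimpleProtocol ADAPTIVE = url -> {
        String extName = url.getScheme() == null ? "dubbo" : url.getScheme();
        SimpleProtocol extension = EXTENSIONS.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No such extension: " + extName);
        }
        return extension.export(url);
    };

    static String export(String url) {
        return ADAPTIVE.export(URI.create(url));
    }

    public static void main(String[] args) {
        System.out.println(export("injvm://127.0.0.1/com.alibaba.dubbo.demo.DemoService"));
        System.out.println(export("dubbo://127.0.0.1:20880/com.alibaba.dubbo.demo.DemoService"));
    }
}
```

The calling code always holds the same adaptive object, and the URL alone decides which implementation runs. This is why rewriting the protocol of a URL to injvm is enough to route the export to the in-JVM implementation.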

3.2.2 Exposing Services to Remote

Now return to this line in ServiceConfig#doExportUrlsFor1Protocol: Exporter<?> exporter = protocol.export(wrapperInvoker);

At this point wrapperInvoker holds the following:

interface com.alibaba.dubbo.demo.DemoService -> registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F192.168.43.174%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26bind.ip%3D192.168.43.174%26bind.port%3D20880%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D16888%26qos.port%3D22222%26side%3Dprovider%26timestamp%3D1578470846696&pid=16888&qos.port=22222&registry=zookeeper&timestamp=1578470846603

When protocol.export executes, it selects a concrete implementation based on the protocol of the URL, which here is registry.

The relevant part of the Protocol$Adaptive#export method (the full class is shown above):

com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension("registry");
// Here the extension is RegistryProtocol
return extension.export(arg0);

RegistryProtocol#export

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    // Export the service
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

    // Get the registry URL. Taking the ZooKeeper registry as an example, the resulting URL looks like this:
    // zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.17.48.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider
    URL registryUrl = getRegistryUrl(originInvoker);

    // Load the Registry implementation class based on the URL, e.g. ZookeeperRegistry
    final Registry registry = getRegistry(originInvoker);

    // Get the provider URL to be registered, for example:
    // dubbo://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
    final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);

    // Get the register parameter
    boolean register = registeredProviderUrl.getParameter("register", true);

    // Record the provider in the local provider/consumer registration table
    ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);

    // Decide whether to register the service based on the value of register
    if (register) {
        // Register the service with the registry
        register(registryUrl, registeredProviderUrl);
        ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
    }

    // Get the subscription URL, for example:
    // provider://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?category=configurators&check=false&anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
    // Create listeners
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    // Subscribe to the registry for override data
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    // Create and return DestroyableExporter
    return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}

The code above looks complicated, but it does the following:

  1. Calls doLocalExport to export the service
  2. Registers the service with the registry
  3. Subscribes to override data from the registry
  4. Creates and returns a DestroyableExporter

Let's start by analyzing the logic of the doLocalExport method, as follows:

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
    String key = getCacheKey(originInvoker);
    // Check the cache
    ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
    if (exporter == null) {
        synchronized (bounds) {
            exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            if (exporter == null) {
                // Create a delegate object wrapping the Invoker
                final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                // Call protocol's export method to export the service
                exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                // Write to the cache
                bounds.put(key, exporter);
            }
        }
    }
    return exporter;
}
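doLocalExport follows a check, lock, check again pattern around its cache so that a given key is exported only once. The sketch below isolates that pattern. It is an illustration under stated assumptions: the cached value is a plain string instead of an ExporterChangeableWrapper, and the class ExportCacheSketch and its creations counter are hypothetical, added only to make the single-creation property observable.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical isolation of the get-or-create pattern used around the bounds cache.
class ExportCacheSketch {

    private final Map<String, String> bounds = new ConcurrentHashMap<>();

    // Counts how many times the expensive creation step actually ran.
    final AtomicInteger creations = new AtomicInteger();

    String getOrExport(String key) {
        // First check without locking: the common case is a cache hit.
        String exporter = bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                // Second check under the lock: another thread may have created it meanwhile.
                exporter = bounds.get(key);
                if (exporter == null) {
                    creations.incrementAndGet();
                    exporter = "exporter-for-" + key;
                    bounds.put(key, exporter);
                }
            }
        }
        return exporter;
    }

    public static void main(String[] args) {
        ExportCacheSketch cache = new ExportCacheSketch();
        cache.getOrExport("demo.DemoService:20880");
        cache.getOrExport("demo.DemoService:20880");
        System.out.println("creations = " + cache.creations.get());
    }
}
```

With a ConcurrentHashMap, computeIfAbsent would be a shorter way to get the same guarantee. The explicit form is shown here because it mirrors the shape of the code above.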

Next, we focus on the export method of Protocol. Assuming the runtime protocol is dubbo, the protocol variable here resolves to DubboProtocol at runtime and DubboProtocol's export method is called. The analysis of that method follows:

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();

    // Get the service key, which can be thought of as the service's coordinates. It consists of the service group name, service name, service version number and port. For example:
    // demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880
    String key = serviceKey(url);
    // Create DubboExporter
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    // Put the <key, exporter> pair into the cache
    exporterMap.put(key, exporter);

    // Local stub-related code
    // A local stub is a proxy object, typically used to do work such as parameter checks before the service is actually invoked
    Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice) {
        String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
            // Logging code omitted
        } else {
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }

    // Start Server
    openServer(url);
    // Optimize serialization
    optimizeSerialization(url);
    return exporter;
}
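The service key format described in the comments, group/service:version:port, can be reproduced with a few lines of string building. This is a simplified sketch and not Dubbo's own helper: the class ServiceKeySketch is hypothetical, and it assumes that an absent group or version is simply left out of the key.

```java
// Hypothetical sketch of how a service key of the form group/service:version:port is built.
class ServiceKeySketch {

    static String serviceKey(int port, String serviceName, String version, String group) {
        StringBuilder key = new StringBuilder();
        // Assumption: the group prefix is omitted when no group is configured.
        if (group != null && !group.isEmpty()) {
            key.append(group).append('/');
        }
        key.append(serviceName);
        // Assumption: the version segment is omitted when no version is configured.
        if (version != null && !version.isEmpty()) {
            key.append(':').append(version);
        }
        key.append(':').append(port);
        return key.toString();
    }

    public static void main(String[] args) {
        System.out.println(serviceKey(20880, "com.alibaba.dubbo.demo.DemoService", "1.0.1", "demoGroup"));
        System.out.println(serviceKey(20880, "com.alibaba.dubbo.demo.DemoService", null, null));
    }
}
```

Because the port is part of the key, the same interface exported on two ports yields two distinct entries in exporterMap.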

Focus on the creation of DubboExporter and the openServer method. The openServer method is analyzed below.

private void openServer(URL url) {
    // Get host:port and use it as the key of the server instance to identify the current server instance
    String key = url.getAddress();
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
    if (isServer) {
        // Check the cache
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
            // Create a server instance
            serverMap.put(key, createServer(url));
        } else {
            // The server already exists; reset it according to the configuration in the URL.
            // On one machine (single network card), only one server instance may listen on a given port. If a server instance already exists on the port, reset is called to update part of its configuration.
            server.reset(url);
        }
    }
}
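The effect of openServer is that any number of services exported on the same host:port share one server, with later exports only resetting it. The sketch below models that behaviour. All names are hypothetical: FakeServer stands in for ExchangeServer and merely counts resets instead of applying configuration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of "one server per address, reset on reuse".
class ServerCacheSketch {

    // Stand-in for ExchangeServer; it only records how often it was reset.
    static class FakeServer {
        final String address;
        int resets;

        FakeServer(String address) {
            this.address = address;
        }
    }

    private final Map<String, FakeServer> serverMap = new ConcurrentHashMap<>();

    // Creates a server for a new address, or resets the existing one.
    FakeServer openServer(String address) {
        FakeServer server = serverMap.get(address);
        if (server == null) {
            server = new FakeServer(address);
            serverMap.put(address, server);
        } else {
            server.resets++;
        }
        return server;
    }

    int serverCount() {
        return serverMap.size();
    }

    public static void main(String[] args) {
        ServerCacheSketch sketch = new ServerCacheSketch();
        sketch.openServer("192.168.43.174:20880"); // first service: server created
        FakeServer shared = sketch.openServer("192.168.43.174:20880"); // second service: server reused
        System.out.println("servers = " + sketch.serverCount() + ", resets = " + shared.resets);
    }
}
```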

Next, the process of creating a server instance is analyzed as follows:

private ExchangeServer createServer(URL url) {
    // Send a readonly event when the server closes; enabled by default
    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
    // Add the heartbeat configuration to the URL
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    // Get the server parameter, netty by default
    String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

    // Use SPI to check whether the Transporter extension named by the server parameter exists; throw an exception if it does not
    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);

    // Add the codec parameter
    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
    ExchangeServer server;
    try {
        // Create ExchangeServer
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server...");
    }
                                   
    // Get the client parameter, which can be set to netty or mina
    str = url.getParameter(Constants.CLIENT_KEY);
    if (str != null && str.length() > 0) {
        // Get the names of all Transporter implementations, e.g. supportedTypes = [netty, mina]
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        // Check whether the Transporter implementations currently supported by Dubbo
        // include the one named by the client parameter; throw an exception if not
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type...");
        }
    }
    return server;
}

Next, look at how Exchangers.bind creates the server:

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handler == null) {
        throw new IllegalArgumentException("handler == null");
    }
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    // Get the Exchanger, which is HeaderExchanger by default.
    // Next, call HeaderExchanger's bind method to create an ExchangeServer instance
    return getExchanger(url).bind(url, handler);
}

Let's look at the HeaderExchanger's bind method.

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    // Create a HeaderExchangeServer instance. This single statement involves several steps:
    //   1. new HeaderExchangeHandler(handler)
    //   2. new DecodeHandler(new HeaderExchangeHandler(handler))
    //   3. Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
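The nested constructor calls in HeaderExchanger's bind method form a chain of wrappers around the original handler. The sketch below shows only the order in which such a chain sees a message. It is hypothetical throughout: Handler is a one-method stand-in, the wrappers do no decoding or exchange handling, and each one just records its name before delegating.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of handler wrapping: each layer acts, then delegates inward.
class HandlerChainSketch {

    interface Handler {
        void received(String message, List<String> trace);
    }

    // Builds a handler that records its name and then passes the message to the next handler.
    static Handler named(String name, Handler next) {
        return (message, trace) -> {
            trace.add(name);
            if (next != null) {
                next.received(message, trace);
            }
        };
    }

    // Mirrors the nesting new DecodeHandler(new HeaderExchangeHandler(handler)).
    static List<String> demo() {
        Handler requestHandler = named("requestHandler", null);
        Handler chain = named("DecodeHandler", named("HeaderExchangeHandler", requestHandler));
        List<String> trace = new ArrayList<>();
        chain.received("ping", trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The outermost wrapper is the first to see an incoming message, so in the real chain decoding happens before the exchange layer hands the request to the protocol's requestHandler.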

HeaderExchanger's bind method contains a lot of logic, but for now we only need to care about the logic of Transporters' bind method. The code for this method is as follows:

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handlers == null || handlers.length == 0) {
        throw new IllegalArgumentException("handlers == null");
    }
    ChannelHandler handler;
    if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        // If there is more than one handler, create a ChannelHandlerDispatcher
        handler = new ChannelHandlerDispatcher(handlers);
    }
    // Get an adaptive Transporter instance and call the instance method
    return getTransporter().bind(url, handler);
}

In the code above, the Transporter returned by getTransporter() is created dynamically at runtime. Its class name is Transporter$Adaptive, that is, it is an adaptive extension class. Transporter$Adaptive decides at runtime which Transporter to load based on the parameters of the URL passed in, and the default is NettyTransporter. Let's continue by analyzing NettyTransporter's bind method.

public Server bind(URL url, ChannelHandler listener) throws RemotingException {
    // Create a NettyServer
    return new NettyServer(url, listener);
}

public class NettyServer extends AbstractServer implements Server {
    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        // Call the parent class constructor
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }
    //doOpen()..
    //doClose()..
    //...
}

public abstract class AbstractServer extends AbstractEndpoint implements Server {
    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        // Call the parent constructor; it contains no complicated logic, so we do not follow it
        super(url, handler);
        localAddress = getUrl().toInetSocketAddress();

        // Get ip and port
        String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
        int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
        if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
            // Set ip to 0.0.0.0
            bindIp = NetUtils.ANYHOST;
        }
        bindAddress = new InetSocketAddress(bindIp, bindPort);
        // Get the maximum number of acceptable connections
        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
        try {
            // Invoke the template method doOpen to start the server
            doOpen();
        } catch (Throwable t) {
            throw new RemotingException("Failed to bind ");
        }

        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
    }
    
    protected abstract void doOpen() throws Throwable;

    protected abstract void doClose() throws Throwable;
}
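AbstractServer is an instance of the template method pattern: the base constructor works out the bind address and then calls doOpen, which each concrete server implements. The following sketch reduces this to its skeleton. The names are hypothetical, the anyHost flag stands in for the anyhost URL parameter, and RecordingServer only records that it was opened instead of starting a real network server.

```java
import java.net.InetSocketAddress;

// Hypothetical skeleton of the template method used by AbstractServer.
class TemplateServerSketch {

    static abstract class BaseServer {
        final InetSocketAddress bindAddress;
        boolean opened;

        BaseServer(String host, int port, boolean anyHost) {
            // When anyHost is set, listen on all interfaces instead of the given host.
            String bindIp = anyHost ? "0.0.0.0" : host;
            bindAddress = new InetSocketAddress(bindIp, port);
            // Template step: the subclass decides how the server is actually started.
            doOpen();
        }

        protected abstract void doOpen();
    }

    // Stand-in for NettyServer: records that it was opened instead of binding a socket.
    static class RecordingServer extends BaseServer {
        RecordingServer(String host, int port, boolean anyHost) {
            super(host, port, anyHost);
        }

        @Override
        protected void doOpen() {
            opened = true;
        }
    }

    public static void main(String[] args) {
        RecordingServer server = new RecordingServer("192.168.43.174", 20880, true);
        System.out.println("bind address = " + server.bindAddress.getAddress().getHostAddress()
                + ":" + server.bindAddress.getPort() + ", opened = " + server.opened);
    }
}
```

One design consequence is worth keeping in mind: doOpen runs before the subclass constructor body has executed, so an implementation can only rely on state that the base class has already set up.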

We focus on the abstract doOpen method, which subclasses must implement.

NettyServer#doOpen

protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    // Create boss and worker thread pools
    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    
    // Create ServerBootstrap
    bootstrap = new ServerBootstrap(channelFactory);

    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    channels = nettyHandler.getChannels();
    bootstrap.setOption("child.tcpNoDelay", true);
    // Set PipelineFactory
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        @Override
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
            ChannelPipeline pipeline = Channels.pipeline();
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
    // Bind to specified ip and port
    channel = bootstrap.bind(getBindAddress());
}

Anyone who has used Netty will find this code familiar: it starts the Netty server. At this point, exporting the service to remote is complete.

The concepts of protocol, exchange, and transport were mentioned above. To recap:

  • protocol (remote call layer): encapsulates RPC calls. It is centered on Invocation and Result, and its extension interfaces are Protocol, Invoker, and Exporter.
  • exchange (information exchange layer): encapsulates the request-response pattern and converts synchronous calls to asynchronous ones. It is centered on Request and Response, and its extension interfaces are Exchanger, ExchangeChannel, ExchangeClient, and ExchangeServer.
  • transport (network transport layer): abstracts Mina and Netty behind a unified interface. It is centered on Message, and its extension interfaces are Channel, Transporter, Client, Server, and Codec.

3.3 Service Registration

Back to the RegistryProtocol#export method

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    // ${Export Service}
    // Omit other code
    boolean register = registeredProviderUrl.getParameter("register", true);
    if (register) {
        // Register the service
        register(registryUrl, registeredProviderUrl);
        ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
    }
    
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    // Subscribe to override data
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

    // Omit some code
}

RegistryProtocol's export method contains the logic for service export, registration, and data subscription. The service export logic was analyzed in the previous section. This section analyzes the service registration logic, which starts with the following code:

public void register(URL registryUrl, URL registedProviderUrl) {
    // Get Registry
    Registry registry = registryFactory.getRegistry(registryUrl);
    // Register the service
    registry.register(registedProviderUrl);
}

The register method consists of two steps: first it gets a registry instance, then it registers the service with that registry.

3.3.1 Create a Registry

As mentioned at the beginning of the article, the registry used here is Zookeeper.

AbstractRegistryFactory#getRegistry

public Registry getRegistry(URL url) {
    url = url.setPath(RegistryService.class.getName())
            .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
            .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
    String key = url.toServiceString();
    LOCK.lock();
    try {
        // Access the cache
        Registry registry = REGISTRIES.get(key);
        if (registry != null) {
            return registry;
        }  
        // Cache missed, create Registry instance
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry...");
        }
        // Write Cache
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        LOCK.unlock();
    }
}

protected abstract Registry createRegistry(URL url);

As shown above, the getRegistry method accesses the cache first. On a cache miss, it calls createRegistry to create the Registry and then writes it to the cache. Here, createRegistry is a template method implemented by concrete subclasses.
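The cache-then-create flow can be isolated into a small sketch. This is not Dubbo code: CachingFactorySketch is a hypothetical name, registries are plain Objects, and the creator function stands in for the createRegistry template method.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

// Hypothetical cache-then-create factory (not Dubbo code).
class CachingFactorySketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Map<String, Object> registries = new HashMap<>();
    private final Function<String, Object> creator; // stands in for createRegistry
    int created = 0;

    CachingFactorySketch(Function<String, Object> creator) {
        this.creator = creator;
    }

    Object getRegistry(String key) {
        lock.lock();
        try {
            // Access the cache first.
            Object registry = registries.get(key);
            if (registry != null) {
                return registry;
            }
            // Cache miss: create the instance, then write it to the cache.
            registry = creator.apply(key);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + key);
            }
            created++;
            registries.put(key, registry);
            return registry;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        CachingFactorySketch factory = new CachingFactorySketch(key -> new Object());
        Object first = factory.getRegistry("zookeeper://127.0.0.1:2181");
        Object second = factory.getRegistry("zookeeper://127.0.0.1:2181");
        System.out.println((first == second) + " " + factory.created);
    }
}
```

Holding the lock across both the lookup and the creation means two threads asking for the same key cannot create two registries, at the cost of serializing all lookups.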

ZookeeperRegistryFactory#createRegistry

public class ZookeeperRegistryFactory extends AbstractRegistryFactory {

    // zookeeperTransporter is injected by SPI at runtime and is of type ZookeeperTransporter$Adaptive
    private ZookeeperTransporter zookeeperTransporter;

    public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
        this.zookeeperTransporter = zookeeperTransporter;
    }

    @Override
    public Registry createRegistry(URL url) {
        // Create ZookeeperRegistry
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }
}

The ZookeeperRegistry constructor:

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    
    // Get the group name, dubbo by default
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(Constants.PATH_SEPARATOR)) {
        // group = "/" + group
        group = Constants.PATH_SEPARATOR + group;
    }
    this.root = group;
    // Create the Zookeeper client; the default transporter is CuratorZookeeperTransporter.
    // In version 2.5.x, the default client is ZkclientZookeeperClient.
    // In version 2.6.4, the default client is CuratorZookeeperClient.
    // In version 2.7.x, zkclient has been removed and must be added back as an extension to be used.
    zkClient = zookeeperTransporter.connect(url);
    // Add Status Listener
    zkClient.addStateListener(new StateListener() {
        @Override
        public void stateChanged(int state) {
            if (state == RECONNECTED) {
                try {
                    recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
}

In the code above, we focus on the call to ZookeeperTransporter's connect method, which creates the Zookeeper client. Once the Zookeeper client is created, the registry creation process is complete. Next, let's look at how the Zookeeper client is created.

As mentioned earlier, the zookeeperTransporter here is an adaptive extension class, so the connect method decides which ZookeeperTransporter extension to load when it is called. The default is CuratorZookeeperTransporter, so let's look at it.
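The idea behind an adaptive extension is that the implementation is picked at call time from data carried in the URL. The sketch below illustrates only that idea and is not how Dubbo generates ZookeeperTransporter$Adaptive. AdaptiveTransporterSketch is a hypothetical name, the parameter name client is an assumption of this sketch, and clients are represented by plain strings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical call-time dispatcher illustrating the adaptive-extension idea (not Dubbo code).
class AdaptiveTransporterSketch {
    static final String DEFAULT_NAME = "curator";
    // Extension name -> factory that "connects" to an address.
    private final Map<String, Function<String, String>> extensions = new HashMap<>();

    AdaptiveTransporterSketch() {
        extensions.put("curator", address -> "CuratorZookeeperClient(" + address + ")");
        extensions.put("zkclient", address -> "ZkclientZookeeperClient(" + address + ")");
    }

    String connect(String address, Map<String, String> urlParameters) {
        // Assumption: the extension name travels in a URL parameter called "client".
        String name = urlParameters.getOrDefault("client", DEFAULT_NAME);
        Function<String, String> extension = extensions.get(name);
        if (extension == null) {
            throw new IllegalStateException("No such extension: " + name);
        }
        return extension.apply(address);
    }

    public static void main(String[] args) {
        AdaptiveTransporterSketch transporter = new AdaptiveTransporterSketch();
        Map<String, String> parameters = new HashMap<>();
        System.out.println(transporter.connect("127.0.0.1:2181", parameters));
        parameters.put("client", "zkclient");
        System.out.println(transporter.connect("127.0.0.1:2181", parameters));
    }
}
```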

public ZookeeperClient connect(URL url) {
    // Create CuratorZookeeperClient
    return new CuratorZookeeperClient(url);
}

public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatcher> {

    private final CuratorFramework client;
    
    public CuratorZookeeperClient(URL url) {
        super(url);
        try {
            // Create the CuratorFramework builder
            CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                    .connectString(url.getBackupAddress())
                    .retryPolicy(new RetryNTimes(1, 1000))
                    .connectionTimeoutMs(5000);
            String authority = url.getAuthority();
            if (authority != null && authority.length() > 0) {
                builder = builder.authorization("digest", authority.getBytes());
            }
            // Building a CuratorFramework instance
            client = builder.build();
            // Add listener
            client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
                @Override
                public void stateChanged(CuratorFramework client, ConnectionState state) {
                    if (state == ConnectionState.LOST) {
                        CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
                    } else if (state == ConnectionState.CONNECTED) {
                        CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
                    } else if (state == ConnectionState.RECONNECTED) {
                        CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
                    }
                }
            });
            
            // Start Client
            client.start();
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }
}

For comparison, here is ZkclientZookeeperClient:

public class ZkclientZookeeperClient extends AbstractZookeeperClient<IZkChildListener> {

    private final ZkClientWrapper client;

    private volatile KeeperState state = KeeperState.SyncConnected;

    public ZkclientZookeeperClient(URL url) {
        super(url);
        client = new ZkClientWrapper(url.getBackupAddress(), 30000);
        client.addListener(new IZkStateListener() {
            @Override
            public void handleStateChanged(KeeperState state) throws Exception {
                ZkclientZookeeperClient.this.state = state;
                if (state == KeeperState.Disconnected) {
                    stateChanged(StateListener.DISCONNECTED);
                } else if (state == KeeperState.SyncConnected) {
                    stateChanged(StateListener.CONNECTED);
                }
            }

            @Override
            public void handleNewSession() throws Exception {
                stateChanged(StateListener.RECONNECTED);
            }
        });
        client.start();
    }

}

The process is similar: create a client, then add a state listener.
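Both clients translate library-specific connection states into three generic states, and ZookeeperRegistry reacts only to RECONNECTED by calling recover(). A minimal sketch of that wiring follows. It is not Dubbo code: StateNotifierSketch and RecoveringRegistrySketch are hypothetical names, the numeric state values are chosen for the sketch, and recover() only counts invocations.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.IntConsumer;

// Hypothetical client side: fans a state change out to registered listeners.
class StateNotifierSketch {
    static final int DISCONNECTED = 0;
    static final int CONNECTED = 1;
    static final int RECONNECTED = 2;

    private final List<IntConsumer> listeners = new CopyOnWriteArrayList<>();

    void addStateListener(IntConsumer listener) {
        listeners.add(listener);
    }

    void stateChanged(int state) {
        for (IntConsumer listener : listeners) {
            listener.accept(state);
        }
    }
}

// Hypothetical registry side: recovers only after a reconnect.
class RecoveringRegistrySketch {
    int recoverCount = 0;

    RecoveringRegistrySketch(StateNotifierSketch client) {
        client.addStateListener(state -> {
            if (state == StateNotifierSketch.RECONNECTED) {
                recover();
            }
        });
    }

    void recover() {
        // A real registry would re-register its URLs and re-subscribe here.
        recoverCount++;
    }

    public static void main(String[] args) {
        StateNotifierSketch client = new StateNotifierSketch();
        RecoveringRegistrySketch registry = new RecoveringRegistrySketch(client);
        client.stateChanged(StateNotifierSketch.CONNECTED);
        client.stateChanged(StateNotifierSketch.DISCONNECTED);
        client.stateChanged(StateNotifierSketch.RECONNECTED);
        System.out.println(registry.recoverCount);
    }
}
```

Recovery matters because ephemeral nodes are deleted by Zookeeper when the session that created them ends, so a provider that reconnects with a new session has to write its nodes again.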

The registry instance has now been created. The next step is to register the service with the registry.

3.3.2 Service Registration

Taking Zookeeper as an example, service registration essentially means writing the service's configuration data to a node under a certain path in Zookeeper.

Viewing the node data with ZooInspector, a Zookeeper visualization client, shows the following:

In the diagram, you can see that the configuration information for the service com.alibaba.dubbo.demo.DemoService (stored in the URL) is ultimately registered under the /dubbo/com.alibaba.dubbo.demo.DemoService/providers/ node.
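The child node name under providers is the provider URL in URL-encoded form, which is why the whole URL fits into a single path segment. The sketch below rebuilds such a path with the JDK's URLEncoder. ProviderPathSketch is a hypothetical helper, not Dubbo's toUrlPath, and the shortened provider URL is only an example.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper that builds /${group}/${serviceInterface}/providers/${encodedUrl}.
class ProviderPathSketch {
    static String toUrlPath(String group, String serviceInterface, String providerUrl) {
        try {
            // Make sure the root starts with "/", as the ZookeeperRegistry constructor does for the group.
            String root = group.startsWith("/") ? group : "/" + group;
            // Encoding turns ':', '/', '?', '=' and '&' into %XX, so the URL becomes one path segment.
            return root + "/" + serviceInterface + "/providers/" + URLEncoder.encode(providerUrl, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(toUrlPath(
                "dubbo",
                "com.alibaba.dubbo.demo.DemoService",
                "dubbo://192.168.43.174:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true"));
    }
}
```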

Attached is an illustration of the node hierarchy Dubbo registers in Zookeeper:

The code that registers with the registry is in RegistryProtocol#register(registryUrl, registeredProviderUrl):

public void register(URL registryUrl, URL registedProviderUrl) {
    Registry registry = registryFactory.getRegistry(registryUrl);
    registry.register(registedProviderUrl);
}

FailbackRegistry#register(URL url)

public void register(URL url) {
    // URL to register: dubbo://192.168.43.174:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=16024&side=provider&timestamp=1578478503772
    super.register(url);
    failedRegistered.remove(url);
    failedUnregistered.remove(url);
    try {
        // Template method, implemented by subclasses
        doRegister(url);
    } catch (Exception e) {
        Throwable t = e;

        // Gets the check parameter and throws an exception directly if check = true
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register");
        } else {
            logger.error("Failed to register");
        }

        // Record the URL that failed to register so it can be retried later
        failedRegistered.add(url);
    }
}

protected abstract void doRegister(URL url);
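The failback behaviour above can be reduced to a few lines: try the template method, fail fast when check is on, otherwise remember the URL for a later retry. The sketch below is not Dubbo code. FailbackSketch is a hypothetical name, URLs are plain strings, and retry() is called by hand here, whereas a real registry would drive it from a scheduled task.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical failback wrapper around a doRegister operation (not Dubbo code).
class FailbackSketch {
    final Set<String> failedRegistered = new LinkedHashSet<>();
    private final Consumer<String> doRegister; // stands in for the template method

    FailbackSketch(Consumer<String> doRegister) {
        this.doRegister = doRegister;
    }

    void register(String url, boolean check) {
        failedRegistered.remove(url);
        try {
            doRegister.accept(url);
        } catch (RuntimeException e) {
            if (check) {
                // check = true: surface the failure to the caller immediately.
                throw new IllegalStateException("Failed to register " + url, e);
            }
            // check = false: remember the URL and let a later retry handle it.
            failedRegistered.add(url);
        }
    }

    void retry() {
        // Iterate over a copy so successful URLs can be removed while looping.
        for (String url : new ArrayList<>(failedRegistered)) {
            try {
                doRegister.accept(url);
                failedRegistered.remove(url);
            } catch (RuntimeException ignored) {
                // Still failing: keep the URL for the next retry round.
            }
        }
    }

    public static void main(String[] args) {
        final int[] calls = {0};
        FailbackSketch registry = new FailbackSketch(url -> {
            if (calls[0]++ == 0) {
                throw new RuntimeException("zookeeper unavailable");
            }
        });
        registry.register("dubbo://192.168.43.174:20880/DemoService", false);
        System.out.println("pending after failure: " + registry.failedRegistered.size());
        registry.retry();
        System.out.println("pending after retry: " + registry.failedRegistered.size());
    }
}
```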

The doRegister method is a template method, so we analyze its implementation in ZookeeperRegistry, a subclass of FailbackRegistry:

protected void doRegister(URL url) {
    try {
        // Nodes are created through the Zookeeper client, and the node path is generated by the toUrlPath method in the following format:
        //   /${group}/${serviceInterface}/providers/${url}
        // such as
        //   /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register...");
    }
}

As shown above, ZookeeperRegistry calls the Zookeeper client in doRegister to create the service node. The node path is generated by the toUrlPath method, which is easy to understand and is not analyzed here. Next, the create method is analyzed as follows:

public void create(String path, boolean ephemeral) {
    // path:
    // /dubbo/com.alibaba.dubbo.demo.DemoService/providers/dubbo%3A%2F%2F192.168.43.174%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D16024%26side%3Dprovider%26timestamp%3D1578478503772
    if (!ephemeral) {
        // If the node to create is not ephemeral, check whether it already exists
        if (checkExists(path)) {
            return;
        }
    }
    int i = path.lastIndexOf('/');
    if (i > 0) {
        // Recursively create the parent path
        create(path.substring(0, i), false);
    }
    
    // Create an ephemeral or a persistent node, depending on the ephemeral flag
    if (ephemeral) {
        createEphemeral(path);
    } else {
        createPersistent(path);
    }
}

This code creates the following nodes, each nested under the previous one:

Persistent node: /dubbo

Persistent node: /com.alibaba.dubbo.demo.DemoService

Persistent node: /providers

Ephemeral node:

/dubbo%3A%2F%2F192.168.43.174%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D16024%26side%3Dprovider%26timestamp%3D1578478503772

These nodes correspond to the tree structure shown above.
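The recursion in create can be traced with an in-memory stand-in for Zookeeper. The sketch below is not Dubbo code: InMemoryZkSketch is a hypothetical name, and a map from path to an ephemeral flag replaces the real client. It mirrors the control flow of create: persistent nodes are skipped when they exist, parents are always created as persistent, and only the leaf takes the requested type.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the Zookeeper client (not Dubbo code).
class InMemoryZkSketch {
    // path -> true if ephemeral, false if persistent; insertion order is preserved.
    final Map<String, Boolean> nodes = new LinkedHashMap<>();

    void create(String path, boolean ephemeral) {
        if (!ephemeral && nodes.containsKey(path)) {
            // Persistent node already exists: nothing to do.
            return;
        }
        int i = path.lastIndexOf('/');
        if (i > 0) {
            // Recursively create the parent path as a persistent node.
            create(path.substring(0, i), false);
        }
        nodes.put(path, ephemeral);
    }

    public static void main(String[] args) {
        InMemoryZkSketch zk = new InMemoryZkSketch();
        zk.create("/dubbo/com.alibaba.dubbo.demo.DemoService/providers/dubbo%3A%2F%2F192.168.43.174%3A20880", true);
        for (Map.Entry<String, Boolean> node : zk.nodes.entrySet()) {
            System.out.println((node.getValue() ? "ephemeral  " : "persistent ") + node.getKey());
        }
    }
}
```

Because the encoded URL contains no "/", the last slash in the path always separates the providers node from the leaf, so the recursion stops at /dubbo after three parent levels.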

3.3.3 Subscribe to override data

We go back to the RegistryProtocol#export method, shown again here in full:

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    // Export Service
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

    // Get the registry URL. With the zookeeper registry, for example, it looks like this:
    // zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.17.48.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider
    URL registryUrl = getRegistryUrl(originInvoker);

    // Load the Registry implementation class based on the URL, such as ZookeeperRegistry
    final Registry registry = getRegistry(originInvoker);
    
    // Get the registered service provider URL, for example:
    // dubbo://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
    final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);

    // Get register parameter
    boolean register = registeredProviderUrl.getParameter("register", true);

    // Record the service provider in the provider/consumer registration table
    ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);

    // Determine whether to register a service based on the value of the register
    if (register) {
        // Register services with the registry
        register(registryUrl, registeredProviderUrl);
        ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
    }

    // Get the subscription URL, for example:
    // provider://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?category=configurators&check=false&anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
    // This subscribes to changes under the configurators node of the service provider provider://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
    // Create listeners
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    // Subscribe to the registry for override data
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    // Create and return DestroyableExporter
    return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}

Note the call registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

FailbackRegistry#subscribe

public void subscribe(URL url, NotifyListener listener) {
        // url example:
        // provider://192.168.43.174:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=17976&side=provider&timestamp=1578479464018
        super.subscribe(url, listener);
        removeFailedSubscribed(url, listener);
        try {
            // Sending a subscription request to the server side
            doSubscribe(url, listener);
        } catch (Exception e) {
            Throwable t = e;

            List<URL> urls = getCacheUrls(url);
            if (urls != null && !urls.isEmpty()) {
                notify(url, listener, urls);
                logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
            } else {
                // If the startup detection is opened, the Exception is thrown directly.
                boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                        && url.getParameter(Constants.CHECK_KEY, true);
                boolean skipFailback = t instanceof SkipFailbackWrapperException;
                if (check || skipFailback) {
                    if (skipFailback) {
                        t = t.getCause();
                    }
                    throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
                } else {
                    logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
                }
            }

            // Record the failed subscription request in the failed list and retry it periodically
            addFailedSubscribed(url, listener);
        }
    }

Focus on the doSubscribe(url, listener) method.

ZookeeperRegistry#doSubscribe(url, listener)

protected void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
               		//Omit...
            } else {
                List<URL> urls = new ArrayList<URL>();
                for (String path : toCategoriesPath(url)) {
                    //toCategoriesPath(url) resolves the node path to subscribe to
                    //path:/dubbo/com.alibaba.dubbo.demo.DemoService/configurators
                    
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                        listeners = zkListeners.get(url);
                    }
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {
                        //Add a listener and call notify(url, listener, urls) if there is a change
                        listeners.putIfAbsent(listener, new ChildListener() {
                            @Override
                            public void childChanged(String parentPath, List<String> currentChilds) {
                                ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                            }
                        });
                        zkListener = listeners.get(listener);
                    }
                    //Create a persistent node/dubbo/com.alibaba.dubbo.demo.DemoService/configurators
                    zkClient.create(path, false);
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        // toUrlsWithEmpty(url, path, children) returns the child URLs that match the subscribing URL.
                        // If none match, it returns a single URL whose protocol is replaced with empty
                        // (here provider:// becomes empty://), which covers cases such as an empty provider list.
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                        // At this point the url is: empty://192.168.43.174:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=1384&side=provider&timestamp=1578533
                    }
                }
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
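The empty:// fallback used in doSubscribe can be illustrated with plain strings. This is a simplified sketch, not Dubbo's toUrlsWithEmpty: EmptyUrlSketch is a hypothetical name, and matching is reduced to checking that a child URL mentions the service interface, whereas the real method decodes the children and applies fuller matching rules.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, string-based illustration of the empty:// fallback (not Dubbo code).
class EmptyUrlSketch {
    static List<String> toUrlsWithEmpty(String subscribeUrl, String serviceInterface, List<String> children) {
        List<String> urls = new ArrayList<>();
        for (String child : children) {
            // Simplified matching rule: the child must refer to the same interface.
            if (child.contains("/" + serviceInterface)) {
                urls.add(child);
            }
        }
        if (urls.isEmpty()) {
            // Nothing matched: return the subscribing URL with its protocol swapped to "empty".
            int i = subscribeUrl.indexOf("://");
            urls.add(i < 0 ? "empty://" + subscribeUrl : "empty" + subscribeUrl.substring(i));
        }
        return urls;
    }

    public static void main(String[] args) {
        String subscribeUrl = "provider://192.168.43.174:20880/com.alibaba.dubbo.demo.DemoService?category=configurators";
        String service = "com.alibaba.dubbo.demo.DemoService";
        System.out.println(toUrlsWithEmpty(subscribeUrl, service, Collections.<String>emptyList()));
        System.out.println(toUrlsWithEmpty(subscribeUrl, service,
                Arrays.asList("override://0.0.0.0/com.alibaba.dubbo.demo.DemoService?timeout=3000")));
    }
}
```

Returning a placeholder instead of an empty list lets the listener distinguish "no data yet" from "all data was removed", which is why the list passed to notify is never empty on this path.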

The notify(url, listener, urls) call ends up in AbstractRegistry#notify(URL url, NotifyListener listener, List<URL> urls), shown below:

protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        if (url == null) {
            throw new IllegalArgumentException("notify url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("notify listener == null");
        }
        if ((urls == null || urls.isEmpty())
                && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
            logger.warn("Ignore empty notify urls for subscribe url " + url);
            return;
        }
        if (logger.isInfoEnabled()) {
            logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
        }
        Map<String, List<URL>> result = new HashMap<String, List<URL>>();
        for (URL u : urls) {
            if (UrlUtils.isMatch(url, u)) {
                String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                List<URL> categoryList = result.get(category);
                if (categoryList == null) {
                    categoryList = new ArrayList<URL>();
                    result.put(category, categoryList);
                }
                categoryList.add(u);
            }
        }
        if (result.size() == 0) {
            return;
        }
        Map<String, List<URL>> categoryNotified = notified.get(url);
        if (categoryNotified == null) {
            notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
            categoryNotified = notified.get(url);
        }
        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            List<URL> categoryList = entry.getValue();
            categoryNotified.put(category, categoryList);
            /***
            The service provider saves the notified URLs to the local cache file: key=com.alibaba.dubbo.demo.DemoService
            value=
            provider://192.168.43.174:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=16140&side=provider&timestamp=1578536147384
           ***/
            saveProperties(url);
            //Call the OverrideListener#notify (List<URL> urls) method in RegistryProtocol
            listener.notify(categoryList);
        }
    }
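The core of notify is grouping the incoming URLs by their category parameter and then notifying the listener once per group. The grouping step can be sketched with plain strings as follows. CategoryGroupingSketch is a hypothetical name, the query parsing is deliberately naive, and the URL matching, caching, and persistence done by the real method are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical grouping of URL strings by their category parameter (not Dubbo code).
class CategoryGroupingSketch {
    static final String DEFAULT_CATEGORY = "providers";

    static String category(String url) {
        int q = url.indexOf('?');
        if (q >= 0) {
            for (String pair : url.substring(q + 1).split("&")) {
                if (pair.startsWith("category=")) {
                    return pair.substring("category=".length());
                }
            }
        }
        // No category parameter: fall back to the default category.
        return DEFAULT_CATEGORY;
    }

    static Map<String, List<String>> groupByCategory(List<String> urls) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String url : urls) {
            result.computeIfAbsent(category(url), key -> new ArrayList<>()).add(url);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> grouped = groupByCategory(Arrays.asList(
                "empty://192.168.43.174:20880/DemoService?category=configurators",
                "dubbo://192.168.43.174:20880/DemoService?anyhost=true"));
        for (Map.Entry<String, List<String>> entry : grouped.entrySet()) {
            // In the real method, listener.notify(categoryList) is called once per entry.
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
    }
}
```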

OverrideListener#notify(List<URL> urls) method

@Override
        public synchronized void notify(List<URL> urls) {
            // urls contains a single entry here: empty://192.168.43.174:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=16140&side=provider&timestamp=1578536147384
            logger.debug("original override urls: " + urls);
            //subscribeUrl
            //provider://192.168.43.174:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=16140&side=provider&timestamp=1578536147384
            //Get a matching url
            List<URL> matchedUrls = getMatchedUrls(urls, subscribeUrl);
            logger.debug("subscribe url: " + subscribeUrl + ", override urls: " + matchedUrls);
            // No matching results
            if (matchedUrls.isEmpty()) {
                return;
            }
            // Extract the changed configuration
            List<Configurator> configurators = RegistryDirectory.toConfigurators(matchedUrls);

            final Invoker<?> invoker;
            if (originInvoker instanceof InvokerDelegete) {
                invoker = ((InvokerDelegete<?>) originInvoker).getInvoker();
            } else {
                invoker = originInvoker;
            }
            //The origin invoker
            URL originUrl = RegistryProtocol.this.getProviderUrl(invoker);
            String key = getCacheKey(originInvoker);
            ExporterChangeableWrapper<?> exporter = bounds.get(key);
            if (exporter == null) {
                logger.warn(new IllegalStateException("error state, exporter should not be null"));
                return;
            }
            //The current, may have been merged many times
            URL currentUrl = exporter.getInvoker().getUrl();
            //Merged with this configuration
            //Assembling a new url based on changing configuration information
            URL newUrl = getConfigedInvokerUrl(configurators, originUrl);
            if (!currentUrl.equals(newUrl)) {
                //Re-export the service if the new url is different from the original
                RegistryProtocol.this.doChangeLocalExport(originInvoker, newUrl);
                logger.info("exported provider url changed, origin url: " + originUrl + ", old export url: " + currentUrl + ", new export url: " + newUrl);
            }
        }
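The decision at the end of notify is worth isolating: overrides are always merged into the original URL, and the service is re-exported only when the merged result differs from what is currently exported. A minimal sketch under simplifying assumptions follows. ReExportSketch is a hypothetical name, a URL is reduced to a sorted map of parameters, and re-exporting is represented by a counter.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical re-export decision based on comparing merged parameters (not Dubbo code).
class ReExportSketch {
    private final Map<String, String> originParams;
    private Map<String, String> currentParams;
    int exportCount = 1; // the initial export

    ReExportSketch(Map<String, String> originParams) {
        this.originParams = new TreeMap<>(originParams);
        this.currentParams = new TreeMap<>(originParams);
    }

    void onOverride(Map<String, String> overrides) {
        // Always merge into the ORIGINAL parameters, not into the current ones.
        Map<String, String> merged = new TreeMap<>(originParams);
        merged.putAll(overrides);
        if (!merged.equals(currentParams)) {
            // The effective configuration changed: re-export with the new parameters.
            currentParams = merged;
            exportCount++;
        }
    }

    Map<String, String> current() {
        return currentParams;
    }

    public static void main(String[] args) {
        ReExportSketch exporter = new ReExportSketch(Collections.singletonMap("timeout", "1000"));
        exporter.onOverride(Collections.singletonMap("timeout", "3000"));
        exporter.onOverride(Collections.singletonMap("timeout", "3000"));
        System.out.println(exporter.exportCount + " " + exporter.current());
    }
}
```

Merging into the original parameters means that removing an override restores the original configuration, instead of leaving earlier overrides stuck in the exported URL.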

This completes the analysis of subscribing to override data.

3.3.4 Summary

This completes the analysis of service registration. It consists of three parts: creating a registry instance, registering the service through that instance, and subscribing to configuration changes.

4. Summary

The whole service publishing process has now been covered. To summarize, the main steps are:

  1. Pre-work: check parameters and assemble the URL
  2. Export the service locally
  3. Export the service to remote
  4. Start the Netty server to expose the service
  5. Create the registry and connect to zk
  6. Register the service with zk
  7. Subscribe to override data in zk

Tags: Dubbo Zookeeper Netty Spring

Posted on Sat, 11 Jan 2020 16:59:36 -0800 by amargharat