The principle of Dubbo service consumption

Preface

The previous article, "Dubbo's service exposure", analyzed how Dubbo services are exposed. This article analyzes how Dubbo services are consumed, mainly from the following aspects: the exposure of the registry; service consumption notifications through the registry; direct service consumption. When a service consumer starts, it registers its own information in the registry's directory and subscribes to the service provider's directory. When the URL of a service provider changes, the consumer receives the new data in real time.
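The subscribe-and-notify behaviour described above can be pictured with a toy in-memory registry. This is only a sketch: ToyRegistrySketch and its methods are made up for illustration and are not Dubbo's Registry API, and the consumer's own registration is left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical toy registry for illustration; not Dubbo's real Registry.
class ToyRegistrySketch {

    private final Map<String, List<String>> providers = new HashMap<>();
    private final Map<String, List<Consumer<List<String>>>> listeners = new HashMap<>();

    /** A consumer subscribes to a service and immediately receives the current provider list. */
    void subscribe(String service, Consumer<List<String>> listener) {
        listeners.computeIfAbsent(service, k -> new ArrayList<>()).add(listener);
        listener.accept(snapshot(service));
    }

    /** A provider registers its URL; every subscriber of that service is notified. */
    void register(String service, String providerUrl) {
        providers.computeIfAbsent(service, k -> new ArrayList<>()).add(providerUrl);
        List<String> current = snapshot(service);
        for (Consumer<List<String>> listener
                : listeners.getOrDefault(service, Collections.emptyList())) {
            listener.accept(current);
        }
    }

    /** Copies the provider list so listeners never see later mutations. */
    private List<String> snapshot(String service) {
        return new ArrayList<>(providers.getOrDefault(service, Collections.emptyList()));
    }

    public static void main(String[] args) {
        ToyRegistrySketch registry = new ToyRegistrySketch();
        // First notification carries an empty list, the second the new provider
        registry.subscribe("demo.GreetingService",
                urls -> System.out.println("providers now: " + urls));
        registry.register("demo.GreetingService", "dubbo://10.0.0.1:20880");
    }
}
```

The listener is called once on subscription and again on every provider change, which is the push model the consumer relies on.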

Service consumption process

Here is a flow chart of service consumption:

As the figure above shows, service consumption is somewhat similar to service exposure. Dubbo service consumption is likewise divided into two steps: the first step converts the remote service into an Invoker through a Protocol (the concept is explained in the previous article); the second step converts the Invoker into the interface the consumer needs through a dynamic proxy.
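The second step can be sketched with a plain JDK dynamic proxy. The types below (SimpleInvoker, GreetingService) are simplified stand-ins invented for this example, not Dubbo's real Invoker or ProxyFactory, and the "remote call" is a stub.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical, simplified stand-ins; these are not Dubbo's real types.
class InvokerProxySketch {

    /** Simplified invoker: executes a call described by method name and arguments. */
    interface SimpleInvoker {
        Object invoke(String methodName, Object[] args);
    }

    /** Example business interface used by the consumer (made up for this sketch). */
    interface GreetingService {
        String hello(String name);
    }

    /** Wraps the invoker in a JDK dynamic proxy that implements the given interface. */
    @SuppressWarnings("unchecked")
    static <T> T getProxy(Class<T> iface, SimpleInvoker invoker) {
        InvocationHandler handler = (proxy, method, args) -> {
            // toString/hashCode/equals are answered by the invoker object itself
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            // Every interface method is forwarded to the invoker
            return invoker.invoke(method.getName(), args);
        };
        return (T) Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }

    public static void main(String[] args) {
        // Step 1 (stubbed): a real invoker would send the call over the network
        SimpleInvoker invoker = (name, a) -> name + ":" + a[0];
        // Step 2: the consumer only sees the local interface
        GreetingService service = getProxy(GreetingService.class, invoker);
        System.out.println(service.hello("dubbo")); // prints "hello:dubbo"
    }
}
```

The caller works with GreetingService as if it were local; where the call actually goes is decided entirely by the invoker behind the proxy.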

The org.apache.dubbo.config.ReferenceConfig class is the parent class of ReferenceBean. Like ServiceBean on the provider side, it stores the parsed XML and annotation information. The class relations are as follows:

Entry point of the conversion during service initialization

On the consumer side, calling the local interface is enough to call the remote service. How is this done? We follow the flow chart above to analyze the principle of consumption. When the consumer initializes, ReferenceConfig#init executes ReferenceConfig#createProxy to complete this series of operations. The following is the main part of ReferenceConfig#createProxy:

private T createProxy(Map<String, String> map) {
    // Determine whether it is a Jvm local reference
    if (shouldJvmRefer(map)) {
        // Get local service through injvm protocol
        URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
        invoker = REF_PROTOCOL.refer(interfaceClass, url);
    } else {
        urls.clear();
        // Determine whether there is a custom direct connection address or registration center address
        if (url != null && url.length() > 0) { 
            String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
            if (us != null && us.length > 0) {
                for (String u : us) {
                    URL url = URL.valueOf(u);
                    if (StringUtils.isEmpty(url.getPath())) {
                        url = url.setPath(interfaceName);
                    }
                    if (UrlUtils.isRegistry(url)) {
                        // If it is a registry Protocol type, add the refer service consumption metadata to the address
                        urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                    } else {
                        // Direct service provider
                        urls.add(ClusterUtils.mergeUrl(url, map));
                    }
                }
            }
        } else {
            // Assembly registry configuration
            if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
                // Check the registry configuration
                checkRegistry();
                List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
                if (CollectionUtils.isNotEmpty(us)) {
                    for (URL u : us) {
                        URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
                        if (monitorUrl != null) {
                            // Monitoring and reporting information
                            map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        // Add refer service consumption metadata to registry address
                        urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }
            }
        }

        // Only one registration center data, i.e. single registration center
        if (urls.size() == 1) {
            // Convert remote service to Invoker
            invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
        } else {
            // Because there will be multiple invokers in multiple registries, they are saved in the List
            List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
            URL registryURL = null;
            for (URL url : urls) {
                // Convert each registry to Invoker data
                invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
                if (UrlUtils.isRegistry(url)) {
                    // Will overwrite the previous traversal of the registry, using the last registry data
                    registryURL = url;
                }
            }
            if (registryURL != null) {
                // Use zone aware policy to handle multiple subscriptions by default
                URL u = registryURL.addParameterIfAbsent(CLUSTER_KEY, ZoneAwareCluster.NAME);
                // Merge the converted invokers into one
                invoker = CLUSTER.join(new StaticDirectory(u, invokers));
            } else {
                invoker = CLUSTER.join(new StaticDirectory(invokers));
            }
        }
    }
    // Use a dynamic proxy to convert the Invoker into a proxy of the local interface
    return (T) PROXY_FACTORY.getProxy(invoker);
}

The conversion above can be summarized as follows. References are first divided into local and remote. A local reference obtains the service through the injvm protocol and is not discussed further. Remote references are divided into direct connections to a provider and references through a registry. With a registry there may be one registry or several: a single registry is the simple case and its Invoker is used directly, while with several registries the converted Invokers are merged into one Invoker. Finally, the Invoker is converted into a proxy of the local interface through a dynamic proxy.
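The branching summarized above can be traced with a small stand-alone sketch. ReferenceBranchSketch is a hypothetical helper, not Dubbo code: it treats any address starting with "registry://" as a registry (a simplifying assumption), and it leaves out the case where no address is configured and the registries are loaded from configuration instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not part of Dubbo.
class ReferenceBranchSketch {

    private static final Pattern SEMICOLON = Pattern.compile("\\s*;\\s*");

    /** Returns which branch a reference would take, given its configured address string. */
    static String chooseBranch(boolean jvmLocal, String urlConfig) {
        if (jvmLocal) {
            return "local reference via injvm";
        }
        List<String> urls = new ArrayList<>();
        boolean hasRegistry = false;
        for (String u : SEMICOLON.split(urlConfig.trim())) {
            if (u.isEmpty()) {
                continue;
            }
            urls.add(u);
            // Assumption: a registry address is recognised by its protocol prefix
            hasRegistry |= u.startsWith("registry://");
        }
        if (urls.isEmpty()) {
            // The real method would load registries from configuration here
            throw new IllegalArgumentException("no address configured");
        }
        if (urls.size() == 1) {
            return hasRegistry ? "single registry" : "single direct connection";
        }
        return hasRegistry
                ? "several invokers joined by a cluster, using a registry URL"
                : "several direct invokers joined by a cluster";
    }

    public static void main(String[] args) {
        System.out.println(chooseBranch(false, "registry://r1:2181"));
        System.out.println(chooseBranch(false, "dubbo://a:20880; dubbo://b:20880"));
    }
}
```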

Get Invoker instance

Because a local reference simply fetches a service that is already available in the same JVM, only consumption through the registry is analyzed here. In the code fragment above, REF_PROTOCOL.refer performs the conversion; for a registry URL the call is handled by RegistryProtocol#refer:

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    // Get the service's registry url, which will set the registry protocol and remove the registry parameters
    url = getRegistryUrl(url);
    // Get registry instance
    Registry registry = registryFactory.getRegistry(url);
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }

    // Get service consumption metadata
    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
    // Get grouping information from service consumption metadata
    String group = qs.get(GROUP_KEY);
    if (group != null && group.length() > 0) {
        if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
            // Perform Invoker conversion
            return doRefer(getMergeableCluster(), registry, type, url);
        }
    }
    // Perform Invoker conversion
    return doRefer(cluster, registry, type, url);
}

The code above mainly obtains the registry instance and the service group of the consumer, and finally calls doRefer to do the conversion. A reference that names several comma-separated groups, or "*", is handled with the mergeable cluster. The following is the code of doRefer:
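The group check in refer is small enough to try in isolation. The sketch below copies only that condition; GroupClusterSketch and needsMergeableCluster are names invented here, and the comma pattern is an assumption about what COMMA_SPLIT_PATTERN matches.

```java
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not part of Dubbo.
class GroupClusterSketch {

    // Assumption: groups are separated by commas with optional whitespace
    private static final Pattern COMMA = Pattern.compile("\\s*,\\s*");

    /** True when the group value names several groups or the wildcard "*". */
    static boolean needsMergeableCluster(String group) {
        if (group == null || group.isEmpty()) {
            return false;
        }
        return COMMA.split(group).length > 1 || "*".equals(group);
    }

    public static void main(String[] args) {
        for (String group : new String[] {"a", "a,b", "*"}) {
            System.out.println(group + " -> " + needsMergeableCluster(group));
        }
        // prints "a -> false", "a,b -> true", "* -> true"
    }
}
```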

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    // Create a RegistryDirectory object
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    // Set up a registry
    directory.setRegistry(registry);
    // Setup protocol
    directory.setProtocol(protocol);
    // directory.getUrl().getParameters() is the service consumption metadata
    Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
    URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
    if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
        directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
        // Register the consumer's information with the registry
        registry.register(directory.getRegisteredConsumerUrl());
    }

    directory.buildRouterChain(subscribeUrl);
    // Service consumer subscription: service provider, dynamic configuration, routing notification
    directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
            PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));

    // Multiple invokers combined into one
    Invoker invoker = cluster.join(directory);
    return invoker;
}

The code above creates the RegistryDirectory object, registers the consumer's metadata with the registry, and uses the RegistryDirectory to subscribe to changes of service providers, dynamic configuration and routing rules.

The RegistryDirectory class implements the NotifyListener interface. When a subscribed provider, configuration or route changes, it receives a notification and updates itself accordingly:

public synchronized void notify(List<URL> urls) {
    // Group the notified URLs into a Map keyed by category: configurators, routers and providers
    Map<String, List<URL>> categoryUrls = urls.stream()
            .filter(Objects::nonNull)
            .filter(this::isValidCategory)
            .filter(this::isNotCompatibleFor26x)
            .collect(Collectors.groupingBy(url -> {
                if (UrlUtils.isConfigurator(url)) {
                    return CONFIGURATORS_CATEGORY;
                } else if (UrlUtils.isRoute(url)) {
                    return ROUTERS_CATEGORY;
                } else if (UrlUtils.isProvider(url)) {
                    return PROVIDERS_CATEGORY;
                }
                return "";
            }));

    // Update service provider configuration
    List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
    this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);

    // Update route configuration
    List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
    toRouters(routerURLs).ifPresent(this::addRouters);

    // Load service provider's service information
    List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
    /**
     * 3.x added for extend URL address
     */
    ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);
    List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
    if (supportedListeners != null && !supportedListeners.isEmpty()) {
        for (AddressListener addressListener : supportedListeners) {
            providerURLs = addressListener.notify(providerURLs, getUrl(),this);
        }
    }
    // Reload Invoker instance
    refreshOverrideAndInvoker(providerURLs);
}
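The grouping step at the start of notify can be tried on plain strings. In this sketch the category of a URL is guessed from its protocol prefix ("override://" and "route://"), which is a simplification of what UrlUtils checks; NotifyGroupingSketch and categoryOf are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; not part of Dubbo.
class NotifyGroupingSketch {

    /** Assumption: the category is derived from the protocol prefix only. */
    static String categoryOf(String url) {
        if (url.startsWith("override://")) {
            return "configurators";
        }
        if (url.startsWith("route://")) {
            return "routers";
        }
        return "providers";
    }

    /** Drops null entries and groups the remaining URLs by category. */
    static Map<String, List<String>> group(List<String> urls) {
        return urls.stream()
                .filter(Objects::nonNull)
                .collect(Collectors.groupingBy(NotifyGroupingSketch::categoryOf));
    }

    public static void main(String[] args) {
        Map<String, List<String>> grouped = group(Arrays.asList(
                "dubbo://10.0.0.1:20880", null,
                "override://0.0.0.0", "dubbo://10.0.0.2:20880"));
        // A category with no URLs is simply absent, hence getOrDefault
        for (String category : new String[] {"configurators", "routers", "providers"}) {
            int count = grouped.getOrDefault(category, Collections.emptyList()).size();
            System.out.println(category + ": " + count);
        }
        // prints "configurators: 1", "routers: 0", "providers: 2"
    }
}
```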

RegistryDirectory#notify ends by refreshing and reloading the Invokers. The core of that implementation is:

private void refreshOverrideAndInvoker(List<URL> urls) {
    // mock zookeeper://xxx?mock=return null
    overrideDirectoryUrl();
    // Refresh invoker 
    refreshInvoker(urls);
}

private void refreshInvoker(List<URL> invokerUrls) {
    Assert.notNull(invokerUrls, "invokerUrls should not be null");

    if (invokerUrls.size() == 1
            && invokerUrls.get(0) != null
            && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {

        ......

    } else {
        // Refresh previous Invoker
        Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
        // Load a new Invoker Map
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
        // Get new Invokers
        List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
        // Cache new Invokers
        routerChain.setInvokers(newInvokers);
        this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
        this.urlInvokerMap = newUrlInvokerMap;

        try {
            // Destroy the useless Invokers by comparing the old ones with the new ones
            destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
        } catch (Exception e) {
            logger.warn("destroyUnusedInvokers error. ", e);
        }
    }
}

The method takes the Invokers from before and after the refresh, caches the new ones, and destroys those that are no longer used by comparing the two sets.

The conversion from URL to Invoker is performed in RegistryDirectory#toInvokers:
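The compare-and-destroy step can be sketched as follows. FakeInvoker is a made-up stand-in with a destroy flag, and the comparison rule (an old invoker is unused when the new map does not contain it) is an assumption about what destroyUnusedInvokers does, since that method is not shown here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of Dubbo.
class InvokerRefreshSketch {

    /** Minimal stand-in for an invoker that can be closed. */
    static class FakeInvoker {
        final String url;
        boolean destroyed;

        FakeInvoker(String url) {
            this.url = url;
        }

        void destroy() {
            destroyed = true;
        }
    }

    /** Destroys every old invoker that is absent from the new map and returns their keys. */
    static List<String> destroyUnused(Map<String, FakeInvoker> oldMap,
                                      Map<String, FakeInvoker> newMap) {
        List<String> removed = new ArrayList<>();
        if (oldMap == null) {
            return removed; // first refresh: nothing to clean up
        }
        for (Map.Entry<String, FakeInvoker> entry : oldMap.entrySet()) {
            if (!newMap.containsValue(entry.getValue())) {
                entry.getValue().destroy();
                removed.add(entry.getKey());
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        FakeInvoker a = new FakeInvoker("dubbo://10.0.0.1:20880");
        FakeInvoker b = new FakeInvoker("dubbo://10.0.0.2:20880");
        Map<String, FakeInvoker> oldMap = new LinkedHashMap<>();
        oldMap.put(a.url, a);
        oldMap.put(b.url, b);

        // Provider a went away, provider c is new, b is reused
        FakeInvoker c = new FakeInvoker("dubbo://10.0.0.3:20880");
        Map<String, FakeInvoker> newMap = new LinkedHashMap<>();
        newMap.put(b.url, b);
        newMap.put(c.url, c);

        System.out.println(destroyUnused(oldMap, newMap)); // prints "[dubbo://10.0.0.1:20880]"
    }
}
```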

private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
    Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
   
    Set<String> keys = new HashSet<>();
    String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
    for (URL providerUrl : urls) {

        // Filter the protocol that the consumer does not match, and illegal protocol
        ......

        // Merge service provider configuration data
        URL url = mergeUrl(providerUrl);
        // Filter duplicate service provider configuration data
        String key = url.toFullString();
        if (keys.contains(key)) {
            continue;
        }
        keys.add(key);

        // The cache key is a url that does not merge with the client parameters. No matter how the user merges the parameters, if the server url changes, it will be referenced again
        Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
        Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
        
        // If there is no corresponding invoker in the cache, call protocol#refer to create one
        if (invoker == null) {
            try {
                boolean enabled = true;
                if (url.hasParameter(DISABLED_KEY)) {
                    enabled = !url.getParameter(DISABLED_KEY, false);
                } else {
                    enabled = url.getParameter(ENABLED_KEY, true);
                }
                if (enabled) {
                    invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
                }
            } catch (Throwable t) {
                logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
            }
            // Cache the new Invoker
            if (invoker != null) { // Put new invoker in cache
                newUrlInvokerMap.put(key, invoker);
            }
        } else {
            // If the Invoker is already in the cache, reuse it in the new map
            newUrlInvokerMap.put(key, invoker);
        }
    }
    keys.clear();
    return newUrlInvokerMap;
}
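The caching idea in toInvokers (reuse an Invoker whose URL is already known, create one only for an unseen URL, skip duplicates) can be shown on its own. InvokerCacheSketch is a hypothetical helper; the factory function stands in for protocol.refer, and the enabled/disabled check is left out.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper for illustration; not part of Dubbo.
class InvokerCacheSketch {

    /** Builds the new url-to-invoker map, reusing cached invokers where possible. */
    static <V> Map<String, V> toInvokers(List<String> urls,
                                         Map<String, V> cached,
                                         Function<String, V> factory) {
        Map<String, V> result = new HashMap<>();
        for (String url : urls) {
            if (result.containsKey(url)) {
                continue; // duplicate provider URL
            }
            V invoker = cached == null ? null : cached.get(url);
            if (invoker == null) {
                invoker = factory.apply(url); // stands in for protocol.refer
            }
            if (invoker != null) {
                result.put(url, invoker);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Object existing = new Object();
        Map<String, Object> cached = new HashMap<>();
        cached.put("dubbo://10.0.0.1:20880", existing);

        int[] created = {0};
        Map<String, Object> fresh = toInvokers(
                Arrays.asList("dubbo://10.0.0.1:20880",
                        "dubbo://10.0.0.2:20880",
                        "dubbo://10.0.0.2:20880"),
                cached,
                url -> {
                    created[0]++;
                    return new Object();
                });

        System.out.println(fresh.size() + " invokers, " + created[0] + " newly created");
        // prints "2 invokers, 1 newly created"
        System.out.println(fresh.get("dubbo://10.0.0.1:20880") == existing); // prints "true"
    }
}
```

Reusing the cached instance avoids reconnecting to providers whose URL has not changed between two notifications.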

Summary

Through "Dubbo's service exposure" and this article, we now understand the principle of Dubbo's service exposure and service consumption. Whether for exposure or consumption, Dubbo uses the Invoker as the main carrier of data exchange: a call to a remote or local implementation is carried out by calling the Invoker.

Personal blog: https://ytao.top

Posted on Mon, 09 Mar 2020 23:44:03 -0700 by veronicabend