How Dubbo registers with Zookeeper

Dubbo's Provider and Consumer each connect to a registry when they start. The registry can be backed by Zookeeper or Redis, among others; Zookeeper is the common choice.

By default, Dubbo uses zkclient to talk to the Zookeeper server. zkclient wraps the native Zookeeper client to some extent, which makes Zookeeper more convenient to work with: for example, there is no need to handle session timeouts manually or to re-register watchers repeatedly.

Directory of nodes registered by Dubbo on Zookeeper:

Suppose the interface name is: com.bob.dubbo.service.CityDubboService

When Dubbo starts, both the Consumer and the Provider format their own URL as a string and register it under the corresponding Zookeeper node as a temporary (ephemeral) node. When the connection is lost, the node is deleted.

When a Consumer starts, it not only registers itself under the /consumers directory, but also subscribes to the /providers directory, so that it receives the URL strings of the Providers registered there in real time.
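To make the layout concrete, here is a minimal sketch that builds the node paths for the example interface. The class `RegistryPaths`, its method names, and the sample provider address are hypothetical; the "/dubbo" root and the providers/consumers directories come from the description above, and URL-encoding the child node name is assumed to match how Dubbo stores the URL string.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper, not part of Dubbo: builds the registry paths described above.
final class RegistryPaths {
    static final String ROOT = "/dubbo";

    // Path of a category directory, e.g. /dubbo/<interface>/providers
    static String categoryPath(String serviceInterface, String category) {
        return ROOT + "/" + serviceInterface + "/" + category;
    }

    // Path of one registered node: the URL string is encoded so that it can
    // serve as a single path segment under the category directory.
    static String nodePath(String serviceInterface, String category, String url) {
        try {
            return categoryPath(serviceInterface, category) + "/" + URLEncoder.encode(url, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    public static void main(String[] args) {
        String service = "com.bob.dubbo.service.CityDubboService";
        System.out.println(categoryPath(service, "providers"));
        System.out.println(categoryPath(service, "consumers"));
        // Sample provider URL, invented for illustration
        System.out.println(nodePath(service, "providers", "dubbo://192.168.0.10:20880/" + service));
    }
}
```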

 

Let's look at the relevant code implementation:

public class ZookeeperRegistry extends FailbackRegistry {

    ......

    /**
     * Default port
     */
    private final static int DEFAULT_ZOOKEEPER_PORT = 2181;
    /**
     * Default Zookeeper root node
     */
    private final static String DEFAULT_ROOT = "dubbo";

    /**
     * Zookeeper Root node
     */
    private final String root;
    /**
     * Service Interface full name collection
     */
    private final Set<String> anyServices = new ConcurrentHashSet<String>();
    /**
     * Listener set
     */
    private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners
        = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>();
    /**
     * Zookeeper Client
     */
    private final ZookeeperClient zkClient;

    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);   // Call the constructor of the parent FailbackRegistry
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        // Get the Zookeeper root node, dubbo if the "group" parameter is not specified
        String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); // `url.parameters.group ` parameter value
        if (!group.startsWith(Constants.PATH_SEPARATOR)) {
            group = Constants.PATH_SEPARATOR + group;
        }
        this.root = group;   // root = "/dubbo"
        // Create Zookeeper Client
        zkClient = zookeeperTransporter.connect(url);
        // Add StateListener object. The listener calls the recovery method when reconnecting.
        zkClient.addStateListener(new StateListener() {
            @Override
            public void stateChanged(int state) {
                if (state == RECONNECTED) {
                    try {
                        recover();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        });
    }
}

 

public abstract class FailbackRegistry extends AbstractRegistry {

    /**
     * URLs whose registration failed and is waiting for a retry
     */
    private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();
    /**
     * URLs whose unregistration failed and is waiting for a retry
     */
    private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>();
    /**
     * Listeners whose subscription failed, keyed by URL
     */
    private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
    /**
     * Listeners whose unsubscription failed, keyed by URL
     */
    private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
    /**
     * Notifications that failed to be delivered, keyed by URL
     */
    private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();

    public FailbackRegistry(URL url) {
        super(url);
        // Retry period in milliseconds, default 5 * 1000
        int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
        // Create the timer that retries failed actions
        this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                // Check and connect to the registry
                try {
                    retry();
                } catch (Throwable t) { // Defensive fault tolerance
                    logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
                }
            }
        }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
    }

    /**
     * Retry the failed actions
     */
    protected void retry() {
        // Retry registration
        if (!failedRegistered.isEmpty()) {
            ......
            for (URL url : failed) {
                try {
                    // Perform the registration
                    doRegister(url);
                    // Remove the URL from `failedRegistered`
                    failedRegistered.remove(url);
                } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                    logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                }
            }
        }
        // Retry unregister
        if (!failedUnregistered.isEmpty()) {
            ......
            for (URL url : failed) {
                try {
                    // Perform the unregistration
                    doUnregister(url);
                    // Remove the URL from `failedUnregistered`
                    failedUnregistered.remove(url);
                } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                    logger.warn("Failed to retry unregister  " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                }
            }
        }
        // Retry subscription
        if (!failedSubscribed.isEmpty()) {
            ......
            for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
                URL url = entry.getKey();
                Set<NotifyListener> listeners = entry.getValue();
                for (NotifyListener listener : listeners) {
                    try {
                        // Perform the subscription
                        doSubscribe(url, listener);
                        // Remove listener
                        listeners.remove(listener);
                    } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                        logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                    }
                }
            }
        }
        // Retry unsubscribe
        if (!failedUnsubscribed.isEmpty()) {
            ......
            for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
                URL url = entry.getKey();
                Set<NotifyListener> listeners = entry.getValue();
                for (NotifyListener listener : listeners) {
                    try {
                        // Execute unsubscribe
                        doUnsubscribe(url, listener);
                        // Remove listener
                        listeners.remove(listener);
                    } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                        logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                    }
                }
            }
        }
    }

}

 

ZookeeperRegistry calls the parent constructor when it is instantiated. The parent constructor creates a timer task that executes the retry() method every 5 seconds by default (the period is read from the REGISTRY_RETRY_PERIOD_KEY parameter).

The retry() method retries the actions that failed earlier. These actions are:

Registration: the Provider registers its own URL with Zookeeper, which generates a temporary znode
Unregistration: the Provider exits from the Dubbo container and stops serving RPC calls, that is, it removes the znode corresponding to its own URL from Zookeeper
Subscription: the Consumer subscribes to the child nodes of the "/dubbo/Service/providers" directory, which generates a ChildListener
Unsubscription: the Consumer exits from the Dubbo container and removes the ChildListener created previously
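The failback idea behind these retries can be sketched in isolation. The example below is a simplified stand-in and not Dubbo's implementation: `FailbackSketch` and its methods are hypothetical names, URLs are plain strings, only the registration case is covered, and the assumption that a failed register() call queues the URL for retry is mine, since the excerpt above shows only the retry side.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical, simplified failback registry: failed registrations are queued
// and a periodic task tries them again until they succeed.
final class FailbackSketch {
    private final Set<String> failedRegistered = ConcurrentHashMap.newKeySet();
    private final Consumer<String> doRegister; // the real registration action, may throw

    FailbackSketch(Consumer<String> doRegister) {
        this.doRegister = doRegister;
    }

    // Try to register; on failure remember the URL for a later retry (assumption).
    void register(String url) {
        try {
            doRegister.accept(url);
            failedRegistered.remove(url);
        } catch (RuntimeException e) {
            failedRegistered.add(url);
        }
    }

    // Retry every queued registration; keep the ones that still fail.
    void retry() {
        for (String url : failedRegistered) { // the concurrent set tolerates removal while iterating
            try {
                doRegister.accept(url);
                failedRegistered.remove(url);
            } catch (RuntimeException e) {
                // ignore and wait for the next retry
            }
        }
    }

    int pending() {
        return failedRegistered.size();
    }

    // Schedule retry() with a fixed delay, like the timer created in the constructor above.
    ScheduledExecutorService startRetryTimer(long periodMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleWithFixedDelay(this::retry, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return executor;
    }
}
```

Calling retry() directly, as a test would, shows the behaviour without waiting for the timer: a URL stays pending while the registration action throws and is removed after the first successful attempt.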

Why is it set up this way? It is mainly related to Zookeeper's communication mechanism. When the connection between the Zookeeper Client and Server is lost or the heartbeat times out, the Server deletes the temporary nodes registered by that Client, and the registered listeners are deleted as well.

The URLs registered by the Provider and the Consumer are temporary nodes, so they disappear when the connection is lost. For this reason, Dubbo registers a StateListener (a state listener) with the Zookeeper client. When the Zookeeper Client in Dubbo reconnects to the Server, the previously registered and subscribed URLs are added to the failed collections, and the retry task then registers and subscribes them again.

 

Look at the constructor of ZookeeperRegistry, which adds a StateListener:

public class ZookeeperRegistry extends FailbackRegistry {

    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        ......
        // Add StateListener object. The listener calls the recovery method when reconnecting.
        zkClient.addStateListener(new StateListener() {
            @Override
            public void stateChanged(int state) {
                if (state == RECONNECTED) {
                    try {
                        recover();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        });
    }
}

public abstract class FailbackRegistry extends AbstractRegistry {

    protected void recover() throws Exception {
        // Recover registrations: add them to `failedRegistered` so the timer retries them
        Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
        if (!recoverRegistered.isEmpty()) {
            if (logger.isInfoEnabled()) {
                logger.info("Recover register url " + recoverRegistered);
            }
            for (URL url : recoverRegistered) {
                failedRegistered.add(url);
            }
        }
        // Recover subscriptions: add them to `failedSubscribed` so the timer retries them
        Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
        if (!recoverSubscribed.isEmpty()) {
            if (logger.isInfoEnabled()) {
                logger.info("Recover subscribe url " + recoverSubscribed.keySet());
            }
            for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
                URL url = entry.getKey();
                for (NotifyListener listener : entry.getValue()) {
                    addFailedSubscribed(url, listener);
                }
            }
        }
    }
}

 

In the ZookeeperRegistry constructor, a state listener is added to the Zookeeper client. On reconnection (which implies that the previous connection was lost), the registered and subscribed URLs are added to the failed collections, and the periodic retry then performs the registration and subscription again.

After the Zookeeper client is disconnected from the Server, it tries to reconnect at regular intervals. When the connection succeeds, an event is triggered. Dubbo has registered a listener for the RECONNECTED state, so once the connection is re-established it registers and subscribes again.
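The whole cycle of session loss, reconnection, and re-registration can be followed in a small in-memory simulation. Everything here is a hypothetical stand-in: `ReconnectSketch` models the server's temporary nodes as a set, and its method names only mirror the roles of the StateListener callback, recover(), and retry() described above.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory model of the reconnect flow; no real Zookeeper involved.
final class ReconnectSketch {
    // Stand-in for the temporary nodes that exist on the server
    final Set<String> serverNodes = ConcurrentHashMap.newKeySet();
    // URLs this client has registered (kept locally, survives the disconnect)
    final Set<String> registered = ConcurrentHashMap.newKeySet();
    // URLs queued for the next retry
    final Set<String> failedRegistered = ConcurrentHashMap.newKeySet();

    void register(String url) {
        registered.add(url);
        serverNodes.add(url);
    }

    // The server drops every temporary node of the client whose session ended.
    void sessionExpired() {
        serverNodes.clear();
    }

    // Role of the state listener plus recover(): queue everything that was registered.
    void onReconnected() {
        failedRegistered.addAll(registered);
    }

    // Role of the timer task: re-create the nodes and clear the queue.
    void retry() {
        for (String url : failedRegistered) {
            serverNodes.add(url);
            failedRegistered.remove(url);
        }
    }
}
```

In this model the node is missing between sessionExpired() and the next retry(), which corresponds to the window in which a Provider is invisible to new subscribers.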

When the Zookeeper Server goes down, the Dubbo application does not react to the event itself, although its internal zkClient keeps trying to reconnect to the Server. RPC calls between components that are already registered are not affected, because the Invoker objects have already been generated from the URLs and remain in the Dubbo container. However, while the registry is down, new Providers cannot be discovered. In addition, the Provider information obtained from earlier subscriptions is persisted to a local file, so if the Zookeeper registry is unavailable when the Dubbo application restarts, the Provider information cached in that file is loaded, which helps keep the service available.

The Consumer always keeps a ChildListener on the Providers node and listens to the Provider data in real time. When the child nodes of the Providers node change, Dubbo is notified immediately, updates the URLs, and updates the Consumer's Invoker objects in the Dubbo container at the same time. As long as the subscription succeeds, the Provider list is synchronized in real time and the Invoker objects are updated, whether it is the first subscription or a subscription after disconnection and reconnection:
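A minimal sketch of such a local cache follows, assuming a properties file that maps a service name to a space-separated list of Provider URLs. The class `ProviderFileCache`, the file location, and the exact file format are assumptions for illustration and may differ from what Dubbo actually writes.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

// Hypothetical file cache: one property per service, value = space-separated URLs.
final class ProviderFileCache {
    private final Path file;

    ProviderFileCache(Path file) {
        this.file = file;
    }

    // Persist the latest provider list for one service, keeping other entries.
    void save(String service, List<String> providerUrls) {
        Properties props = loadAll();
        props.setProperty(service, String.join(" ", providerUrls));
        try (OutputStream out = Files.newOutputStream(file)) {
            props.store(out, "provider cache (sketch)");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Load the cached provider list, e.g. at startup when the registry is unreachable.
    List<String> load(String service) {
        String value = loadAll().getProperty(service);
        if (value == null || value.isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.asList(value.split(" "));
    }

    private Properties loadAll() {
        Properties props = new Properties();
        if (Files.exists(file)) {
            try (InputStream in = Files.newInputStream(file)) {
                props.load(in);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return props;
    }
}
```

A new `ProviderFileCache` pointed at the same file returns the list saved earlier, which is the property the restart scenario relies on.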

public class ZookeeperRegistry extends FailbackRegistry {

    protected void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            // Subscription to all services (interface is "*"), for example from the monitoring center
            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                ......
            // Subscription to one specific service, for example from a service consumer
            } else {
                // Data array of child nodes
                List<URL> urls = new ArrayList<URL>();
                // Loop over the category paths: routers, configurators, providers
                for (String path : toCategoriesPath(url)) {
                    // Get the listener set corresponding to url
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) { // No, create
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                        listeners = zkListeners.get(url);
                    }
                    // Get ChildListener object
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) { // No listener exists for subdirectory to create ChildListener object
                        // Subscribe to the parent directory and trigger this callback function when any child node changes
                        listeners.putIfAbsent(listener, new ChildListener() {
                            @Override
                            public void childChanged(String parentPath, List<String> currentChilds) {
                                // When changing, call the 'notify(...)' method and call back the NotifyListener
                                ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                            }
                        });
                        zkListener = listeners.get(listener);
                    }
                    // Create the category node; `false` means the node is persistent, not temporary
                    zkClient.create(path, false);
                    // Subscribe to the node at `path` in Zookeeper and get all of its current children.
                    // `path` is /root node/interface full name/providers, such as /dubbo/com.bob.service.CityService/providers
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    // Add to 'urls'
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                // Once the first full data set has been fetched, call `notify(...)` to call back the NotifyListener.
                // This is the step that connects to the Providers and instantiates the Invokers.
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

}
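The toUrlsWithEmpty(...) helper called above is not shown in the excerpt. The sketch below illustrates the behaviour it is assumed to have: child node names are decoded back into URL strings, and when no Provider is left, a single placeholder URL with the "empty" protocol is returned so that the listener is still notified. The class `ChildUrls`, the string-based signature, and the exact shape of the placeholder are hypothetical simplifications.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.List;

// Hypothetical string-based version of the child-to-URL conversion.
final class ChildUrls {
    // consumerAddress: host and path of the subscribing URL, e.g. "10.0.0.9/a.B"
    // path: the category path that was subscribed, e.g. "/dubbo/a.B/providers"
    // children: encoded child node names returned by the registry
    static List<String> toUrlsWithEmpty(String consumerAddress, String path, List<String> children) {
        List<String> urls = new ArrayList<>();
        for (String child : children) {
            try {
                urls.add(URLDecoder.decode(child, "UTF-8"));
            } catch (UnsupportedEncodingException e) {
                throw new IllegalStateException(e); // UTF-8 is always available
            }
        }
        if (urls.isEmpty()) {
            // No provider left: emit one "empty" placeholder that names the category
            String category = path.substring(path.lastIndexOf('/') + 1);
            urls.add("empty://" + consumerAddress + "?category=" + category);
        }
        return urls;
    }
}
```

The placeholder is what allows the receiving side to distinguish "no Providers at all" from "no notification yet".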

 

After the subscription returns the latest URL strings of the Providers, the notify() method is called to notify the listener, which finally executes the following code:

public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {

    private volatile List<Configurator> configurators;
    
    private volatile Map<String, Invoker<T>> urlInvokerMap;
 
    private volatile Map<String, List<Invoker<T>>> methodInvokerMap;  

    private volatile Set<URL> cachedInvokerUrls; 


    private void refreshInvoker(List<URL> invokerUrls) {
        // A single URL with the "empty" protocol means that no Provider is available:
        // it is generated manually when the subscription returns nothing
        if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
            && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
            this.forbidden = true; // Forbid to access
            this.methodInvokerMap = null; // Set the method invoker map to null
            destroyAllInvokers(); // Close all invokers
        } else {
            this.forbidden = false; // Allow to access
            Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
            if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
                invokerUrls.addAll(this.cachedInvokerUrls);
            } else {
                this.cachedInvokerUrls = new HashSet<URL>();
                this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
            }
            if (invokerUrls.isEmpty()) {
                return;
            }
            Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
            Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Map each method name to its list of Invokers
            // state change
            // If the calculation is wrong, it is not processed.
            if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
                logger.error(new IllegalStateException(
                    "urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
                return;
            }
            this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
            this.urlInvokerMap = newUrlInvokerMap;
            try {
                destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
            } catch (Exception e) {
                logger.warn("destroyUnusedInvokers error. ", e);
            }
        }
    }
}

This updates the Invoker data in Dubbo, so that the Consumer perceives Provider changes in real time and RPC calls do not go wrong.
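The core of that refresh (build the new map, swap it in, then destroy what is no longer referenced) can be sketched without any Dubbo types. `InvokerRefreshSketch` and `FakeInvoker` are hypothetical; reusing an existing invoker for an unchanged URL is an assumption of this sketch, and the method-level map and the multi-group merge are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified invoker refresh keyed by provider URL.
final class InvokerRefreshSketch {
    static final class FakeInvoker {
        final String url;
        boolean destroyed;

        FakeInvoker(String url) {
            this.url = url;
        }

        void destroy() {
            destroyed = true;
        }
    }

    private volatile Map<String, FakeInvoker> urlInvokerMap = new HashMap<>();

    // Replace the invoker map with one built from the latest provider URLs
    // and return the invokers that were destroyed because their URL disappeared.
    List<FakeInvoker> refresh(List<String> providerUrls) {
        Map<String, FakeInvoker> old = urlInvokerMap; // local reference to the previous map
        Map<String, FakeInvoker> fresh = new HashMap<>();
        for (String url : providerUrls) {
            FakeInvoker existing = old.get(url);
            fresh.put(url, existing != null ? existing : new FakeInvoker(url));
        }
        urlInvokerMap = fresh; // swap first, so callers never see a destroyed invoker
        List<FakeInvoker> destroyed = new ArrayList<>();
        for (Map.Entry<String, FakeInvoker> entry : old.entrySet()) {
            if (!fresh.containsKey(entry.getKey())) {
                entry.getValue().destroy();
                destroyed.add(entry.getValue());
            }
        }
        return destroyed;
    }

    Map<String, FakeInvoker> invokers() {
        return urlInvokerMap;
    }
}
```

Swapping the map before destroying the old invokers mirrors the order in refreshInvoker above, where destroyUnusedInvokers runs only after the new maps have been assigned.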

This is the implementation process of Zookeeper registry in Dubbo.

 

Conclusion:

The Provider and the Consumer register temporary nodes with Zookeeper, and the corresponding nodes are deleted when the connection is lost.
The Consumer subscribes to the child nodes of the Providers node, perceives Provider changes in real time, keeps its own Invoker objects synchronized, and thereby keeps RPC calls available.

Tags: Programming Zookeeper Dubbo Redis Session

Posted on Mon, 16 Mar 2020 03:35:59 -0700 by Tezread