How Curator's event listening differs from ZooKeeper's native Watcher mechanism

How Curator differs from ZooKeeper's Watcher mechanism


The native ZooKeeper watch API has some shortcomings. ZooKeeper supports event monitoring through registered Watchers, but a Watcher fires only once, so developers have to register it again after every notification. This is cumbersome and forces application code to deal with many low-level details.
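The need for repeated registration can be illustrated with a toy model. The sketch below does not use the ZooKeeper API; OneShotNode and OneShotWatchDemo are hypothetical names, and the only assumption is that a watch is discarded after it fires once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy in-memory node whose watches fire once and are then discarded (assumed model, not the ZooKeeper API). */
final class OneShotNode {
    private String data = "";
    private final List<Consumer<String>> watches = new ArrayList<>();

    /** Reads the data and, if a watch is given, registers it for the next change only. */
    String getData(Consumer<String> watch) {
        if (watch != null) {
            watches.add(watch);
        }
        return data;
    }

    /** Changes the data, then fires and forgets every pending watch. */
    void setData(String newData) {
        data = newData;
        List<Consumer<String>> fired = new ArrayList<>(watches);
        watches.clear();
        for (Consumer<String> watch : fired) {
            watch.accept(newData);
        }
    }
}

final class OneShotWatchDemo {
    /** Registers the watch once: only the first change is observed. */
    static List<String> registerOnce() {
        OneShotNode node = new OneShotNode();
        List<String> seen = new ArrayList<>();
        node.getData(seen::add);
        node.setData("a");
        node.setData("b");
        return seen;
    }

    /** Registers the watch again inside the callback: every change is observed. */
    static List<String> reRegister() {
        OneShotNode node = new OneShotNode();
        List<String> seen = new ArrayList<>();
        Consumer<String> watch = new Consumer<String>() {
            @Override
            public void accept(String value) {
                seen.add(value);
                node.getData(this); // register again for the next change
            }
        };
        node.getData(watch);
        node.setData("a");
        node.setData("b");
        return seen;
    }
}
```

In this model registerOnce() sees only the first change, while reRegister() sees both. The second pattern is the bookkeeping that the native API leaves to the developer.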

Curator is designed to remove these shortcomings of the native API. Development becomes simpler, and operations such as reconnection are handled transparently, so developers do not have to deal with them.

Curator introduces Cache to listen for ZooKeeper server events.
Cache is Curator's wrapper around event monitoring. It can be thought of as a continuous comparison between a local cached view and the remote ZooKeeper view. Curator also re-registers watches automatically, which removes much of the tedious work required by the native API. Caches fall into two kinds: node listening and child-node listening.
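The idea of comparing a local view with a remote view can be sketched in plain Java. This is only an illustration of the concept, not Curator's implementation; ViewDiff and the event labels are hypothetical, and both views are assumed to be simple path-to-data maps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ViewDiff {
    /** Compares a local snapshot (path -> data) with a remote one and lists the differences. */
    static List<String> diff(Map<String, String> local, Map<String, String> remote) {
        List<String> events = new ArrayList<>();
        // TreeMap copies give a stable, sorted iteration order for the output.
        for (Map.Entry<String, String> entry : new TreeMap<>(remote).entrySet()) {
            String path = entry.getKey();
            if (!local.containsKey(path)) {
                events.add("ADDED " + path);
            } else if (!local.get(path).equals(entry.getValue())) {
                events.add("UPDATED " + path);
            }
        }
        for (String path : new TreeMap<>(local).keySet()) {
            if (!remote.containsKey(path)) {
                events.add("REMOVED " + path);
            }
        }
        return events;
    }
}
```

A cache built this way would apply the remote snapshot to its local view after each comparison and hand the resulting events to its listeners.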

I. Curator's Monitoring API

Curator's listeners are a higher-level wrapper around ZooKeeper's native watch mechanism.
The improvement shows in two areas: watches are re-registered automatically, and events carry more information.
A listener event returns details such as the changed node and its data.

Curator provides three cache classes:

  1. NodeCache: monitors a single node. The events cover creation, deletion and data changes of the node at the specified path.

  2. PathChildrenCache: monitors the first-level children of the node at the specified path. It reports additions, deletions and data changes of those children, but not changes to the node itself.

  3. TreeCache: treats the node at the specified path as the root (ancestor) and monitors it together with all of its descendants, giving a tree-wide view. A maximum listening depth can be set; the largest allowed value is 2147483647 (the maximum value of int).


1. NodeCache (for listening on data nodes themselves)

NodeCache monitors changes to a data node itself. It provides two constructors:

public NodeCache(CuratorFramework client, String path)

public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)

The dataIsCompressed parameter indicates whether the node data is compressed. The first constructor simply calls the second with dataIsCompressed set to false.

Node listening needs a callback that runs the business logic when an event arrives. NodeCache uses NodeCacheListener for this. A code example follows:

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * Uses Curator's NodeCacheListener to listen on a single ZooKeeper node.
 * The listener is called when the node is created, updated or deleted.
 * After a delete, getCurrentData() returns null, so nothing is printed for it.
 */
public class CuratorWatcher1 {

    /** ZooKeeper addresses */
    static final String CONNECT_ADDR = "172.16.158.11:2181,"
            + "172.16.158.12:2181,"
            + "172.16.158.13:2181";
    /** Session timeout in milliseconds */
    static final int SESSION_OUTTIME = 5000;

    public static void main(String[] args) throws Exception {

        //1 Retry policy: base sleep time of 1000 ms, at most 10 retries
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
        //2 Create the client through the factory
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                .connectString(CONNECT_ADDR)
                .sessionTimeoutMs(SESSION_OUTTIME)
                .retryPolicy(retryPolicy)
                .build();

        //3 Open the connection
        cf.start();

        //4 Create the cache for the node; false means the data is not compressed
        final NodeCache cache = new NodeCache(cf, "/super", false);
        // true: read the node's current content into the cache on start
        cache.start(true);
        cache.getListenable().addListener(new NodeCacheListener() {
            /**
             * Called when the node is created, updated or deleted.
             * @see org.apache.curator.framework.recipes.cache.NodeCacheListener#nodeChanged()
             */
            @Override
            public void nodeChanged() throws Exception {
                ChildData data = cache.getCurrentData();
                if (data == null) {
                    // The node was deleted: there is no path or data to print.
                    return;
                }
                System.out.println("The path is:" + data.getPath());
                System.out.println("The data are:" + new String(data.getData()));
                System.out.println("The state is:" + data.getStat());
                System.out.println("---------------------------------------");
            }
        });

        Thread.sleep(1000);
        cf.create().forPath("/super", "123".getBytes());

        Thread.sleep(1000);
        cf.setData().forPath("/super", "456".getBytes());

        Thread.sleep(1000);
        cf.delete().forPath("/super");

//        Thread.sleep(Integer.MAX_VALUE);
    }
}

Output:

The path is:/super
The data are:123
The state is:8589937827,8589937827,1567847615844,1567847615844,0,0,0,0,3,0,8589937827

---------------------------------------
The path is:/super
The data are:456
The state is:8589937827,8589937828,1567847615844,1567847616855,1,0,0,0,3,0,8589937827

---------------------------------------

NodeCache has an overloaded start method that takes a boolean parameter. If it is set to true, the node's content is read and stored in the cache immediately when the cache starts.

In testing, some events were lost when the content of the monitored node was modified several times in a row. Other versions have not been verified, so this point needs particular attention.
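One plausible explanation, which the tests above do not confirm, is that a cache re-reads the node once per notification, so writes that land before the re-read are folded into a single observation. The toy model below assumes exactly that behaviour; CoalescedReads is a hypothetical name and no ZooKeeper or Curator API is involved.

```java
import java.util.ArrayList;
import java.util.List;

final class CoalescedReads {
    /**
     * Each inner list is a burst of writes that all land before the client
     * reacts to the first notification of that burst (assumed timing model).
     * Returns the values the client actually observes.
     */
    static List<String> observe(List<List<String>> bursts) {
        List<String> seen = new ArrayList<>();
        for (List<String> burst : bursts) {
            if (burst.isEmpty()) {
                continue;
            }
            // The client re-reads the node once per burst, so it sees only
            // the value left by the last write of the burst.
            seen.add(burst.get(burst.size() - 1));
        }
        return seen;
    }
}
```

With the writes "1", then "2", "3", "4" in one burst, then "5", the observer in this model sees "1", "4" and "5". The intermediate values are never reported, although the final state is always correct.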

NodeCache can monitor not only content changes of a node but also whether the node exists. If the node does not exist when the cache starts, the listener is triggered when the node is created. When the node is deleted, the listener is notified as well, but getCurrentData() then returns null, so the callback must check for null before reading the path or data.

2. PathChildrenCache (for listening on the child nodes of a data node)

PathChildrenCache monitors changes to the children of a data node. The current version provides seven constructors, two of which are deprecated. The remaining five are:

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService)

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService)

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory)

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory)

The common parameters need no further explanation. cacheData indicates whether the content of the child nodes is cached; if it is true, the node data is fetched together with each change to the child list.

executorService and threadFactory let listener events be handled on a thread pool.
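Handling listener events on a thread pool can be sketched with the JDK alone. The example is a simplified illustration, not Curator's dispatch code; ListenerDispatchDemo and the event strings are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class ListenerDispatchDemo {
    /** Hands every event to the listener on a pool thread and returns what the listener handled. */
    static List<String> dispatch(List<String> events) {
        List<String> handled = Collections.synchronizedList(new ArrayList<>());
        Consumer<String> listener = handled::add;
        // A single worker thread keeps the events in submission order.
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            for (String event : events) {
                pool.execute(() -> listener.accept(event));
            }
        } finally {
            pool.shutdown();
        }
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("listener tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for listener tasks", e);
        }
        return new ArrayList<>(handled);
    }
}
```

A single-threaded pool preserves event order. A larger pool would let slow callbacks run in parallel, at the cost of ordering guarantees.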

PathChildrenCache uses PathChildrenCacheListener to handle events. The following code example shows how it is used:

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * Uses PathChildrenCacheListener to listen for child node changes: added, updated and removed children.
 * Note: changes to the watched node itself do not produce events.
 */
public class CuratorWatcher2 {

    /** ZooKeeper addresses */
    static final String CONNECT_ADDR = "172.16.158.11:2181,"
            + "172.16.158.12:2181,"
            + "172.16.158.13:2181";
    /** Session timeout in milliseconds */
    static final int SESSION_OUTTIME = 5000;

    public static void main(String[] args) throws Exception {

        //1 Retry policy: base sleep time of 1000 ms, at most 10 retries
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
        //2 Create the client through the factory
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                .connectString(CONNECT_ADDR)
                .sessionTimeoutMs(SESSION_OUTTIME)
                .retryPolicy(retryPolicy)
                .build();

        //3 Open the connection
        cf.start();

        //4 Create the PathChildrenCache. The third argument controls whether node data is cached; with false it is not.
        PathChildrenCache cache = new PathChildrenCache(cf, "/super", true);
        //5 Start the cache; POST_INITIALIZED_EVENT fills it in the background and then posts an INITIALIZED event
        cache.start(StartMode.POST_INITIALIZED_EVENT);
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            /**
             * Called when a child node is added, updated or removed.
             * @see org.apache.curator.framework.recipes.cache.PathChildrenCacheListener#childEvent(org.apache.curator.framework.CuratorFramework, org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent)
             */
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                switch (event.getType()) {
                    case CHILD_ADDED:
                        System.out.println("CHILD_ADDED :" + event.getData().getPath());
                        break;
                    case CHILD_UPDATED:
                        System.out.println("CHILD_UPDATED :" + event.getData().getPath());
                        break;
                    case CHILD_REMOVED:
                        System.out.println("CHILD_REMOVED :" + event.getData().getPath());
                        break;
                    default:
                        break;
                }
            }
        });

        // Creating the watched node itself produces no event
        cf.create().forPath("/super", "init".getBytes());

        // Add child nodes
        Thread.sleep(1000);
        cf.create().forPath("/super/c1", "c1 content".getBytes());
        Thread.sleep(1000);
        cf.create().forPath("/super/c2", "c2 content".getBytes());

        // Update a child node
        Thread.sleep(1000);
        cf.setData().forPath("/super/c1", "c1 Update content".getBytes());

        // Delete a child node
        Thread.sleep(1000);
        cf.delete().forPath("/super/c2");

        // Delete the watched node itself, together with its remaining children
        Thread.sleep(1000);
        cf.delete().deletingChildrenIfNeeded().forPath("/super");

        // Keep the process alive so that pending events can still be printed
        Thread.sleep(Integer.MAX_VALUE);
    }
}


Output:

CHILD_ADDED :/super/c1
CHILD_ADDED :/super/c2
CHILD_UPDATED :/super/c1
CHILD_REMOVED :/super/c2
CHILD_REMOVED :/super/c1

PathChildrenCache listens only on direct children, not on second-level descendants. Note that the example sleeps between creating, updating and deleting the child nodes; without these pauses some events may not be received, which is something to keep in mind when using the cache.
The output also shows that the client is not notified of changes to the /super node itself.
In addition, PathChildrenCache cannot listen for events on second-level descendants. That is, if PathChildrenCache is used to monitor /super, then creating or deleting /super/c1/c2 does not trigger a child node change event.

3. TreeCache (listening for changes in all nodes under a specified node)

Features:

(1) It keeps monitoring changes to the nodes under the specified node, without re-registration.
(2) It monitors all nodes under the specified node. For example, with "/example" as the specified node, adding "node1" below it is detected, and so is adding "node1/n1".
(3) Events that can be monitored: node creation, node data changes, node deletion, etc.
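The difference in scope between PathChildrenCache and TreeCache comes down to how far below the watched node a path lies. The helper below is a hypothetical sketch in plain Java, not part of Curator. It assumes that depth 0 is the watched node itself, that PathChildrenCache covers depth 1 only, and that TreeCache covers depths 0 through its maximum depth.

```java
final class WatchScope {
    /** Levels that path lies below root: 0 for root itself, -1 if path is not under root. */
    static int depthBelow(String root, String path) {
        if (path.equals(root)) {
            return 0;
        }
        String prefix = root.endsWith("/") ? root : root + "/";
        if (!path.startsWith(prefix)) {
            return -1;
        }
        String rest = path.substring(prefix.length());
        if (rest.isEmpty()) {
            return -1; // a trailing slash is not a valid node path
        }
        return rest.split("/").length;
    }

    /** Assumed rule: PathChildrenCache reports direct children only. */
    static boolean pathChildrenCacheSees(String root, String path) {
        return depthBelow(root, path) == 1;
    }

    /** Assumed rule: TreeCache reports the root and every descendant down to maxDepth. */
    static boolean treeCacheSees(String root, String path, int maxDepth) {
        int depth = depthBelow(root, path);
        return depth >= 0 && depth <= maxDepth;
    }
}
```

For example, with "/super" as the watched node, "/super/c1/c2" lies at depth 2. It falls outside the PathChildrenCache rule but inside the TreeCache rule unless the maximum depth is set below 2.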

How to use it:

(1) Create a CuratorFramework client
(2) Create a TreeCache
(3) Start the client and the TreeCache
(4) Register a listener

Partial sample code:

// Assumes client is a started CuratorFramework, created as in the examples above,
// and that "/example" is the root node to watch.
TreeCache treeCache = new TreeCache(client, "/example");

// TreeCache.start() takes no start-mode argument
treeCache.start();

treeCache.getListenable().addListener(new TreeCacheListener() {

    @Override
    public void childEvent(CuratorFramework client, TreeCacheEvent event)
            throws Exception {
        switch (event.getType()) {
            case NODE_ADDED:
                System.out.println("tree: node added: " + event.getData());
                break;
            case NODE_UPDATED:
                System.out.println("tree: node updated");
                break;
            case NODE_REMOVED:
                System.out.println("tree: node removed");
                break;
            case CONNECTION_SUSPENDED:
                break;
            case CONNECTION_RECONNECTED:
                break;
            case CONNECTION_LOST:
                break;
            case INITIALIZED:
                System.out.println("tree: initialized");
                break;
            default:
                break;
        }
    }

});


Posted on Sat, 07 Sep 2019 03:37:17 -0700 by yarin