Source code analysis of Dubbo cluster fault tolerance strategy

The previous articles analyzed service discovery (Directory, specifically RegistryDirectory), the routing mechanism (Router), and the load balancing mechanism separately. This article focuses on the cluster fault tolerance mechanism (AbstractClusterInvoker), which integrates all of the above components. The roles these components play in cluster fault tolerance are shown in the figure below. This article analyzes how AbstractClusterInvoker brings them together.

AbstractClusterInvoker#invoke

    @Override
    public Result invoke(final Invocation invocation) throws RpcException {
        checkWhetherDestroyed();
        LoadBalance loadbalance = null;
        List<Invoker<T>> invokers = list(invocation);    // @1
        if (invokers != null && !invokers.isEmpty()) {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));      // @2
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        return doInvoke(invocation, invokers, loadbalance);  // @3
    }

Code @1: gets the list of service providers for the current invocation context. The list comes from the Directory:

    protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
        List<Invoker<T>> invokers = directory.list(invocation);
        return invokers;
    }

This eventually calls RegistryDirectory#list. The provider list it returns is a local cache: whenever the set of providers the consumer subscribes to changes, the registry generates an event and notifies the consumer, which then updates the cached list. Note that before RegistryDirectory returns the Invokers, they have already been filtered once by the Router; this is done in RegistryDirectory#notify.

Code @2: obtains the load balancing implementation through the SPI mechanism. The algorithm is configured with the loadbalance attribute of tags such as <dubbo:consumer loadbalance="" /> and <dubbo:reference loadbalance="" />; the default is random, a weighted random algorithm.

Code @3: selects a service provider based on the invocation context, the provider list, and the load balancing algorithm, and performs the call. The concrete logic is implemented by each subclass of AbstractClusterInvoker.
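The three steps above follow a template-method shape: invoke is fixed, and only doInvoke varies per strategy. The sketch below illustrates that shape with hypothetical, heavily simplified types (a Supplier stands in for the Directory, a map of names to lambdas stands in for the SPI extension loader, and strings stand in for Invokers). It is an illustration of the structure, not Dubbo's API.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Sketch of the template-method shape of AbstractClusterInvoker#invoke (hypothetical types). */
final class ClusterInvokeSketch {

    /** Hypothetical stand-in for Dubbo's LoadBalance extension: picks one provider. */
    interface LoadBalance {
        String select(List<String> providers);
    }

    abstract static class ClusterInvoker {
        private final Supplier<List<String>> directory;     // stands in for Directory#list
        private final Map<String, LoadBalance> extensions;   // stands in for the SPI extension loader
        private final String configuredLoadBalance;          // value of loadbalance="...", or null

        ClusterInvoker(Supplier<List<String>> directory, Map<String, LoadBalance> extensions,
                       String configuredLoadBalance) {
            this.directory = directory;
            this.extensions = extensions;
            this.configuredLoadBalance = configuredLoadBalance;
        }

        /** Fixed skeleton: list providers, resolve the load balancer, delegate to the strategy. */
        final String invoke(String request) {
            List<String> providers = directory.get();                                  // @1
            LoadBalance loadBalance = null;
            if (providers != null && !providers.isEmpty()) {
                String name = configuredLoadBalance != null ? configuredLoadBalance : "random";
                loadBalance = extensions.get(name);                                      // @2
            }
            return doInvoke(request, providers, loadBalance);                            // @3
        }

        /** Each fault tolerance strategy (failover, failfast, ...) implements this step. */
        protected abstract String doInvoke(String request, List<String> providers, LoadBalance loadBalance);
    }
}
```

Only doInvoke changes from one strategy to the next, which is why the rest of this article looks at doInvoke implementations.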

The cluster fault tolerance strategies currently supported by Dubbo are declared in dubbo-cluster/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.cluster.Cluster:

mock=com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterWrapper
failover=com.alibaba.dubbo.rpc.cluster.support.FailoverCluster
failfast=com.alibaba.dubbo.rpc.cluster.support.FailfastCluster
failsafe=com.alibaba.dubbo.rpc.cluster.support.FailsafeCluster
failback=com.alibaba.dubbo.rpc.cluster.support.FailbackCluster
forking=com.alibaba.dubbo.rpc.cluster.support.ForkingCluster
available=com.alibaba.dubbo.rpc.cluster.support.AvailableCluster
mergeable=com.alibaba.dubbo.rpc.cluster.support.MergeableCluster
broadcast=com.alibaba.dubbo.rpc.cluster.support.BroadcastCluster

For each of the cluster policies above, the class that actually executes the strategy is named after the Cluster class plus "Invoker". For example, the Invoker corresponding to FailoverCluster is FailoverClusterInvoker.

Before explaining the individual fault tolerance strategies, let's first look at how AbstractClusterInvoker selects a service provider from the provider list using the configured load balancing algorithm.
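To make the name-to-class mapping concrete, here is a small sketch that parses entries in the key=value format shown above with java.util.Properties and derives the executor class name by appending "Invoker". It only illustrates the naming convention described in the text, not how Dubbo's ExtensionLoader actually loads extensions; the helper names are hypothetical. Entries such as mock (MockClusterWrapper) do not follow the convention, so the sketch rejects them.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Hypothetical helpers illustrating the "name=ClassName" SPI entries and the XxxCluster -> XxxClusterInvoker convention. */
final class SpiLookupSketch {

    /** Parses "name=fully.qualified.ClassName" lines into name -> class-name pairs. */
    static Properties parse(String spiFileContent) {
        Properties entries = new Properties();
        try {
            entries.load(new StringReader(spiFileContent));
        } catch (IOException e) {
            // StringReader does not fail in practice; rethrow unchecked to keep the API simple.
            throw new IllegalStateException(e);
        }
        return entries;
    }

    /** Returns the executor class name for a strategy name, e.g. failover -> ...FailoverClusterInvoker. */
    static String invokerClassFor(Properties entries, String strategyName) {
        String clusterClass = entries.getProperty(strategyName);
        if (clusterClass == null) {
            throw new IllegalArgumentException("Unknown cluster strategy: " + strategyName);
        }
        if (!clusterClass.endsWith("Cluster")) {
            throw new IllegalArgumentException(clusterClass + " does not follow the XxxCluster naming convention");
        }
        return clusterClass + "Invoker";
    }
}
```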

1. Source code analysis: AbstractClusterInvoker#select

AbstractClusterInvoker#select

protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws
        RpcException {    // @1
    if (invokers == null || invokers.isEmpty())
        return null;
    String methodName = invocation == null ? "" : invocation.getMethodName();
    boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY,
            Constants.DEFAULT_CLUSTER_STICKY);     // @2
    {
        //ignore overloaded method
        if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
            stickyInvoker = null;
        }
        //ignore concurrency problem
        if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
            if (availablecheck && stickyInvoker.isAvailable()) {
                return stickyInvoker;
            }
        }
    }
    Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);   // @3

    if (sticky) {
        stickyInvoker = invoker;
    }
    return invoker;
}

Code @1: parameter description

  • LoadBalance loadbalance: the load balancing algorithm.
  • Invocation invocation: the service invocation context.
  • List<Invoker<T>> invokers: the candidate service providers.
  • List<Invoker<T>> selected: the service providers already selected (tried) during this cluster invocation.

Code @2: the sticky mechanism, enabled with <dubbo:method sticky="true" /> and disabled by default. When it is enabled, the invoker remembers which service provider served the previous call and keeps choosing it for subsequent calls, as long as that provider is still in the candidate list and has not already been tried in the current invocation. Note that in this version the remembered provider is reused only when availablecheck is true and the provider reports itself available.

Code @3: delegates the actual selection to doSelect. If sticky is enabled, the result becomes the new sticky provider.
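A minimal sketch of the sticky branch, assuming a hypothetical Provider type and a doSelect passed in as a function so the sticky logic can be shown on its own. It mirrors the conditions in the quoted code, including the requirement that availablecheck be enabled before the remembered provider is reused.

```java
import java.util.List;
import java.util.function.Function;

/** Sketch of the sticky branch of AbstractClusterInvoker#select (hypothetical Provider type). */
final class StickySelectSketch {

    /** Hypothetical provider: a name plus an availability flag. */
    static final class Provider {
        final String name;
        boolean available = true;

        Provider(String name) {
            this.name = name;
        }
    }

    private final boolean availableCheck;   // mirrors the availablecheck flag
    private Provider stickyProvider;        // provider chosen by the previous call

    StickySelectSketch(boolean availableCheck) {
        this.availableCheck = availableCheck;
    }

    Provider select(List<Provider> candidates, List<Provider> selected, boolean sticky,
                    Function<List<Provider>, Provider> doSelect) {
        if (candidates == null || candidates.isEmpty()) {
            return null;
        }
        // Forget the remembered provider if it has dropped out of the candidate list.
        if (stickyProvider != null && !candidates.contains(stickyProvider)) {
            stickyProvider = null;
        }
        // Reuse it unless it was already tried in this invocation; as in the quoted code,
        // reuse also requires availableCheck to be on and the provider to be available.
        if (sticky && stickyProvider != null
                && (selected == null || !selected.contains(stickyProvider))
                && availableCheck && stickyProvider.available) {
            return stickyProvider;
        }
        Provider chosen = doSelect.apply(candidates);
        if (sticky) {
            stickyProvider = chosen;   // remember it for the next call
        }
        return chosen;
    }
}
```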

1.1 Source code analysis: AbstractClusterInvoker#doSelect

private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
        if (invokers == null || invokers.isEmpty())
            return null;
        if (invokers.size() == 1)    // @1
            return invokers.get(0);
        // If we only have two invokers, use round-robin instead.
        if (invokers.size() == 2 && selected != null && !selected.isEmpty()) {    // @2
            return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
        }
        if (loadbalance == null) {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
        }
        Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);    // @3

        //If the `invoker` is in the  `selected` or invoker is unavailable && availablecheck is true, reselect.
        if ((selected != null && selected.contains(invoker))
                || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
            try {
                Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);    // @4
                if (rinvoker != null) {
                    invoker = rinvoker;
                } else {
                    //Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
                    int index = invokers.indexOf(invoker);
                    try {
                        //Avoid collision
                        invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invoker;
                    } catch (Exception e) {
                        logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
                    }
                }
            } catch (Throwable t) {
                logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
            }
        }
        return invoker;
    }

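Code @1: if there is only one candidate Invoker, it is returned directly.

Code @2: if there are exactly two Invokers and one of them has already been selected, the other one is returned.

Code @3: calls the load balancing algorithm to select a service provider.

Code @4: if the chosen Invoker has already been selected in this invocation, or it is unavailable while availablecheck is enabled, reselect is called to choose again. If reselect returns null, the Invoker at the next index is used instead (unless the chosen one is the last in the list). This raises a question: why not filter out the already selected Invokers before calling the load balancer in the first place?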
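The decision order of doSelect can be reproduced in a self-contained sketch. The Provider type and the function-based load balancer are hypothetical, and reselect is simplified (an assumption, not Dubbo's exact logic): it load-balances only over candidates that are untried and, when availableCheck is on, available, and returns null if none remain.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Sketch of the decision steps @1 to @4 in AbstractClusterInvoker#doSelect (hypothetical types). */
final class DoSelectSketch {

    static final class Provider {
        final String name;
        final boolean available;

        Provider(String name, boolean available) {
            this.name = name;
            this.available = available;
        }
    }

    static Provider doSelect(List<Provider> invokers, List<Provider> selected,
                             Function<List<Provider>, Provider> loadBalance, boolean availableCheck) {
        if (invokers == null || invokers.isEmpty()) {
            return null;
        }
        if (invokers.size() == 1) {                                             // @1: only one choice
            return invokers.get(0);
        }
        if (invokers.size() == 2 && selected != null && !selected.isEmpty()) {  // @2: take the other one
            return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
        }
        Provider invoker = loadBalance.apply(invokers);                         // @3
        boolean alreadyTried = selected != null && selected.contains(invoker);
        if (alreadyTried || (availableCheck && !invoker.available)) {           // @4: reselect
            Provider again = reselect(invokers, selected, loadBalance, availableCheck);
            if (again != null) {
                invoker = again;
            } else {
                // Fallback from the quoted code: move to the next index unless already at the end.
                int index = invokers.indexOf(invoker);
                invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invoker;
            }
        }
        return invoker;
    }

    /** Simplified reselect: load-balance over untried (and, if checked, available) candidates. */
    private static Provider reselect(List<Provider> invokers, List<Provider> selected,
                                     Function<List<Provider>, Provider> loadBalance, boolean availableCheck) {
        List<Provider> remaining = new ArrayList<>();
        for (Provider p : invokers) {
            boolean tried = selected != null && selected.contains(p);
            if (!tried && (!availableCheck || p.available)) {
                remaining.add(p);
            }
        }
        return remaining.isEmpty() ? null : loadBalance.apply(remaining);
    }
}
```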

That covers how a service provider is selected from the service provider list. Next, we analyze the cluster fault tolerance strategies provided by Dubbo.

2. Source code analysis of Dubbo cluster strategy

2.1 Source code analysis: FailoverClusterInvoker (FailoverCluster, Dubbo's default policy)

Policy: after a failure, automatically retry on another service provider. The number of retries is set by the retries attribute, for example <dubbo:reference retries="2" />. The default is 2, meaning 2 retries, so a call is attempted at most 3 times in total.

FailoverClusterInvoker#doInvoke

    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyinvokers = invokers;
        checkInvokers(copyinvokers, invocation);
        int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;      // @1
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);      // @2
        for (int i = 0; i < len; i++) {   // @3
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            if (i > 0) {     // @4
                checkWhetherDestroyed();
                copyinvokers = list(invocation);
                // check again
                checkInvokers(copyinvokers, invocation);
            }
            Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);    // @5
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                Result result = invoker.invoke(invocation);                                                        // @6
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + invocation.getMethodName()
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers
                            + " (" + providers.size() + "/" + copyinvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());       // @7
            }
        }
        throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
                + invocation.getMethodName() + " in the service " + getInterface().getName()
                + ". Tried " + len + " times of the providers " + providers
                + " (" + providers.size() + "/" + copyinvokers.size()
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
    }

Code @1: checkInvokers first verifies the list of service providers; if it is empty, an RpcException is thrown indicating that no provider is available. The total number of attempts, len, is then computed as retries + 1 (and forced to at least 1).

Code @2: builds Set<String> providers, which records the addresses of the providers that have been called. If the call ultimately fails, these addresses are included in the error message.

Code @3: the loop runs len times, that is, retries + 1.

Code @4: i > 0 means this iteration is a retry. Before retrying, the directory's list method is called again to get the latest service provider list, which is then checked again.

Code @5: selects an Invoker using the load balancing algorithm. The providers already invoked are passed as selected, so select tries to avoid them; this is the select method analyzed in section 1.

Code @6: initiates the RPC call on the chosen provider. A business exception (RpcException#isBiz) is rethrown immediately without retry; any other exception is recorded as the last exception and the loop continues.

Code @7: adds this provider's address to the providers set. If the call still fails after all retries, this information is included in the error message.
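The retry loop can be sketched with simulated providers. The Provider, Behavior, and BizException types are hypothetical; selection is reduced to "first provider not tried yet" instead of the select method above, and a checked IOException plays the role of a non-business RpcException. The essentials follow the walkthrough: retries + 1 attempts, no retry on business errors, and the failed addresses reported at the end.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Sketch of the FailoverClusterInvoker retry loop with simulated providers (hypothetical types). */
final class FailoverSketch {

    /** Stand-in for an RpcException whose isBiz() is true: never retried. */
    static final class BizException extends RuntimeException {
        BizException(String message) {
            super(message);
        }
    }

    enum Behavior { OK, NETWORK_ERROR, BIZ_ERROR }

    /** Simulated provider that behaves as configured and counts how often it was called. */
    static final class Provider {
        final String address;
        final Behavior behavior;
        int calls;

        Provider(String address, Behavior behavior) {
            this.address = address;
            this.behavior = behavior;
        }

        String invoke(String request) throws IOException {
            calls++;
            if (behavior == Behavior.NETWORK_ERROR) {
                throw new IOException("connection refused by " + address);
            }
            if (behavior == Behavior.BIZ_ERROR) {
                throw new BizException("business rule violated");
            }
            return address + " handled " + request;
        }
    }

    static String invoke(List<Provider> providers, int retries, String request) {
        if (providers == null || providers.isEmpty()) {
            throw new IllegalStateException("No provider available");   // like checkInvokers
        }
        int len = Math.max(retries + 1, 1);                // total attempts = retries + 1
        List<Provider> invoked = new ArrayList<>();
        Set<String> failedAddresses = new LinkedHashSet<>();
        Exception last = null;
        for (int i = 0; i < len; i++) {
            Provider provider = pickUntried(providers, invoked);
            invoked.add(provider);
            try {
                return provider.invoke(request);
            } catch (BizException e) {
                throw e;                                   // business errors are rethrown at once
            } catch (IOException e) {
                last = e;                                  // remember it and try another provider
                failedAddresses.add(provider.address);
            }
        }
        throw new IllegalStateException("Tried " + len + " times on " + failedAddresses
                + ". Last error is: " + last.getMessage(), last);
    }

    private static Provider pickUntried(List<Provider> providers, List<Provider> invoked) {
        for (Provider p : providers) {
            if (!invoked.contains(p)) {
                return p;
            }
        }
        return providers.get(0);   // everything has been tried already: start over
    }
}
```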

2.2 Source code analysis: AvailableClusterInvoker

Policy: call the first available service provider in the cluster. Drawback: it effectively works like a primary/standby setup: only one provider carries traffic at any given time, and the cluster's load balancing mechanism is not used.

AvailableClusterInvoker#doInvoke

public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        for (Invoker<T> invoker : invokers) {
            if (invoker.isAvailable()) {
                return invoker.invoke(invocation);
            }
        }
        throw new RpcException("No provider available in " + invokers);
}

The method iterates over the provider list, picks the first available provider, and makes the RPC call on it. If that call fails, the whole call fails; no other provider is tried.
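A sketch of the same loop with a hypothetical Provider type. Only the availability flag decides which provider is used; there is no load balancing step and no retry path.

```java
import java.util.List;

/** Sketch of AvailableClusterInvoker#doInvoke with a hypothetical Provider type. */
final class AvailableSketch {

    static final class Provider {
        final String address;
        final boolean available;

        Provider(String address, boolean available) {
            this.address = address;
            this.available = available;
        }

        String invoke(String request) {
            return address + " handled " + request;
        }
    }

    static String invoke(List<Provider> providers, String request) {
        for (Provider provider : providers) {
            if (provider.available) {
                // First available provider wins: no load balancing and no retry if this call fails.
                return provider.invoke(request);
            }
        }
        throw new IllegalStateException("No provider available in " + providers.size() + " candidates");
    }
}
```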

2.3 Source code analysis: BroadcastClusterInvoker

Policy: broadcast the call to all service providers. A failure on one provider does not stop the calls to the others, but if any provider fails, the whole call is considered failed. Scenario: refreshing caches.

public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);                                     // @1
        RpcContext.getContext().setInvokers((List) invokers);
        RpcException exception = null;
        Result result = null;
        for (Invoker<T> invoker : invokers) {   // @2
            try {
                result = invoker.invoke(invocation);
            } catch (RpcException e) {
                exception = e;
                logger.warn(e.getMessage(), e);
            } catch (Throwable e) {
                exception = new RpcException(e.getMessage(), e);
                logger.warn(e.getMessage(), e);
            }
        }
        if (exception != null) {   // @3
            throw exception;
        }
        return result;
    }

Code @1: checks the service provider list; if it is empty, an exception reporting that no provider is available is thrown.

Code @2: iterates over the provider list and calls each provider's invoker in turn. Each call is wrapped in a try/catch: if a call fails, the exception is recorded and logged, but the loop continues with the next provider instead of returning. Whether each call is synchronous or asynchronous depends on the service's call configuration; the default is synchronous.

Code @3: if at least one call failed, the last recorded exception (wrapped as an RpcException) is thrown. Otherwise, the result of the last call is returned.
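The broadcast loop can be sketched with each provider modeled as a hypothetical Function<String, String> that throws a RuntimeException on failure. Every provider is called even after a failure, and the last exception is rethrown at the end.

```java
import java.util.List;
import java.util.function.Function;

/** Sketch of BroadcastClusterInvoker#doInvoke; providers are hypothetical request -> response functions. */
final class BroadcastSketch {

    static String invoke(List<Function<String, String>> providers, String request) {
        if (providers == null || providers.isEmpty()) {
            throw new IllegalStateException("No provider available");   // like checkInvokers (@1)
        }
        RuntimeException lastError = null;
        String result = null;
        for (Function<String, String> provider : providers) {           // @2: call every provider
            try {
                result = provider.apply(request);
            } catch (RuntimeException e) {
                lastError = e;   // remember the failure but keep calling the remaining providers
            }
        }
        if (lastError != null) {                                         // @3: any failure fails the call
            throw lastError;
        }
        return result;   // all succeeded: the last provider's result
    }
}
```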

2.4 Source code analysis: FailbackClusterInvoker

Policy: when a call fails, a successful (empty) result is returned to the caller, and the call is retried periodically in the background until it succeeds. Scenario: usually message notifications. Note that the retry tasks are kept only in memory, so they are lost when the consumer restarts.

FailbackClusterInvoker#doInvoke

protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);  // @1
            Invoker<T> invoker = select(loadbalance, invocation, invokers, null);   // @2
            return invoker.invoke(invocation);   // @3
        } catch (Throwable e) {
            logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
                    + e.getMessage() + ", ", e);
            addFailed(invocation, this);   // @4
            return new RpcResult(); // ignore
        }
    }

Code @1: checks the service provider list; if it is empty, an RpcException reporting that no provider is available is thrown. Because the check is inside the try block, that exception is also caught and scheduled for retry like any other failure.

Code @2: selects a service provider according to the load balancing mechanism.

Code @3: initiates the remote call. If any exception occurs, addFailed is called to register a retry task (@4), and an empty RpcResult is returned to the caller as if the call had succeeded.

Next, look at the addFailed method.

FailbackClusterInvoker#addFailed

private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {  // @1
        if (retryFuture == null) {    // @2
            synchronized (this) {
                if (retryFuture == null) {
                    retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {     // @3

                        @Override
                        public void run() {
                            // collect retry statistics
                            try {
                                retryFailed();
                            } catch (Throwable t) { // Defensive fault tolerance
                                logger.error("Unexpected error occur at collect statistic", t);
                            }
                        }
                    }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
                }
            }
        }
        failed.put(invocation, router);   // @4
    }

Code @1: Invocation invocation is the invocation context; AbstractClusterInvoker<?> router is the cluster invoker (the fault tolerance strategy) that will perform the retry.

Code @2 and @3: if retryFuture (ScheduledFuture<?> retryFuture) is null, a lock is taken (double-checked locking) to create a scheduled task. The task calls retryFailed every RETRY_FAILED_PERIOD milliseconds, that is, every 5 seconds.

Code @4: adds the retry task to failed (ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed). retryFailed then iterates over failed: an entry is removed once its call succeeds, and stays in place when the call fails again.

FailbackClusterInvoker#retryFailed

void retryFailed() {
        if (failed.size() == 0) {
            return;
        }
        for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>(    // @1
                failed).entrySet()) {
            Invocation invocation = entry.getKey();
            Invoker<?> invoker = entry.getValue();
            try {
                invoker.invoke(invocation);   // @2
                failed.remove(invocation);    // @3
            } catch (Throwable e) {
                logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
            }
        }
    }

Code @1: iterates over a snapshot (a copy) of the pending retries and re-invokes each one. Code @2 and @3: if the call succeeds, the entry is removed from failed; if it fails again, the error is logged and the entry stays in failed for the next round. In other words, as long as the consumer does not restart, the call is retried until it succeeds.
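The failback flow (empty success on failure, lazy start of a periodic retry task, removal on success) can be sketched as follows. The Call interface and the String request id are hypothetical stand-ins for Invoker and Invocation. The retry thread is a daemon so it does not keep the JVM alive, and retryFailed can also be called directly, which is the easiest way to observe the behavior.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Sketch of FailbackClusterInvoker: failures return an empty result and are retried in the background. */
final class FailbackSketch {

    /** Hypothetical remote call that may throw. */
    interface Call {
        String run() throws Exception;
    }

    final Map<String, Call> failed = new ConcurrentHashMap<>();   // pending retries, keyed by request id
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "failback-retry");
        t.setDaemon(true);   // retries must not keep the JVM alive
        return t;
    });
    private volatile ScheduledFuture<?> retryFuture;

    String invoke(String requestId, Call call, long retryPeriodMillis) {
        try {
            return call.run();
        } catch (Exception e) {
            addFailed(requestId, call, retryPeriodMillis);
            return "";   // empty result reported as success, like new RpcResult()
        }
    }

    private void addFailed(String requestId, Call call, long periodMillis) {
        if (retryFuture == null) {                 // double-checked lazy start of the retry task
            synchronized (this) {
                if (retryFuture == null) {
                    retryFuture = scheduler.scheduleWithFixedDelay(
                            this::retryFailed, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
                }
            }
        }
        failed.put(requestId, call);
    }

    /** Retries each pending call once; successful ones are removed, failing ones stay queued. */
    void retryFailed() {
        for (Map.Entry<String, Call> entry : new HashMap<>(failed).entrySet()) {
            try {
                entry.getValue().run();
                failed.remove(entry.getKey());
            } catch (Exception e) {
                // still failing: leave it for the next round
            }
        }
    }
}
```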

2.5 Source code analysis: FailfastClusterInvoker

Policy: fail fast. An exception is thrown immediately when the call fails, and there is no retry. Scenario: write operations, that is, non-idempotent service calls.

public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);     // @1
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);   // @2
        try {
            return invoker.invoke(invocation);    // @3
        } catch (Throwable e) {
            if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
                throw (RpcException) e;
            }
            throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " +
                 loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " +
                 invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: "
                 + e.getMessage(), e.getCause() != null ? e.getCause() : e);    // @4
        }
    }

Code @1: checks the service provider list; if it is empty, an error reporting that no provider is available is thrown.

Code @2: selects a service provider according to the load balancing algorithm.

Code @3: initiates the RPC call.

Code @4: if the call throws, a business exception (RpcException#isBiz) is rethrown as-is; any other exception is wrapped in an RpcException whose message includes the service consumer and service provider information.
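A sketch of fail-fast behavior using java.util.concurrent.Callable for the remote call. BizException and ClusterException are hypothetical stand-ins for a business RpcException and the wrapping RpcException thrown at @4.

```java
import java.util.concurrent.Callable;

/** Sketch of FailfastClusterInvoker#doInvoke: one attempt, no retry, errors wrapped with context. */
final class FailfastSketch {

    /** Stand-in for an RpcException whose isBiz() is true. */
    static final class BizException extends RuntimeException {
        BizException(String message) {
            super(message);
        }
    }

    /** Hypothetical wrapper playing the role of the RpcException thrown at @4. */
    static final class ClusterException extends RuntimeException {
        ClusterException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    static <T> T invoke(String providerAddress, Callable<T> call) {
        try {
            return call.call();                   // exactly one attempt
        } catch (BizException e) {
            throw e;                              // business errors pass through unchanged
        } catch (Exception e) {
            // Anything else fails immediately, with the provider address in the message.
            throw new ClusterException("Failfast invoke provider " + providerAddress
                    + " failed. Last error is: " + e.getMessage(), e);
        }
    }
}
```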

2.6 Source code analysis: FailsafeClusterInvoker

Policy: when the call fails, only an error log is written, and a successful (empty) result is returned. Scenario: audit logging and other log-type service interfaces.

FailsafeClusterInvoker#doInvoke

public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);   // @1
            Invoker<T> invoker = select(loadbalance, invocation, invokers, null);  // @2
            return invoker.invoke(invocation);   // @3
        } catch (Throwable e) {
            logger.error("Failsafe ignore exception: " + e.getMessage(), e);
            return new RpcResult(); // ignore
        }
}

Code @1: checks the service provider list. If it is empty, checkInvokers throws, but because the check is inside the try block, that exception is also caught, logged, and turned into an empty result.

Code @2: selects a service provider according to the load balancing algorithm.

Code @3: makes the RPC call. If any exception occurs, the error and its stack trace are logged and an empty RpcResult is returned as a success.
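Failsafe reduces to "try once, log, and return an empty result". A minimal sketch using java.util.logging and a Callable, where the emptyResult parameter is a hypothetical stand-in for new RpcResult():

```java
import java.util.concurrent.Callable;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Sketch of FailsafeClusterInvoker#doInvoke: errors are logged and swallowed. */
final class FailsafeSketch {

    private static final Logger LOG = Logger.getLogger(FailsafeSketch.class.getName());

    /** Returns the call's result, or emptyResult if the call throws (the error is only logged). */
    static <T> T invoke(Callable<T> call, T emptyResult) {
        try {
            return call.call();
        } catch (Exception e) {
            LOG.log(Level.SEVERE, "Failsafe ignore exception: " + e.getMessage(), e);
            return emptyResult;   // the caller sees a "successful" empty result
        }
    }
}
```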

2.7 Source code analysis: ForkingClusterInvoker

Policy: call multiple service providers in parallel and return as soon as one of them succeeds. Scenario: calls with strict real-time requirements, at the cost of extra server resources. The degree of parallelism is usually set with the forks parameter.

ForkingClusterInvoker#doInvoke

public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);   // @1
        final List<Invoker<T>> selected;
        final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);              // @2
        final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        if (forks <= 0 || forks >= invokers.size()) {
            selected = invokers;
        } else {
            selected = new ArrayList<Invoker<T>>();
            for (int i = 0; i < forks; i++) {
                // TODO. Add some comment here, refer chinese version for more details.
                Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                if (!selected.contains(invoker)) {//Avoid add the same invoker several times.
                    selected.add(invoker);
                }
            }
        }
        RpcContext.getContext().setInvokers((List) selected);
        final AtomicInteger count = new AtomicInteger();
        final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
        for (final Invoker<T> invoker : selected) {   // @3
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Result result = invoker.invoke(invocation);
                        ref.offer(result);
                    } catch (Throwable e) {
                        int value = count.incrementAndGet();
                        if (value >= selected.size()) {
                            ref.offer(e);
                        }
                    }
                }
            });
        }
        try {
            Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);   // @4
            if (ret instanceof Throwable) {
                Throwable e = (Throwable) ret;
                throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
            }
            return (Result) ret;
        } catch (InterruptedException e) {
            throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
        }
    }

Code @1: checks the service provider list; if it is empty, an error reporting that no provider is available is thrown.

Code @2: reads the forks parameter. It appears that forks can only be set on <dubbo:reference /> through a nested <dubbo:parameter key="forks" value="" />. The default value is 2. If forks is less than or equal to 0, or greater than or equal to the number of service providers, all providers are called; otherwise the load balancing algorithm selects forks providers (duplicates are skipped, so fewer may end up selected).

Code @3: submits an asynchronous RPC call to each selected provider in turn. Successful results are added to BlockingQueue<Object> ref. When a call fails, the failure counter is incremented, and only when the number of failures reaches the number of selected providers (every call has failed) is the exception put into ref.

Code @4: Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS) takes the first item from the queue; while the queue is empty, it blocks until an item arrives or the timeout expires. As soon as one call succeeds, its result is returned and the results of the other calls are ignored. If every call failed, the queued exception is wrapped in an RpcException and thrown. If the timeout expires first, poll returns null, so in this version doInvoke returns a null result.
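The fork-and-take-first pattern can be sketched with the JDK's ExecutorService and a LinkedBlockingQueue, mirroring @3 and @4. Two deliberate differences from the quoted code are assumptions of this sketch: it throws on timeout instead of returning null, and it creates and shuts down its own pool per call instead of using a shared executor. The calls themselves are hypothetical Callables.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of ForkingClusterInvoker's fan-out: run the selected calls in parallel and take the first result. */
final class ForkingSketch {

    static String invoke(List<Callable<String>> selected, long timeoutMillis) {
        ExecutorService executor = Executors.newCachedThreadPool();
        BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
        AtomicInteger failures = new AtomicInteger();
        try {
            for (Callable<String> call : selected) {
                executor.execute(() -> {
                    try {
                        ref.offer(call.call());   // every success is queued (a null result counts as a failure)
                    } catch (Throwable e) {
                        if (failures.incrementAndGet() >= selected.size()) {
                            ref.offer(e);         // queued only once every call has failed
                        }
                    }
                });
            }
            Object ret = ref.poll(timeoutMillis, TimeUnit.MILLISECONDS);   // first item wins
            if (ret == null) {
                throw new IllegalStateException("No result within " + timeoutMillis + " ms");
            }
            if (ret instanceof Throwable) {
                throw new IllegalStateException("All forked calls failed", (Throwable) ret);
            }
            return (String) ret;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a result", e);
        } finally {
            executor.shutdownNow();   // interrupt the slower calls; their results are ignored anyway
        }
    }
}
```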

This article analyzed Dubbo's cluster fault tolerance mechanism, showing how service discovery, routing, and load balancing work together to complete a Dubbo service call, and examined the implementation details of the individual cluster strategies, such as failover, failfast, failsafe, failback, forking, available, and broadcast.

About the author: Ding Wei, author of "RocketMQ Technology Insider" and a RocketMQ community evangelist. WeChat official account: Middleware Interest Circle. The account has published source code analysis columns on Java collections, Java concurrency (JUC), Netty, Mycat, Dubbo, RocketMQ, MyBatis, and more. Readers can also join the Middleware Knowledge Planet to discuss high-concurrency and distributed service architectures and exchange source code insights.


Posted on Sun, 12 Jan 2020 09:00:19 -0800 by Infinitive