Source Code Analysis Elastic Job Failure Transfer Mechanism

This section explores Elastic Job failover mechanisms. We know that Elastic Job is a distributed task scheduling framework based on Quartz, where the distribution is the distribution of data. Elastic Job's core design concept is that a task is executed on multiple nodes, each node processes a part of the data (task to be processed data fragmentation). If a task node goes down, part of the data will not be processed during a task scheduling period. In order to solve the problem that the data of the task execution part of a task in a scheduling period is not processed due to the downtime of the task node, it can be set up to turn on failover and transfer the task to other normal nodes for execution, so that the task can be progressed on a single node. With the same effect of row scheduling (the amount of data processed in this scheduling), Elastic Job failover class diagram is shown as follows:

  • Failover Listener Manager: Failover Listener Manager.
  • Failover Listener Manager $JobCrashedJobListener job implementation (Job instance downtime) event listener manager.
  • Failover Listener Manager $Failover Settings Changed JobListener Failover Transfer Configuration Change Event Listener.

1. Fault Failure Transfer Event Monitor Manager

1.1 JobCrashedJobListener Job instance node downtime event listener

class JobCrashedJobListener extends AbstractJobListener {
        protected void dataChanged(final String path, final Type eventType, final String data) {
            if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {     // @1
                String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);                                 // @2
                if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {               // @3
                List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);                                                  //@4
                if (!failoverItems.isEmpty()) {                                                                                                                               //@5
                    for (int each : failoverItems) {
                } else {
                    for (int each : shardingService.getShardingItems(jobInstanceId)) {                                                                //@6

Code @1: If a failover mechanism is set up in the configuration file and the deletion event of the child node under ${namespace}/jobname/instances node is monitored, the node is considered to be down and the failover-related logic will be executed.
Code @2: Get the job instance ID (job Instance Id) of the downed task.
Code @3: If the deleted task node ID is the same as the ID of the current instance, it is ignored.
Code @4: Get the collection of failover fragments of the job server according to job InstanceId.

The implementation logic is as follows: FailoverService getFailover Items

     * Gets the collection of failed transfer fragments for the job server.
     * @param jobInstanceId Primary Key of Job Running Example
     * @return Piecewise Item Set of Job Failure Transfer
    public List<Integer> getFailoverItems(final String jobInstanceId) {
        List<String> items = jobNodeStorage.getJobNodeChildrenKeys(ShardingNode.ROOT);
        List<Integer> result = new ArrayList<>(items.size());
        for (String each : items) {
            int item = Integer.parseInt(each);
            String node = FailoverNode.getExecutionFailoverNode(item);
            if (jobNodeStorage.isJobNodeExisted(node) && jobInstanceId.equals(jobNodeStorage.getJobNodeDataDirectly(node))) {
        return result;

First, get the direct child node (current fragmentation information) in the ${namespace}/ jobname/sharing directory, and determine whether the ${namespace}/jobname/sharding/{item}/failover node exists, if it exists, whether the fragment is a fragmentation node of the current task, and if it is, return. The main purpose of this method is to obtain the fragmentation information that has been transferred to the current task node.

Code @5, to determine whether there is a failure to move fragmented to the current node, the initial state must be empty, will execute code @6, set up a failover-related preparation environment.

Code @6, get all the fragmentation nodes assigned to Crashed (downtime job instance), traverse the fragments that have been broken down, set these fragments as failures, pending failover, and set them as failures by creating ${namespace}/jobname/leader/failover/items/{item}.

Code @7: Execute Failover Service# Failover IfNecessary or not.

     * If failover is required, job failover is performed.
    public void failoverIfNecessary() {
        if (needFailover()) {
            jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
    private boolean needFailover() {
        return jobNodeStorage.isJobNodeExisted(FailoverNode.ITEMS_ROOT) && !jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).isEmpty()
                && !JobRegistry.getInstance().isJobRunning(jobName);

The idea of implementation is as follows: [needFailover method] First, it judges whether there is a dollar, {namespace}/jobname/leader/failover/items node, and whether there are sub-nodes under the node, and if the node runs the task, it needs to perform failover. The logic of failover execution is also the failover selector. Its distributed lock nodes are ${namespace}/jobname/leader/failover/latch. Whoever gets the lock first executes the specific logic of failover Leader Execution Callback. The specific failure transfer algorithm is as follows:


class FailoverLeaderExecutionCallback implements LeaderExecutionCallback {
        public void execute() {
            if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) {      // @1
            int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));    // @2
            log.debug("Failover job '{}' begin, crashed item '{}'", jobName, crashedItem);
            jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());  // @3
            jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));     // @4
            // TODO should not use triggerJob, but use executor to unify scheduling
            JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);    // @5
            if (null != jobScheduleController) {

Code @1: If the current instance stops running the job or failover is not required, it returns.

Code @2: Get the first fragment to failover, get ${namespace}/jobname/leader/failover/items/{itemnum, and get the fragment serial number itemnum.

Code @3: Create a temporary node ${namespace}/jobname/sharding/itemnum/failover node.

Code @4: Delete the ${namespace}/jobname/leader/failover/items/{itemnum} node.

Code @5: Trigger task scheduling and terminate the failover of the current node, then release the lock, and the next node acquires the lock to transfer the failure fragmentation under the ${namespace}/jobname/leader/failover/items directory.

PS: When a task node goes down, other nodes will listen to the instance deletion event, get its instance ID from the instance directory, and get the fragmentation information of the original allocated fault instance from ZK, and mark these fragments as needing to failover (create ${namespace}/ jobname/leader/over/items/{item} persistent node), and then judge. Whether a failover operation is required is interrupted.

The prerequisites for performing failover operations are:

  1. The current task instance also schedules the job;
  2. There are ${namespace}/jobname/leader/failover/items nodes and child nodes. If the above two conditions are met, failure transfer is performed. Leader Latch is performed by multiple surviving nodes, and distributed lock nodes (${namespace}/jobname/leader/failover/latch) are created. The nodes that acquire locks are first executed to acquire fragmented nodes. As shown above, each surviving node competes for only one fragment at a failover.

2. Fault Piecewise Re-execution Logic Analysis

The main function of the event listener mentioned above is to create ${namespace}/ jobname/ sharding/{item}/ failure over nodes when the task node fails. However, the tasks of these fragments are not really implemented. This summary will sort out the implementation of the fault node fragments.

It can be seen that fragmentation failover is the creation of failure over nodes under the corresponding fault fragmentation, which will be given priority in obtaining the fragmentation information context, which is not emphasized in the analysis of the fragmentation process. So before you go into the following, read the source code to analyze Elastic Job's fragmentation mechanism.

Back to the timing task execution entry: AbstractElasticJobExecutor#execute

     * Execute work.
    public final void execute() {
        try {
        } catch (final JobExecutionEnvironmentException cause) {
            jobExceptionHandler.handleException(jobName, cause);
        ShardingContexts shardingContexts = jobFacade.getShardingContexts();  // Getting a fragmented context environment

    public ShardingContexts getShardingContexts() {
        boolean isFailover = configService.load(true).isFailover();
        if (isFailover) {    // @1
            List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();    // @2
            if (!failoverShardingItems.isEmpty()) {
                return executionContextService.getJobShardingContext(failoverShardingItems);    // @3
        List<Integer> shardingItems = shardingService.getLocalShardingItems();
        if (isFailover) {
        return executionContextService.getJobShardingContext(shardingItems);

Code @1: When obtaining the fragmentation context, if the failover mechanism is enabled, the fragmentation context of failover will be obtained first.

Code @2: Get the implementation fragmentation information obtained by this node. Its basic logic is to traverse the byte points under & dollar; {namespace}/ jobname/sharing, get all the current fragmentation information of the task, traverse each node, get the serial number, and then determine whether there is (& dollar; {namespace}/ jobname/sharding/{item}/ failover), and the content of the node is the current instance ID, which is added to the fragmentation result.

Code @3: Construct the fragmentation context environment according to the invalid fragmentation sequence number, execute the tasks on the fragmentation, and execute the tasks according to the fragmentation context environment. [AbstractElastic Job execute (Sharing Contexts, JobExecution Event. Execution Source. NORMAL_TRIGGER)]] After the task scheduling is completed, the fault markers of the fragments will be deleted and re-fragmented when the next task scheduling.

The fault markup code for deleting fragments is as follows: LiteJobFacade registerJobCompleted

public void registerJobCompleted(final ShardingContexts shardingContexts) {
        executionService.registerJobCompleted(shardingContexts);  // @1
        if (configService.load(true).isFailover()) {
            failoverService.updateFailoverComplete(shardingContexts.getShardingItemParameters().keySet());   // @2

Code @1: Set the scheduling task of fragmentation to complete. First, set the task in memory as non-running (JobRegistry. getInstance (). setJobRunning (false). If monitorExecution is enabled, the running tag of fragmentation needs to be deleted. The specific method is to delete & dollar; {namespace}/ jobname/sharding/{item}/ running node.

Code @2: If failover is enabled, call the updateFailover Complete method, update the failover process, delete the ${namespace}/jobname/sharding/{item}/failover node, and the next time the task is scheduled uniformly, all the fragments will be re-fragmented, thus completing a failover.


Fault transfer means that during a task scheduling period, the partitioned nodes are down, resulting in the partitioned tasks allocated to the downtime service not being executed. That data is not processed during the task scheduling period. In order to deal with that part of the database in time, Elastic Job supports failover, that is, during a task scheduling period, the other downtime services are allocated. Film context is transferred to the current surviving node for execution, and the next mobilization task will not begin until the execution is completed.

The next time the task is deployed, it will be re-fragmented.
Elastic Job is a distributed task scheduling platform, where distributed more refers to the distribution of data, that is, a task is executed on multiple fragments, each node according to the fragmentation context to obtain part of the data processing (data fragmentation).

The original release date is 2018-12-03.
Author: Ding Wei, author of RocketMQ Technology Insider.
This article comes from Interest Circle of Middleware To learn about relevant information, you can pay attention to it. Interest Circle of Middleware.

Tags: Java Fragment Database

Posted on Wed, 09 Oct 2019 20:50:02 -0700 by vomitbomb