Xxl Job Source Analysis--Start Dispatch Center

From the previous Xxl Job Helloworld You learned how to use Xxl-Job for simple step-by-step task scheduling.And you know when I use Xxl-Job.Our core needs three basic steps:

  • Start dispatch center (xxl-job-admin)
  • Start the executor (business code referencing xxl-job-core)
  • Dispatch Center adds tasks and executes them

After these three steps, the added task can be executed.Let's start with a source-code analysis of what has been done in the last three steps so that tasks can be scheduled on a regular basis.So I decided to divide it into three blog s to analyze the source implementation of xxl-job.

Starting the dispatch center, in addition to adding XXXcontroller s in the com.xxl.job.admin.controller directory to support CRUD operations on tasks through Web pages and rights management related to users and users, the core is the initialization of XxlJobAdminConfig class.It implements the InitializingBean interface in spring, so XxlJobAdminConfig#afterPropertiesSet is called when this object is initialized

    public void afterPropertiesSet() throws Exception {
        adminConfig = this;

        xxlJobScheduler = new XxlJobScheduler();
        xxlJobScheduler.init();
    }

Let's take a look at what XxlJobScheduler does mainly:

    public void init() throws Exception {
        // init i18n
        initI18n();

        // admin registry monitor run
        JobRegistryMonitorHelper.getInstance().start();

        // admin monitor run
        JobFailMonitorHelper.getInstance().start();

        // admin trigger pool start
        JobTriggerPoolHelper.toStart();

        // admin log report start
        JobLogReportHelper.getInstance().start();

        // start-schedule
        JobScheduleHelper.getInstance().start();

        logger.info(">>>>>>>>> init xxl-job admin success.");
    }

1,initI18n

    private void initI18n(){
        for (ExecutorBlockStrategyEnum item:ExecutorBlockStrategyEnum.values()) {
            item.setTitle(I18nUtil.getString("jobconf_block_".concat(item.name())));
        }
    }

Its simpler function is to initialize the internationalization information related to the blocking policy in the new task page.Internationalization provides Chinese and English as two optional languages, defaulting to Chinese.

2,JobRegistryMonitorHelper

JobRegistryMonitorHelper opens a Daemon thread that primarily manages task call addresses registered with xxl-job-core.The code inside is simple, and its core function is the following three points:

JobRegistryMonitorHelper#start

public void start(){
		registryThread = new Thread(new Runnable() {
			@Override
			public void run() {
				while (!toStop) {
					try {
						// auto registry group
						List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
						if (groupList!=null && !groupList.isEmpty()) {

							// remove dead address (admin/executor)
							List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
							if (ids!=null && ids.size()>0) {
								XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
							}

							// fresh online address (admin/executor)
							HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
							List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
							if (list != null) {
								for (XxlJobRegistry item: list) {
									if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
										String appName = item.getRegistryKey();
										List<String> registryList = appAddressMap.get(appName);
										if (registryList == null) {
											registryList = new ArrayList<String>();
										}

										if (!registryList.contains(item.getRegistryValue())) {
											registryList.add(item.getRegistryValue());
										}
										appAddressMap.put(appName, registryList);
									}
								}
							}

							// fresh group address
							for (XxlJobGroup group: groupList) {
								List<String> registryList = appAddressMap.get(group.getAppName());
								String addressListStr = null;
								if (registryList!=null && !registryList.isEmpty()) {
									Collections.sort(registryList);
									addressListStr = "";
									for (String item:registryList) {
										addressListStr += item + ",";
									}
									addressListStr = addressListStr.substring(0, addressListStr.length()-1);
								}
								group.setAddressList(addressListStr);
								XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
							}
						}
					} catch (Exception e) {
						if (!toStop) {
							logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
						}
					}
					try {
						TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
					} catch (InterruptedException e) {
						if (!toStop) {
							logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
						}
					}
				}
				logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
			}
		});
		registryThread.setDaemon(true);
		registryThread.setName("xxl-job, admin JobRegistryMonitorHelper");
		registryThread.start();
	}
}
  • Delete invalid registered address, service goes offline automatically
  • Refresh valid registered address, service comes online automatically
  • Refresh the registered address in the executor so that when job is called remotely, the latest valid service can be called

3,JobFailMonitorHelper

JobFailMonitorHelper#start

public void start(){
		monitorThread = new Thread(new Runnable() {

			@Override
			public void run() {

				// monitor
				while (!toStop) {
					try {

						List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
						if (failLogIds!=null && !failLogIds.isEmpty()) {
							for (long failLogId: failLogIds) {

								// lock log
								int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
								if (lockRet < 1) {
									continue;
								}
								XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
								XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());

								// 1,fail retry monitor
								if (log.getExecutorFailRetryCount() > 0) {
									JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam());
									String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
									log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
									XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
								}

								// 2,fail alarm monitor
								int newAlarmStatus = 0;		// Alert status: 0-default, -1=lock status, 1-no alert, 2-alert success, 3-alert failure
								if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
									boolean alarmResult = true;
									try {
										alarmResult = failAlarm(info, log);
									} catch (Exception e) {
										alarmResult = false;
										logger.error(e.getMessage(), e);
									}
									newAlarmStatus = alarmResult?2:3;
								} else {
									newAlarmStatus = 1;
								}

								XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
							}
						}

					} catch (Exception e) {
						if (!toStop) {
							logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
						}
					}

                    try {
                        TimeUnit.SECONDS.sleep(10);
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }

                }

				logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");

			}
		});
		monitorThread.setDaemon(true);
		monitorThread.setName("xxl-job, admin JobFailMonitorHelper");
		monitorThread.start();
	}
}

JobFailMonitorHelper is used to start a Daemon thread that primarily manages exceptional calls.If a task call fails, it will determine if the task is configured for failed retries (no retries by default).Failed retries are performed if the number of configuration retries is greater than 0, and the maximum number of retries if you have been failing is the number of retries you have configured.

Another effect is if you have an alarm message configured in this task.This failed task is to send information about the task failure to the configured mailbox.

4,JobTriggerPoolHelper

public void start(){
    fastTriggerPool = new ThreadPoolExecutor(
            10,
            XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(1000),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
                }
            });

    slowTriggerPool = new ThreadPoolExecutor(
            10,
            XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(2000),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
                }
            });
}

Its purpose is to initialize the fast and slow two thread pools, which are used when tasks begin to execute.The slow-threaded pool is then used to perform tasks when the threshold number of task execution failures has been reached.Isolate the thread pool that executes tasks: the dispatch thread pool is segregated and split, and slow tasks are automatically downgraded to the "Slow" thread pool to avoid exhausting dispatch threads and improve system stability.

5,JobLogReportHelper

public void start(){
    logrThread = new Thread(new Runnable() {
        @Override
        public void run() {
            // last clean log time
            long lastCleanLogTime = 0;
            while (!toStop) {
                // 1,log-report refresh: refresh log report in 3 days
                try {
                    for (int i = 0; i < 3; i++) {
                        // today
                        Calendar itemDay = Calendar.getInstance();
                        itemDay.add(Calendar.DAY_OF_MONTH, -i);
                        itemDay.set(Calendar.HOUR_OF_DAY, 0);
                        itemDay.set(Calendar.MINUTE, 0);
                        itemDay.set(Calendar.SECOND, 0);
                        itemDay.set(Calendar.MILLISECOND, 0);
                        Date todayFrom = itemDay.getTime();
                        itemDay.set(Calendar.HOUR_OF_DAY, 23);
                        itemDay.set(Calendar.MINUTE, 59);
                        itemDay.set(Calendar.SECOND, 59);
                        itemDay.set(Calendar.MILLISECOND, 999);
                        Date todayTo = itemDay.getTime();
                        // refresh log-report every minute
                        XxlJobLogReport xxlJobLogReport = new XxlJobLogReport();
                        xxlJobLogReport.setTriggerDay(todayFrom);
                        xxlJobLogReport.setRunningCount(0);
                        xxlJobLogReport.setSucCount(0);
                        xxlJobLogReport.setFailCount(0);

                        Map<String, Object> triggerCountMap = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLogReport(todayFrom, todayTo);
                        if (triggerCountMap!=null && triggerCountMap.size()>0) {
                            int triggerDayCount = triggerCountMap.containsKey("triggerDayCount")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCount"))):0;
                            int triggerDayCountRunning = triggerCountMap.containsKey("triggerDayCountRunning")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountRunning"))):0;
                            int triggerDayCountSuc = triggerCountMap.containsKey("triggerDayCountSuc")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountSuc"))):0;
                            int triggerDayCountFail = triggerDayCount - triggerDayCountRunning - triggerDayCountSuc;

                            xxlJobLogReport.setRunningCount(triggerDayCountRunning);
                            xxlJobLogReport.setSucCount(triggerDayCountSuc);
                            xxlJobLogReport.setFailCount(triggerDayCountFail);
                        }
                        // do refresh
                        int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);
                        if (ret < 1) {
                            XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(xxlJobLogReport);
                        }
                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(">>>>>>>>>>> xxl-job, job log report thread error:{}", e);
                    }
                }
                // 2,log-clean: switch open & once each day
                if (XxlJobAdminConfig.getAdminConfig().getLogretentiondays()>0
                        && System.currentTimeMillis() - lastCleanLogTime > 24*60*60*1000) {
                    // expire-time
                    Calendar expiredDay = Calendar.getInstance();
                    expiredDay.add(Calendar.DAY_OF_MONTH, -1 * XxlJobAdminConfig.getAdminConfig().getLogretentiondays());
                    expiredDay.set(Calendar.HOUR_OF_DAY, 0);
                    expiredDay.set(Calendar.MINUTE, 0);
                    expiredDay.set(Calendar.SECOND, 0);
                    expiredDay.set(Calendar.MILLISECOND, 0);
                    Date clearBeforeTime = expiredDay.getTime();

                    // clean expired log
                    List<Long> logIds = null;
                    do {
                        logIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findClearLogIds(0, 0, clearBeforeTime, 0, 1000);
                        if (logIds!=null && logIds.size()>0) {
                            XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().clearLog(logIds);
                        }
                    } while (logIds!=null && logIds.size()>0);
                    // update clean time
                    lastCleanLogTime = System.currentTimeMillis();
                }
                try {
                    TimeUnit.MINUTES.sleep(1);
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        }
    });
    logrThread.setDaemon(true);
    logrThread.setName("xxl-job, admin JobLogReportHelper");
    logrThread.start();
}

Mainly report-related functions to save a log of task execution.And generate a log closing report and clean up expired logs.Support real-time viewing of running data, such as number of tasks, number of schedules, number of executors, and scheduling reports, such as schedule date distribution, schedule success distribution, etc.

6,JobScheduleHelper


Its purpose is to schedule the distribution of tasks that will be performed in the next five seconds by a dead loop from the xxl_job_info table.Start two daemon threads, let's start with scheduleThread.

conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();

First, use the for update statement to acquire the qualification lock for the task, then to acquire the task that will be executed in the next 5 seconds.There are three branches of the task's logic code.

// 1,pre read
long nowTime = System.currentTimeMillis();
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
if (scheduleList!=null && scheduleList.size()>0) {
    // 2,push time-ring
    for (XxlJobInfo jobInfo: scheduleList) {

        // time-ring jump
        if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
            // 2.1,trigger-expire > 5s: pass && make next-trigger-time
            logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

            // fresh next
            refreshNextValidTime(jobInfo, new Date());

        } else if (nowTime > jobInfo.getTriggerNextTime()) {
            // 2.2,trigger-expire < 5s: direct-trigger && make next-trigger-time

            // 1,trigger
            JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null);
            logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );

            // 2,fresh next
            refreshNextValidTime(jobInfo, new Date());

            // next-trigger-time in 5s, pre-read again
            if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {

                // 1,make ring second
                int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                // 2,push time ring
                pushTimeRing(ringSecond, jobInfo.getId());

                // 3,fresh next
                refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

            }

        } else {
            // 2.3,trigger-pre-read: time-ring trigger && make next-trigger-time

            // 1,make ring second
            int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

            // 2,push time ring
            pushTimeRing(ringSecond, jobInfo.getId());

            // 3,fresh next
            refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

        }

    }

    // 3,update trigger info
    for (XxlJobInfo jobInfo: scheduleList) {
        XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
    }

} else {
    preReadSuc = false;
}

The trigger time of the first branch's current task has timed out more than 5 seconds. Instead of executing, the next trigger time is calculated directly.

The second branch uses the JobTriggerPoolHelper class to schedule tasks, and then determines if the next execution time is within 5 seconds, the task data is cached and the processing logic is the same as the third branch.

The third branch is to follow the pushTimeRing method by modelling the trigger time by 60 seconds.

ringData is a Map collection with 0 to 59 integers as key and jobId set as value.The logic for processing this set of data is in ringThread, our second daemon thread.

while (!ringThreadToStop) {
     try {
         // second data
         List<Integer> ringItemData = new ArrayList<>();
         int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // Avoid taking too long to process, check one scale forward across the scale;
         for (int i = 0; i < 2; i++) {
             List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
             if (tmpData != null) {
                 ringItemData.addAll(tmpData);
             }
         }
         // ring trigger
         logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
         if (ringItemData.size() > 0) {
             // do trigger
             for (int jobId: ringItemData) {
                 // do trigger
                 JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);
             }
             // clear
             ringItemData.clear();
         }
     } catch (Exception e) {
         if (!ringThreadToStop) {
             logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
         }
     }
     // next second, align second
     try {
         TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);
     } catch (InterruptedException e) {
         if (!ringThreadToStop) {
             logger.error(e.getMessage(), e);
         }
     }
 }

Tasks are acquired on the current seconds scale and on the previous scale in a time cycle, and then, as above, are scheduled using JobTriggerPoolHelper.The third article explains how to schedule tasks.The second article tells you what the executor did when it started.

Reference article: https://www.cnblogs.com/jiangyang/p/11576931.html

173 original articles published. Zan 221. 700,000 visits+
His message board follow

Tags: Spring

Posted on Mon, 13 Jan 2020 17:09:34 -0800 by fl0w