Using thread pool and memory queue to deal with business problems asynchronously

background

When there are a large number of the same tasks in the business of the system (such as sending a large number of mail), and the time of each task is relatively long, the front segment needs to respond quickly. For this demand, we can use message queue for asynchronous notification, and also use thread pool + memory queue for asynchronous notification to deal with business problems.

code implementation

Send mail as demo

Mailbox entity class

@Data
public class Email implements Serializable {
	
	private static final long serialVersionUID = 1L;
	
	/**
	 * Self increasing primary key
	 */
	private Long id;
	
	/**
	 * Recipient mailbox (multiple commas separated)
	 */
	private String receiveEmail;
	
	/**
	 * theme
	 */
	private String subject;
	
	/**
	 * send content
	 */
	private String content;
	
	/**
	 * Template
	 */
	private String template;
	
	/**
	 * Sending time
	 */
	private Timestamp sendTime;
}

Mail queue

public class MailQueue {
	 //Queue size
    static final int QUEUE_MAX_SIZE   = 1000;

    static BlockingQueue<Email> blockingQueue = new LinkedBlockingQueue<Email>(QUEUE_MAX_SIZE);
    
    /**
     * Private default constructor to ensure that the external world cannot be instantiated directly
     */
    private MailQueue(){};
    /**
     * Class level inner class, that is, static member inner class, instances of the inner class and instances of the outer class
     * There is no binding relationship, and it will only be loaded when it is called, thus realizing delayed loading
     */
    private static class SingletonHolder{
    	/**
    	 * Static initializer, JVM to ensure thread safety
     	 */
			private  static MailQueue queue = new MailQueue();
    }
    //Singleton queue
    public static MailQueue getMailQueue(){
        return SingletonHolder.queue;
    }
    //Production entry
    public  void  produce(Email mail) throws InterruptedException {
    	blockingQueue.put(mail);
    }
    //Consumption team
    public  Email consume() throws InterruptedException {
        return blockingQueue.take();
    }
    // Get queue size
    public int size() {
        return blockingQueue.size();
    }
}

In fact, the number of valid conversions of 1000 concurrent requests in the mail consumption queue is 2 = core > pool > size + work > queue > size,

When core > pool > size = 2, Max > pool > size = 5, work > queue > size = 50, the effective conversion number of 1000 concurrent requests is 55

So you can set the size of the queue buffer pool and the maximum number of threads according to your business access.

@Component
public class ConsumeMailQueue {
	private static final Logger logger = LoggerFactory.getLogger(ConsumeMailQueue.class);

  //General business
	@Autowired
	IMailService mailService;

	// Minimum number of thread pool maintenance threads
	private final static int CORE_POOL_SIZE = 1;
	// Maximum number of thread pool maintenance threads
	private final static int MAX_POOL_SIZE = 1;
	// Idle time allowed by thread pool to maintain threads
	private final static int KEEP_ALIVE_TIME = 0;
	// Size of the buffer queue used by the thread pool
	private final static int WORK_QUEUE_SIZE = 1;

	// Message buffer queue
	Queue<Email> msgQueue = new LinkedList<Email>();


	//Handler used when execution is blocked due to out of thread range and queue capacity
	final RejectedExecutionHandler handler = new RejectedExecutionHandler() {
		@Override
		public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
			logger.info("The thread pool is too busy to handle too many tasks.........Extra threads will be put in msgQueue");
			//You can open a new scheduler to process these scheduling tasks, or save the unprocessed tasks to the database, and then the scheduled tasks continue to process
			msgQueue.add(((PollMail)r).getEmail());


		}
	};

	// Task thread pool
	final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
			CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME,
			TimeUnit.SECONDS, new ArrayBlockingQueue(WORK_QUEUE_SIZE), handler);


	// Scheduling thread pool. This thread pool supports the need to perform tasks on a scheduled and periodic basis.
	final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);


	@PostConstruct
	public void init() {

		//Enable mail consumption queue check
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					while (true){
						Email mail = MailQueue.getMailQueue().consume();
						logger.info("Total messages left:{}",MailQueue.getMailQueue().size());
						threadPool.execute(new PollMail(mailService,mail));
					}
				} catch (InterruptedException e) {
					logger.info("Mail queue consumption failed due to---->",e.getMessage());
				}


			}
		}).start();
	}


	//lombok
	@Data
	class PollMail implements Runnable {

		IMailService mailService;

		Email email;

		public PollMail(IMailService mailService,Email email) {
			this.mailService = mailService;
			this.email = email;
		}

		@Override
		public void run() {
			logger.info("The message being processed is----->{}",this.email.getEmail());
			mailService.dealSend(this.email);
		}
	}
	@PreDestroy
	public void stopThread() throws InterruptedException {

		/**
		 * pool.awaitTermination(1, TimeUnit.SECONDS)
		 * Check whether the execution is completed every second (the status is TERMINATED),
		 * When exiting from the while loop, the thread pool is completely terminated.
		 */
		scheduler.shutdown();
		threadPool.shutdown();
		while (!threadPool.awaitTermination(1, TimeUnit.SECONDS)) {
			logger.info("The thread is still executing...");
		}
	}


}

		

The control layer code is as follows

@Api(tags ="Mail management")
@RestController
@RequestMapping("/mail")
public class mailController {

	@Autowired
	private IMailService mailService;
	
	@PostMapping("send")
	public Result send(Email mail) throws InterruptedException {
		mailService.send(mail);
		return  Result.ok();
	}

}

The interface layer code is as follows

/**
 * @description:
 * @author: Simple heart
 * @version:
 * @modified By:1170370113@qq.com
 */
public interface IMailService {

	/**
	 * Mail sending business
	 * @param mail
	 */
	void send(Email mail) throws InterruptedException;

	/**
	 * Process mail tasks that need to be sent
	 * @param email
	 */
	void dealSend(Email email);
}

The interface implementation layer code is as follows

@Service
public class MailServiceImpl implements IMailService {
	private static final Logger logger = LoggerFactory.getLogger(MailServiceImpl.class);


	@Override
	public void send(Email mail) {

		try {
			MailQueue.getMailQueue().produce(mail);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

 //Used to represent a valid request that is finally reached
	static AtomicInteger flag=new AtomicInteger(0);

	@Override
	public void dealSend(Email email) {
		try {
			Thread.sleep(500);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		logger.info("The email information has been sent. The specific content is as follows:------->{}",email.toString());
		logger.info("Total valid requests reached {}",flag.addAndGet(1));
	}

}

Tags: Programming jvm Database Lombok

Posted on Tue, 03 Dec 2019 16:11:48 -0800 by vlcinsky