The 6th volume of thread pool: expanding economy and improving the sky

Let's talk about Doug Lea's business experience

1. The first stage: Executor

I didn't want so much when I started my business in the early days. I just wanted to do business. So there's only one way: execute()

public interface Executor {
    void execute(Runnable command);
}

2. The second stage: ExecutorService

Development is the last word. We should be strict with ourselves.

public interface ExecutorService extends Executor {
	void shutdown(); // Normal closing
	List<Runnable> shutdownNow(); //When the bandits come, close the door immediately
	boolean isShutdown(); // Judge whether the door is closed
	boolean isTerminated(); // Is it bankrupt?
	boolean awaitTermination(long timeout, TimeUnit unit) // Waiting for bankruptcy
	submit() // Serve a guest
	invokeAll() // Serve multiple guests and return everyone's status and results
	invokeAny() // Serve multiple guests and only return the comments of the people who have finished eating
}

.
.
.
The above part is enough to expand our topic. For more details, please refer to: 4th volume of thread pool: ROC spreads its wings and hates heaven's low

State management of thread pool

Life is unpredictable, things are unpredictable, business of course, there are successes and failures In order to facilitate the management of the head office, of course, it is necessary to master the state of the chain stores created in real time.

The implementation of Doug Lea is in ThreadPoolExecutor. Now let's look at the status of the downline process pool in combination with the source code:

public class ThreadPoolExecutor extends AbstractExecutorService {
	/**
	*The main pool control state, ctl, is an atomic integer packing
    * two conceptual fields:
    * Thread pool control state: contains two parts.
    *   workerCount(indicating the effective number of threads
    *   runState(Status of thread pool): indicating which running, shutting down etc
	*/
 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
	/**
	* In order to pack them into one int, we limit workerCount to
    * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
    * billion) otherwise representable.
	*  To limit it to an Int, limit the number of cooks to about 500 million
	*/
	private static final int COUNT_BITS = Integer.SIZE - 3;
 	// runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
}

1. Talk about the meaning of runState's lifecycle state:

  1. RUNNING: normal business: accept new tasks and process queued tasks
  2. SHUTDOWN: closing status: do't accept new tasks, but process queued tasks
  3. STOP: shutdown status: all tasks are suspended, and try to interrupt the executing tasks (Don't accept new tasks, don't process queued tasks, and interrupt in progress tasks)
  4. TIDYING: Liquidation status: all tasks have terminated, workercount is zero, the thread transitioning to state TIDYING will run the terminated() hook method
  5. TERMINATED: Bankruptcy status: past ( terminated() has completed)
    • (SHUTDOWN and pool and queue empty), in SHUTDOWN state, and both thread pool and task queue are empty
    • (STOP and pool empty), in STOP state, thread pool is empty

2. The life course of a ThreadPoolExecutor

It's hard to know that life has its limits

3. Methods involving runState

To say more, the source code of thread pool is very good, except that it's a bit urgent to judge the statement without {}

3.1 execute()

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        // Step 1: judge workercount < corepoolsize, then add worker
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // Step 2: judge the status of thread pool and add tasks successfully
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // Since the above is not an atomic operation, double check is required to determine whether the task is not in RUNNING state and can be deleted from the task queue (see remove() method for details). The rejection policy is executed.
            if (! isRunning(recheck) && remove(command)){
           		 reject(command);
            }else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
          //Step 3: if the task cannot be added, add worker, and if it fails, execute the rejection policy
        }else if (!addWorker(command, false)){
        	reject(command);
        }       
    }

Here's the remove() method

 public boolean remove(Runnable task) {
        boolean removed = workQueue.remove(task);
        // The point here is to try to terminate the operation.
        tryTerminate(); // In case SHUTDOWN and now empty
        return removed;
    }

3.2 shutdown()

Code logic is very clear, not much verbose

 public void shutdown() {
 		// Ensure thread safety, use reentrant lock, and comment if you need help
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
        	// 1. Inspection authority
            checkShutdownAccess();
            // 2. Set the runState state to SHUTDOWN,
            advanceRunState(SHUTDOWN);
            // 3. Interrupt all * * idle threads**
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        // Attempt to terminate thread pool
        tryTerminate();
    }

3.3 shutdownNow()

Pay attention to the difference between comparison and shutdown()

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            // Break all threads
            interruptWorkers();
            // Return to unimplemented tasks
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        // Attempt to terminate thread pool
        tryTerminate();
        return tasks;
    }

3.4 awaitTermination()

 public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
        	// Here, the status of the dead loop polling thread pool and the determination of the waiting time
            for (;;) {
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }

This is about the status of thread pool. In fact, with your experience of opening a shop as a boss, the context is very clear
RTFSC is more recommended here Doug Lea's code logic and comments are very good

Published 17 original articles, won praise 14, visited 90000+
Private letter follow

Posted on Thu, 05 Mar 2020 02:50:32 -0800 by marting