java thread pool source code interpretation

There are two kinds of threads: kernel level thread (KLT) and user level thread (ULT)

Let's start with the process:
Process is the basic unit of resource ownership. Process switching needs to save process state, which will cause waste of resources.
In the same process, threads share the resources owned by the process; thread switching will not cause process switching, and the required resources are far less than process switching, which can improve efficiency.

Kernel level threads:

All operations (creation, revocation) managed by threads are performed by the operating system kernel
The kernel of the operating system provides an application interface api for developers to use KLT
Thread features:
One thread in the process is blocked, and the kernel can schedule other threads (ready state) of the same process to occupy the processor
In a multiprocessor environment, the kernel can schedule multiple threads of the same process at the same time, map these threads to different processor cores, and improve the execution efficiency of the process.
Application threads run in user mode, and thread scheduling and management are implemented in the kernel. When scheduling a thread, the control right changes from one thread to another, which requires mode switching, and the system overhead is large.

User level threads:

Therefore, the creation, messaging, scheduling, save / restore context of threads are all completed by thread library.
The kernel is not aware of multithreading. The kernel continues to schedule processes with an execution state (ready, running, blocking, etc.) assigned to the process
Thread features:
Thread switching does not need kernel mode, which can save mode switching overhead and kernel resources.
Allows processes to choose different scheduling algorithms to schedule threads according to specific needs. The scheduling algorithm needs to be implemented by itself.
Because it does not need kernel support, it can run across OS.
If a thread is blocked, the whole process will be blocked.

JVM belongs to kernel level thread

When to use thread pool:
The processing time of a single task is relatively short
There are a lot of tasks to deal with

Advantages of thread pool:
Reuse existing threads, reduce the cost of thread creation and death, and improve performance
Improve response speed. When the task arrives, it can be executed immediately without thread creation
Improve thread manageability


After jdk1.5, JUC (java.util.concurrent) provides us with a variety of thread pools. We can choose the right one according to our needs.
Let's first look at the types:
newCachedThreadPool
newFixedThreadPool
newSingleThreadExecutor
newWorkStealingPool(@since 1.8)
newScheduledThreadPool / / timed thread pool
newSingleThreadScheduledExecutor

Follow the source code to see the basic execution steps of these thread pools:

// The following code creates a thread pool with a length of 10
ExecutorService executorService = Executors.newFixedThreadPool(10);
// Follow the source code to see what operations have been done
Executors.java
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}

ThreadPoolExecutor.java
public ThreadPoolExecutor(int corePoolSize, // Number of general worker threads in thread pool 
                              int maximumPoolSize, // Maximum number of threads in thread pool
                              long keepAliveTime,// Maximum thread idle lifetime
                              TimeUnit unit, // Survival time unit
                              BlockingQueue<Runnable> workQueue) { // Store the task queue, which is a blocking queue
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
						  int maximumPoolSize,
						  long keepAliveTime,
						  TimeUnit unit,
						  BlockingQueue<Runnable> workQueue,
						  ThreadFactory threadFactory,
						  RejectedExecutionHandler handler) {// Rejection strategy, that is, when all threads in the thread pool are working and tasks are added, the measures taken for new tasks
	if (corePoolSize < 0 ||
		maximumPoolSize <= 0 ||
		maximumPoolSize < corePoolSize ||
		keepAliveTime < 0)
		throw new IllegalArgumentException();
	if (workQueue == null || threadFactory == null || handler == null)
		throw new NullPointerException();
	this.corePoolSize = corePoolSize;
	this.maximumPoolSize = maximumPoolSize;
	this.workQueue = workQueue;
	this.keepAliveTime = unit.toNanos(keepAliveTime);
	this.threadFactory = threadFactory;
	this.handler = handler;
}
// Thread pool creation completed

// Generally, there are four rejection policies. If these four rejection policies fail to meet the business requirements, you can customize the rejection policy.
CallerRunsPolicy // Retry adding the current task, and he will automatically call the execute() method repeatedly until it succeeds
AbortPolicy // Discard the task and throw an exception RejectedExecutionException
DiscardPolicy // Discard tasks
DiscardOldestPolicy // Discard the task with the longest waiting time in the queue and add the new task to the queue

// When we add a task to the thread pool, what does the thread pool do?
executorService.execute(new Runnable() {
	@Override
	public void run() {
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
  });
  
// Step by step debugging the above code, and the underlying execution logic is as follows:
public void execute(Runnable command) {
	if (command == null)// Task cannot be empty
		throw new NullPointerException();
	int c = ctl.get();
	if (workerCountOf(c) < corePoolSize) {// If the number of worker threads is less than the number of regular worker threads, add worker threads and bind tasks
		if (addWorker(command, true))
			return;
		c = ctl.get();
	}
	if (isRunning(c) && workQueue.offer(command)) {// Task join queue
		int recheck = ctl.get();
		if (! isRunning(recheck) && remove(command))
			reject(command);
		else if (workerCountOf(recheck) == 0)
			addWorker(null, false);
	}
	else if (!addWorker(command, false))
		reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
	retry:
	for (;;) {
		int c = ctl.get();
		int rs = runStateOf(c);

		// Check if queue empty only if necessary.
		if (rs >= SHUTDOWN &&
			! (rs == SHUTDOWN &&
			   firstTask == null &&
			   ! workQueue.isEmpty()))
			return false;

		for (;;) {
			int wc = workerCountOf(c);
			if (wc >= CAPACITY ||
				wc >= (core ? corePoolSize : maximumPoolSize))
				return false;
			if (compareAndIncrementWorkerCount(c))
				break retry;
			c = ctl.get();  // Re-read ctl
			if (runStateOf(c) != rs)
				continue retry;
			// else CAS failed due to workerCount change; retry inner loop
		}
	}

	boolean workerStarted = false;
	boolean workerAdded = false;
	Worker w = null;
	try {
		w = new Worker(firstTask);// Create a new worker thread
		final Thread t = w.thread;
		if (t != null) {
			final ReentrantLock mainLock = this.mainLock;
			mainLock.lock();
			try {
				int rs = runStateOf(ctl.get());

				if (rs < SHUTDOWN ||
					(rs == SHUTDOWN && firstTask == null)) {
					if (t.isAlive()) // precheck that t is startable
						throw new IllegalThreadStateException();
					workers.add(w);// The worker thread is added to the set of stored workers
					int s = workers.size();
					if (s > largestPoolSize)
						largestPoolSize = s;
					workerAdded = true;
				}
			} finally {
				mainLock.unlock();
			}
			if (workerAdded) {
				t.start();// Startup thread
				workerStarted = true;
			}
		}
	} finally {
		if (! workerStarted)
			addWorkerFailed(w);
	}
	return workerStarted;
}

public void run() {
	runWorker(this);
}
final void runWorker(Worker w) {
	Thread wt = Thread.currentThread();
	Runnable task = w.firstTask;
	w.firstTask = null;
	w.unlock(); // allow interrupts
	boolean completedAbruptly = true;
	try {
		while (task != null || (task = getTask()) != null) {
			w.lock();
			// If pool is stopping, ensure thread is interrupted;
			// if not, ensure thread is not interrupted.  This
			// requires a recheck in second case to deal with
			// shutdownNow race while clearing interrupt
			if ((runStateAtLeast(ctl.get(), STOP) ||
				 (Thread.interrupted() &&
				  runStateAtLeast(ctl.get(), STOP))) &&
				!wt.isInterrupted())
				wt.interrupt();
			try {
				beforeExecute(wt, task);
				Throwable thrown = null;
				try {
					task.run();
				} catch (RuntimeException x) {
					thrown = x; throw x;
				} catch (Error x) {
					thrown = x; throw x;
				} catch (Throwable x) {
					thrown = x; throw new Error(x);
				} finally {
					afterExecute(task, thrown);
				}
			} finally {
				task = null;
				w.completedTasks++;
				w.unlock();
			}
		}
		completedAbruptly = false;
	} finally {
		processWorkerExit(w, completedAbruptly);
	}
}

The execution process of thread pool is as follows:

// Status identifier bit of thread pool: ThreadPoolExecutor.java
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //The high 3 bits indicate the status, and the last 29 bits indicate the number of threads (2^29)-1 (about 500 million) threads

// Thread pool status:
private static final int RUNNING    = -1 << COUNT_BITS; // Accept new tasks and process queued tasks
private static final int SHUTDOWN   =  0 << COUNT_BITS; // Do not accept new tasks, but process queued tasks
private static final int STOP       =  1 << COUNT_BITS; // Do not accept new tasks, do not process queued tasks, and interrupt ongoing tasks
private static final int TIDYING    =  2 << COUNT_BITS; // All tasks are terminated, workercount is zero, and the thread converted to state pending will run the terminated() hook method
private static final int TERMINATED =  3 << COUNT_BITS; // terminated() completed

Tags: Programming Java less jvm

Posted on Mon, 04 Nov 2019 22:58:26 -0800 by Jimbit