Using Thread Pools in Java Concurrent Programming

1. Why use multithreading?

As hardware keeps advancing, computers and servers now have a large number of processors, and that number will likely keep growing. For example, my work computer has eight processors. How can you check this?

On Windows, right-click your computer, choose Properties to open the properties window, and then click Device Manager. Under Processors you can see all of them.

You can also get the number of processors through the following Java code:

System.out.println("CPU Number:" + Runtime.getRuntime().availableProcessors());

The output is:

CPU Number:8

Now that machines have more processors, sticking with traditional serial programming wastes resources. To improve resource utilization and keep every processor busy, we turn to concurrent programming, and concurrent programming in Java means using multiple threads.

It can be said that the most direct purpose of multithreading is to improve resource utilization; as utilization rises, the system's throughput rises with it.

2. Why use a thread pool?

Within a certain range, adding threads improves an application's throughput, but more threads are not always better, because creating and destroying threads carries significant overhead. Beyond that range, extra threads slow the application down and can, in severe cases, even crash it, forcing a restart.

To avoid this, the application must limit the number of threads it is allowed to create, so that it does not exhaust resources when the limit is reached. The thread pool is designed to solve exactly this problem.

Thread pool: resource pool that manages a set of worker threads.

Thread pools are closely related to work queues, which hold all tasks waiting to be executed.

The task of the worker thread is to get a task from the work queue, execute the task, and then return to the thread pool and wait for the next task.
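
To make that loop concrete, here is a minimal, hedged sketch of a worker pulling tasks from a work queue. This is deliberately simplified and is not how ThreadPoolExecutor is implemented internally; the class name NaiveWorker and the thread name are made up for illustration.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// A deliberately simplified worker, only to illustrate the loop described above.
public class NaiveWorker implements Runnable {
    private final BlockingQueue<Runnable> workQueue;

    public NaiveWorker(BlockingQueue<Runnable> workQueue) {
        this.workQueue = workQueue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Runnable task = workQueue.take(); // block until a task is available
                task.run();                       // execute it, then loop back and wait again
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // exit the loop when interrupted
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        Thread worker = new Thread(new NaiveWorker(queue), "naive-worker-1");
        worker.start();

        queue.put(() -> System.out.println("Handled by " + Thread.currentThread().getName()));

        Thread.sleep(500);   // give the worker time to run the task
        worker.interrupt();  // stop the worker
    }
}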

Using thread pools can bring the following benefits:

  1. Reusing existing threads instead of creating new ones avoids the considerable overhead of creating and destroying a thread for every task.
  2. When a task arrives, a worker thread usually already exists, so execution is not delayed while a new thread is created, which improves responsiveness.
  3. By tuning the size of the thread pool, you can create enough threads to keep the processors busy while preventing the application from running out of memory or crashing because too many threads are competing for resources.

3. Creating a thread pool

3.1 Creating a thread pool with the Executors static methods (not recommended)

The Executors class provides the following four static methods to quickly create a thread pool:

  1. newFixedThreadPool
  2. newCachedThreadPool
  3. newSingleThreadExecutor
  4. newScheduledThreadPool

First, let's see how the newFixedThreadPool() method is used:

ExecutorService threadPool = Executors.newFixedThreadPool(10);

Its source code is:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

Note: newFixedThreadPool creates a fixed-size thread pool. Each submitted task causes a new thread to be created until the pool reaches its maximum size, after which the pool size no longer changes (if a thread dies because of an unexpected Exception, the pool adds a new thread to replace it).
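
As a quick, hedged usage sketch (the task bodies and counts are arbitrary), the following submits three tasks to a fixed pool of two threads; the third task waits in the queue until a thread becomes free:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedPoolDemo {
    public static void main(String[] args) {
        // Fixed pool with two threads; extra tasks wait in the work queue.
        ExecutorService threadPool = Executors.newFixedThreadPool(2);
        for (int i = 1; i <= 3; i++) {
            final int taskId = i;
            threadPool.execute(() ->
                    System.out.println("Task " + taskId + " runs on " + Thread.currentThread().getName()));
        }
        threadPool.shutdown(); // no new tasks accepted; already submitted tasks still finish
    }
}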

Then look at how the newCachedThreadPool() method is used:

ExecutorService threadPool = Executors.newCachedThreadPool();

Its source code is:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

Note: newCachedThreadPool creates a cacheable thread pool. If the pool grows larger than the current demand, idle threads are reclaimed (after 60 seconds, per the source above); if demand rises, new threads are added. The maximum size of the pool is Integer.MAX_VALUE.
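
To see the "cacheable" behavior, here is a small, hedged sketch: the second task is submitted only after the first finishes, so the pool can reuse the now-idle thread instead of creating a new one (exact thread names may vary between runs):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CachedPoolDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService threadPool = Executors.newCachedThreadPool();

        // Submit a task and wait for it to complete.
        threadPool.submit(() ->
                System.out.println("First task on " + Thread.currentThread().getName())).get();

        // The thread from the first task is now idle (kept alive for 60 seconds),
        // so this task can reuse it instead of triggering a new thread.
        threadPool.submit(() ->
                System.out.println("Second task on " + Thread.currentThread().getName())).get();

        threadPool.shutdown();
    }
}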

Next, let's see how the newSingleThreadExecutor() method is used:

ExecutorService threadPool = Executors.newSingleThreadExecutor();

Its source code is:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

Note: newSingleThreadExecutor creates a single-threaded executor. It uses one worker thread to execute tasks, and if that thread ends abnormally, a new one is created to replace it.

newSingleThreadExecutor guarantees that tasks are executed serially, in the order in which they were queued.
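
A brief, hedged sketch of that serial guarantee (the loop count is arbitrary): all five tasks below run on the same worker thread, in submission order.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadDemo {
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newSingleThreadExecutor();
        for (int i = 1; i <= 5; i++) {
            final int taskId = i;
            // Tasks are executed one after another, in the order they were submitted.
            threadPool.execute(() ->
                    System.out.println("Task " + taskId + " on " + Thread.currentThread().getName()));
        }
        threadPool.shutdown();
    }
}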

Finally, let's see how the newScheduledThreadPool() method is used:

ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(10);

Its source code is:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

Here, super refers to the following ThreadPoolExecutor constructor:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

Note: newScheduledThreadPool creates a fixed-size thread pool that runs tasks after a delay or on a periodic schedule, similar to Timer.
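
Here is a hedged sketch of both modes (the delays and task bodies are arbitrary examples). Note that the scheduling methods live on ScheduledExecutorService, which is why that type is worth keeping instead of the plain ExecutorService:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledPoolDemo {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

        // Run once, 1 second from now.
        scheduler.schedule(
                () -> System.out.println("Delayed task on " + Thread.currentThread().getName()),
                1, TimeUnit.SECONDS);

        // Run every 2 seconds after an initial 1-second delay.
        scheduler.scheduleAtFixedRate(
                () -> System.out.println("Periodic task on " + Thread.currentThread().getName()),
                1, 2, TimeUnit.SECONDS);

        // Call scheduler.shutdown() when the periodic task is no longer needed.
    }
}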

As you can see, all four factory methods ultimately call the following ThreadPoolExecutor constructor, but many of the parameters are not chosen by you; default values are passed instead:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // Omit specific code
}

Although these four methods make it easy to create a thread pool, they are not recommended. First, many parameters are filled in with default values, which hides the meaning of each parameter from you. Second, those defaults can cause problems: for example, newFixedThreadPool and newSingleThreadExecutor use an effectively unbounded LinkedBlockingQueue (capacity Integer.MAX_VALUE), so waiting tasks can pile up without limit, while newCachedThreadPool allows up to Integer.MAX_VALUE threads. It is better to specify the parameters yourself according to your needs.

So what do these seven parameters mean? Read on.

3.2 Creating a thread pool with the ThreadPoolExecutor constructors (recommended)

ThreadPoolExecutor has the following four constructors, and using them directly is the recommended way to create a thread pool:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

The three constructors above all delegate to the fourth, which takes the full set of parameters:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // Omit specific code
}

The following is the explanation of each parameter:

  • corePoolSize: number of core threads.

  • maximumPoolSize: maximum number of threads.

    Maximum number of threads = number of core threads + number of non core threads.

  • keepAliveTime: non core thread idle timeout.

    A non-core thread that stays idle (not working) for longer than this timeout is destroyed. If allowCoreThreadTimeOut is set to true, the timeout also applies to core threads.

  • unit: the time unit of keepAliveTime, such as seconds, minutes, or hours.

  • workQueue: work queue, that is, the task queue to be executed, which stores the tasks waiting to be executed.

    The blocking queues here can be selected as: LinkedBlockingQueue, ArrayBlockingQueue, SynchronousQueue, DelayedWorkQueue.

    newFixedThreadPool() uses LinkedBlockingQueue by default,

    newCachedThreadPool() uses SynchronousQueue by default,

    newSingleThreadExecutor() uses LinkedBlockingQueue by default,

    newScheduledThreadPool() uses DelayedWorkQueue by default.

  • threadFactory: a thread factory used to create threads.

  • handler: the saturation policy, i.e. the policy applied when a task is rejected.

    When the workQueue is full and the number of threads in the pool has already reached maximumPoolSize, newly submitted tasks are handed to the RejectedExecutionHandler. There are four built-in strategies (a sketch follows this list):

    AbortPolicy: aborts by discarding the task and throwing an unchecked RejectedExecutionException; this is the default saturation policy.

    DiscardPolicy: silently discards the task without throwing an exception.

    DiscardOldestPolicy: discards the oldest task (the one next in line for execution) and then retries submitting the new task.

    CallerRunsPolicy: pushes the task back to the caller, so it runs on the thread that called execute().
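
Putting the parameters together, here is a hedged sketch of creating a pool with the full constructor; all concrete values (pool sizes, queue capacity, thread-name prefix) are arbitrary examples. It names threads with a custom ThreadFactory, uses CallerRunsPolicy as the saturation policy, and turns on allowCoreThreadTimeOut so idle core threads are also reclaimed:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FullConstructorDemo {
    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,                                   // corePoolSize
                4,                                   // maximumPoolSize
                30L, TimeUnit.SECONDS,               // keepAliveTime + unit
                new ArrayBlockingQueue<>(10),        // bounded work queue
                runnable -> new Thread(runnable, "demo-pool-" + counter.incrementAndGet()), // threadFactory
                new ThreadPoolExecutor.CallerRunsPolicy()); // saturation policy

        // Optionally let idle core threads time out as well.
        executor.allowCoreThreadTimeOut(true);

        executor.execute(() ->
                System.out.println("Running on " + Thread.currentThread().getName()));

        executor.shutdown();
    }
}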

4. Operation principle of thread pool

The thread pool handles a submitted task according to the following rules:

1) If the number of threads in the pool is less than corePoolSize, a new thread is created to handle the task; threads created at this stage are core threads.

2) If the number of threads is equal to or greater than corePoolSize, the task is placed in the work queue (the BlockingQueue).

3) If the work queue is full and the task cannot be added to the BlockingQueue, a new thread is created to process it; threads created at this stage are non-core threads, which are reclaimed after being idle for longer than the timeout defined by keepAliveTime and unit.

4) If creating a new thread would push the number of threads in the pool past maximumPoolSize, the task is rejected and the RejectedExecutionHandler.rejectedExecution() method is called.

5. Example of ThreadPoolExecutor

The following example code creates a thread pool with a corePoolSize of 2, a maximumPoolSize of 3, and a work queue with a capacity of 1:

import java.util.concurrent.*;

public class ThreadPoolExecutorTest {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 3, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1));

        threadPoolExecutor.execute(() -> {
            try {
                Thread.sleep(3 * 1000);
                System.out.println("Task 1 execution thread:" + Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        threadPoolExecutor.execute(() -> {
            System.out.println("Task 2 execution thread:" + Thread.currentThread().getName());
        });
    }
}

The output is:

Task 2 execution thread: pool-1-thread-2

Task 1 execution thread: pool-1-thread-1

As you can see, because the number of threads in the thread pool is less than corePoolSize, the thread pool creates two core threads to execute task 1 and task 2 respectively.

Modify the code as follows to submit three tasks:

import java.util.concurrent.*;

public class ThreadPoolExecutorTest {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 3, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1));

        threadPoolExecutor.execute(() -> {
            try {
                Thread.sleep(3 * 1000);
                System.out.println("Task 1 execution thread:" + Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        threadPoolExecutor.execute(() -> {
            try {
                Thread.sleep(5 * 1000);
                System.out.println("Task 2 execution thread:" + Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        threadPoolExecutor.execute(() -> System.out.println("Task 3 execution thread:" + Thread.currentThread().getName()));
    }
}

The output is:

Task 1 execution thread: pool-1-thread-1

Task 3 execution thread: pool-1-thread-1

Task 2 execution thread: pool-1-thread-2

As you can see, task 3 does not trigger a new thread; it is placed in the work queue first and is eventually executed by thread 1.

Add task 4 in the above code:

threadPoolExecutor.execute(() -> System.out.println("Task 4 execution thread:" + Thread.currentThread().getName()));

The output is:

Task 4 execution thread: pool-1-thread-3

Task 3 execution thread: pool-1-thread-3

Task 1 execution thread: pool-1-thread-1

Task 2 execution thread: pool-1-thread-2

As you can see, task 3 is placed in the work queue first, while task 4 cannot be queued because the queue (capacity 1) is already full, so a third thread is created to run it. After it finishes, that thread takes task 3 from the queue and executes it. Tasks 1 and 2 are executed by thread 1 and thread 2 respectively.

Modify the code of task 4 and add task 5:

threadPoolExecutor.execute(() -> {
    try {
        Thread.sleep(2 * 1000);
        System.out.println("Task 4 execution thread:" + Thread.currentThread().getName());
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});

threadPoolExecutor.execute(() -> System.out.println("Task 5 execution thread:" + Thread.currentThread().getName()));

The output is:

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task ThreadPoolExecutorTest$$Lambda$5/935044096@179d3b25 rejected from java.util.concurrent.ThreadPoolExecutor@254989ff[Running, pool size = 3, active threads = 3, queued tasks = 1, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at ThreadPoolExecutorTest.main(ThreadPoolExecutorTest.java:37)

Task 4 execution thread: pool-1-thread-3

Task 3 execution thread: pool-1-thread-3

Task 1 execution thread: pool-1-thread-1

Task 2 execution thread: pool-1-thread-2

As you can see, when task 5 is submitted the work queue is full and the pool already has 3 threads (its maximumPoolSize), so the task is rejected and a java.util.concurrent.RejectedExecutionException is thrown by the default AbortPolicy.

If you have read this far, you may be wondering: what is a suitable value for maximumPoolSize?

We will cover that question next time, so stay tuned!


Finally, you are welcome to follow my WeChat official account, "Shanghai stranger"; all posts are published there in sync.
