Thread pool continued: to understand the thread pool's submit(), you must know FutureTask!

Preface

In the last article, I wrote about the implementation principle and source code of the thread pool in Java. It was meant to be a thorough, satisfying treatment that gives you a complete understanding of the thread pool in a single article. So what did that article leave out?

The previous article only covered the execute() method for submitting tasks to the thread pool; it did not explain the submit() method. submit() has a return value, a Future<T>, through which you can get the result of the task's execution. This article covers the implementation principle of submit() and FutureTask.

Usage scenarios & examples

Use scenario

The scenario I can think of is parallel computation. For example, when one method calls both methodA() and methodB(), we can submit A and B to the thread pool so that they run asynchronously, then collect and combine the two results in the main thread. The method then waits roughly as long as the slower of the two calls instead of the sum of both, which can greatly increase its throughput.
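As a minimal sketch of that scenario (the bodies of methodA() and methodB(), their return values, and the class name ParallelCallsSketch are hypothetical stand-ins, not code from the original project), the two calls can be submitted together and combined afterwards:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ParallelCallsSketch {
    // Hypothetical stand-ins for methodA() and methodB() from the text.
    static int methodA() throws InterruptedException {
        Thread.sleep(200);
        return 40;
    }

    static int methodB() throws InterruptedException {
        Thread.sleep(200);
        return 2;
    }

    static int combine() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Callable<Integer> taskA = ParallelCallsSketch::methodA;
            Callable<Integer> taskB = ParallelCallsSketch::methodB;
            // Both tasks are handed to the pool before we wait for either result.
            Future<Integer> a = pool.submit(taskA);
            Future<Integer> b = pool.submit(taskB);
            // get() blocks until each result is ready; the two sleeps overlap.
            return a.get() + b.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("A + B = " + combine());
    }
}
```

Typing the method references as Callable<Integer> before calling submit() avoids any ambiguity between the submit(Callable) and submit(Runnable) overloads.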

Use example

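import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * @author wangmeng
 * @date 2020/5/28 15:30
 */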
public class FutureTaskTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService threadPool = Executors.newCachedThreadPool();

        System.out.println("====implement FutureTask Thread task====");
        Future<String> futureTask = threadPool.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                System.out.println("FutureTask Execute business logic");
                Thread.sleep(2000);
                System.out.println("FutureTask Business logic execution completed!");
                return "Welcome to pay attention: A flower is not romantic!";
            }
        });

        System.out.println("====Execute main thread task====");
        Thread.sleep(1000);
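        // Poll until the task has finished. Calling get() directly would also
        // block until the result is ready, without the busy loop.
        boolean flag = true;
        while (flag) {
            if (futureTask.isDone() && !futureTask.isCancelled()) {
                System.out.println("FutureTask Asynchronous task execution result:" + futureTask.get());
                flag = false;
            }
        }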

        threadPool.shutdown();
    }
}

The usage above is very simple. What we pass to submit() is a Callable whose call() method we implement ourselves, and we obtain the return value through the Future that submit() returns (futureTask above).

Implementation principle of submit()

submit() is also used to submit tasks to the thread pool, but it lets the caller get the task's result, which is achieved through FutureTask. ThreadPoolExecutor does not define submit() itself; it inherits the implementation from AbstractExecutorService, so first look at that code:

public abstract class AbstractExecutorService implements ExecutorService {
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        // Wrap the Callable in a FutureTask, which is both a Runnable and a Future
        RunnableFuture<T> ftask = newTaskFor(task);
        // execute() is implemented by the subclass, e.g. ThreadPoolExecutor
        execute(ftask);
        return ftask;
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
}

public class ThreadPoolExecutor extends AbstractExecutorService {
    // inherits submit() and implements execute(Runnable)
}

submit() still goes through execute(), but the task is first wrapped in a FutureTask. In other words, once a worker thread has been started in execute(), what it runs is FutureTask.run().
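To make the wrapping step concrete, the following sketch performs by hand the same two steps described above: it wraps a Callable in a FutureTask and passes it to execute(). The class name ManualSubmitSketch and the trivial Callable are illustrative only.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

class ManualSubmitSketch {
    static String runViaExecute() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> job = () -> "done";
            // Step 1: wrap the Callable, as newTaskFor() does.
            FutureTask<String> task = new FutureTask<>(job);
            // Step 2: hand the wrapper to execute(); a worker thread calls task.run().
            pool.execute(task);
            // The FutureTask itself is the Future we read the result from.
            return task.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runViaExecute());
    }
}
```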

Let's look at the complete call chain of its execution:

As the figure above shows, the core logic for executing the task and returning its result lives in FutureTask. We use the two methods FutureTask.run() and FutureTask.get() as entry points to analyze how FutureTask is implemented.

The FutureTask source code

First look at some properties in FutureTask:

public class FutureTask<V> implements RunnableFuture<V> {
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    private Callable<V> callable;
    private Object outcome;
    private volatile Thread runner;
    private volatile WaitNode waiters;
}
  1. state

The current task status, 7 values in total.
NEW: the task has not been executed yet, or is still executing
COMPLETING: the task is finishing and its result is being stored; a short-lived intermediate state
NORMAL: the task ended normally
EXCEPTIONAL: an exception occurred during the execution of the task
CANCELLED: the task has been cancelled
INTERRUPTING: the thread running the task is being interrupted
INTERRUPTED: the thread running the task has been interrupted

  2. callable

The Callable passed in when the user submits the task; its call() method holds the user's business logic.

  3. outcome

When the task ends, outcome holds the execution result or the exception.

  4. runner

While a thread is executing the task, runner holds a reference to that thread.

  5. waiters

Because several threads may be waiting for the result of the task, they are kept in a stack data structure (a linked list of WaitNode objects).
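The state field is private, but its effect can be observed through the public isDone() and isCancelled() methods. The following is a small sketch under that assumption; the class name StateObservationSketch and the describe() helper are made up for illustration.

```java
import java.util.concurrent.FutureTask;

class StateObservationSketch {
    // Hypothetical helper: summarizes what the public API reveals about the state.
    static String describe(FutureTask<?> task) {
        return "done=" + task.isDone() + ", cancelled=" + task.isCancelled();
    }

    static String[] observe() {
        FutureTask<String> normal = new FutureTask<>(() -> "ok");
        String before = describe(normal);   // state is NEW
        normal.run();                       // NEW -> COMPLETING -> NORMAL
        String after = describe(normal);

        FutureTask<String> cancelled = new FutureTask<>(() -> "never");
        cancelled.cancel(false);            // NEW -> CANCELLED
        String afterCancel = describe(cancelled);

        return new String[] {before, after, afterCancel};
    }

    public static void main(String[] args) {
        for (String line : observe()) {
            System.out.println(line);
        }
    }
}
```

isDone() reports true for every state other than NEW, so a cancelled task also counts as done.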

FutureTask.run() implementation principle

We already know that runWorker() in the thread pool eventually calls FutureTask.run(). Let's look at how it executes:

The specific code is as follows:

public class FutureTask<V> implements RunnableFuture<V> {
    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            runner = null;
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
}

The first step is to check the state of the FutureTask: it must be NEW to continue.

Then the runner reference is changed from null to the current thread through CAS. If the CAS fails, another thread is already running the task and run() returns immediately.

Then the user-defined call() method is executed. A normal return value is stored through set(), and a thrown exception is stored through setException().
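The state check at the top of run() has two visible consequences, sketched below with an invocation counter: a second call to run() does nothing, and a task cancelled before it starts never invokes call(). The class and method names are illustrative.

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

class RunOnceSketch {
    static int callsAfterTwoRuns() {
        AtomicInteger calls = new AtomicInteger();
        FutureTask<Integer> task = new FutureTask<>(() -> calls.incrementAndGet());
        task.run();   // state is NEW: call() executes, state ends up NORMAL
        task.run();   // state is no longer NEW: returns without calling call()
        return calls.get();
    }

    static int callsAfterCancel() {
        AtomicInteger calls = new AtomicInteger();
        FutureTask<Integer> task = new FutureTask<>(() -> calls.incrementAndGet());
        task.cancel(false);   // NEW -> CANCELLED before any thread runs the task
        task.run();           // the state check fails, so call() is skipped
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("calls after two runs: " + callsAfterTwoRuns());
        System.out.println("calls after cancel:   " + callsAfterCancel());
    }
}
```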

FutureTask.set() implementation principle

The implementation of the set() method is very simple. Look at the following code directly:

public class FutureTask<V> implements RunnableFuture<V> {
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
            finishCompletion();
        }
    }
}

Assign the value returned by call() to the outcome field, then change state to NORMAL, and finally call finishCompletion() to wake up the suspended waiting threads. finishCompletion() is explained later, after get().
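Because set() is protected, a subclass can call it directly. The sketch below uses that to show the effect of the NEW -> COMPLETING CAS: only the first value is stored. SettableFutureSketch and complete() are hypothetical names, and completing a FutureTask by hand like this is an illustration rather than something the thread pool does.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class SettableFutureSketch<V> extends FutureTask<V> {
    SettableFutureSketch() {
        // The Callable is never run in this sketch; the result comes from complete().
        super(() -> null);
    }

    // Exposes the protected set(): only the first caller wins the NEW -> COMPLETING CAS.
    void complete(V value) {
        set(value);
    }

    static String firstValueWins() {
        SettableFutureSketch<String> future = new SettableFutureSketch<>();
        future.complete("first");
        future.complete("second");   // ignored: state is already NORMAL
        try {
            return future.get();     // state is NORMAL, so this returns at once
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(firstValueWins());
    }
}
```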

FutureTask.get() implementation principle

Next, look at the code:

public class FutureTask<V> implements RunnableFuture<V> {
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
}

If the state in FutureTask is NEW or COMPLETING (s <= COMPLETING), the task has not completed yet and the call to get() will block. The specific blocking logic is in the awaitDone() method:

private int awaitDone(boolean timed, long nanos) throws InterruptedException {

        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING)
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

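This method can be said to be the core method in FutureTask. Let's analyze it step by step:

If timed is true, the caller waits for at most nanos nanoseconds. If no result is available when that deadline passes, the waiter's node is removed and the method returns the current state instead of waiting any longer.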
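The timed path is what backs get(long, TimeUnit). As a small sketch (the class name TimedGetSketch and the 100 ms timeout are arbitrary choices), a task that is never run stays in the NEW state, so a timed get() gives up with a TimeoutException:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedGetSketch {
    static boolean timesOut() {
        // Never run, so the state stays NEW and get() has to wait.
        FutureTask<String> task = new FutureTask<>(() -> "late");
        try {
            task.get(100, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            // The deadline passed while the task was still incomplete.
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("timed out: " + timesOut());
    }
}
```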

q is a WaitNode object. It wraps a reference to the current (waiting) thread and serves as a node in the stack data structure. The WaitNode class is as follows:

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

At the start of each loop iteration, check whether the current thread has been interrupted. If it has, remove its node from the waiter stack and throw an InterruptedException.

Next comes an if ... else if ... chain. We analyze it branch by branch.

Branch 1: if (s > COMPLETING)

At this point the task has an outcome, whether that is a normal result, an exception, a cancellation, or an interruption. awaitDone() returns the state directly, and get() then calls report().

Branch 2: else if (s == COMPLETING)

If this condition holds, the task is about to complete. The current thread yields the CPU and checks the state again on the next iteration.

Branch 3: else if (q == null)

On the first pass through the loop the WaitNode has not been created yet, so q = new WaitNode() is executed.

Branch 4: else if (!queued)

queued indicates whether the current thread's node has been pushed onto the stack. If not, push it: q.next is set to the current top of the stack (waiters), and a CAS then points the waiters field at q, making q the new top.

Branch 5 / 6: LockSupport.park

If the timeout is set, use parkNanos to suspend the current thread, otherwise use park()

After these loop iterations, if call() has not yet produced a result, the thread calling get() is suspended.

The suspended thread is woken up once run() has stored the result. That logic is in finishCompletion().
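The push in Branch 4 is a lock-free stack push. The following simplified sketch shows the same idea with AtomicReference in place of the UNSAFE CAS, and with a name stored in each node instead of a Thread; WaiterStackSketch, Node, push(), and drain() are illustrative names, not FutureTask's own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

class WaiterStackSketch {
    // Simplified stand-in for WaitNode: stores a name instead of a Thread.
    static final class Node {
        final String name;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    // Plays the role of the waiters field (top of the stack).
    private final AtomicReference<Node> waiters = new AtomicReference<>();

    // Same idea as Branch 4: link the node to the current top, then CAS the top.
    void push(String name) {
        Node q = new Node(name);
        do {
            q.next = waiters.get();
        } while (!waiters.compareAndSet(q.next, q));
    }

    // Detaches the whole stack at once and walks it from the top.
    List<String> drain() {
        List<String> order = new ArrayList<>();
        for (Node q = waiters.getAndSet(null); q != null; q = q.next) {
            order.add(q.name);
        }
        return order;
    }

    static List<String> demo() {
        WaiterStackSketch stack = new WaiterStackSketch();
        stack.push("t1");
        stack.push("t2");
        stack.push("t3");
        return stack.drain();   // last pushed comes out first
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The last waiter to arrive sits on top of the stack, so waiters are visited in the reverse order of their arrival.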

The data in the final stack structure is as follows:

FutureTask.finishCompletion() implementation principle

The specific implementation code is as follows:

private void finishCompletion() {
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null;
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;
}

The code implementation is very simple. After reading the get() method, we know that every thread that calls get() before run() has stored the result is saved in a stack data structure made of WaitNode objects, and each of those threads is suspended.

Here the whole waiters stack is detached with a CAS and then walked from the top node through the next references, waking each waiting thread in turn. Each woken thread returns from awaitDone() and goes on to call report().
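finishCompletion() ends by calling done(), a protected hook that is empty by default. As a sketch of how that hook can be observed (DoneHookSketch and the event log are illustrative), a subclass can override done() to record when the task finishes, whether normally or through cancellation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.FutureTask;

class DoneHookSketch {
    // Builds a task that records "call" when it runs and "done" when it completes.
    static FutureTask<String> loggingTask(List<String> log) {
        return new FutureTask<String>(() -> {
            log.add("call");
            return "value";
        }) {
            @Override
            protected void done() {
                // Invoked from finishCompletion() after the waiters were woken.
                log.add("done");
            }
        };
    }

    static List<String> eventsOnRun() {
        List<String> log = new ArrayList<>();
        loggingTask(log).run();
        return log;
    }

    static List<String> eventsOnCancel() {
        List<String> log = new ArrayList<>();
        loggingTask(log).cancel(false);   // cancel() also ends in finishCompletion()
        return log;
    }

    public static void main(String[] args) {
        System.out.println("run:    " + eventsOnRun());
        System.out.println("cancel: " + eventsOnCancel());
    }
}
```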

FutureTask.report() implementation principle

The specific code is as follows:

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

This method is very simple. By the time execution reaches here, state must be greater than COMPLETING. If the state is NORMAL, the value stored in outcome is returned.

If the state is CANCELLED, INTERRUPTING, or INTERRUPTED (s >= CANCELLED), a CancellationException is thrown.

Otherwise the state is EXCEPTIONAL: the task failed during execution, and an ExecutionException wrapping the original exception is thrown.
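From the caller's side, the three branches of report() show up as three different results of get(). The sketch below triggers each one; the class name ReportOutcomesSketch, the outcomeOf() helper, and the message strings are illustrative.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class ReportOutcomesSketch {
    // Hypothetical helper: maps each way get() can end to a short description.
    static String outcomeOf(FutureTask<String> task) {
        try {
            return "value: " + task.get();                  // state NORMAL
        } catch (CancellationException e) {
            return "cancelled";                             // state >= CANCELLED
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();  // state EXCEPTIONAL
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    static String normal() {
        FutureTask<String> task = new FutureTask<>(() -> "ok");
        task.run();
        return outcomeOf(task);
    }

    static String failed() {
        FutureTask<String> task = new FutureTask<>(() -> {
            throw new IllegalStateException("boom");
        });
        task.run();   // run() itself does not throw; the exception is stored in outcome
        return outcomeOf(task);
    }

    static String cancelled() {
        FutureTask<String> task = new FutureTask<>(() -> "never");
        task.cancel(false);
        return outcomeOf(task);
    }

    public static void main(String[] args) {
        System.out.println(normal());
        System.out.println(failed());
        System.out.println(cancelled());
    }
}
```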

FutureTask.cancel() implementation principle

public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally {
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}

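The logic of the cancel() method is also simple. It only takes effect while state is still NEW: a CAS changes state to CANCELLED, or to INTERRUPTING when mayInterruptIfRunning is true, and finishCompletion() is then called to wake up the waiting threads.

If mayInterruptIfRunning is true, the thread executing the task (runner) is interrupted first and state is set to INTERRUPTED; only then are the waiting threads woken up.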
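The effect of cancel(true) on a running task can be sketched as follows. The task sleeps to stand in for long-running work, and a latch makes sure it is already running before cancel() is called. The class name CancelInterruptSketch and the 10-second sleep are arbitrary choices for the illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;

class CancelInterruptSketch {
    static boolean runnerWasInterrupted() {
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);

        FutureTask<String> task = new FutureTask<>(() -> {
            started.countDown();
            try {
                Thread.sleep(10_000);       // stands in for long-running work
            } catch (InterruptedException e) {
                interrupted.set(true);      // cancel(true) interrupted this thread
            }
            return "unused";                // discarded: the state is no longer NEW
        });

        Thread worker = new Thread(task);
        worker.start();
        try {
            started.await();                        // the task is running, runner is set
            boolean accepted = task.cancel(true);   // NEW -> INTERRUPTING -> INTERRUPTED
            worker.join();
            return accepted && interrupted.get() && task.isCancelled();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("runner interrupted: " + runnerWasInterrupted());
    }
}
```

With cancel(false) the state would change to CANCELLED, but the worker would keep sleeping until its work finished, because nothing interrupts it.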

Summary

The implementation principle of FutureTask is actually quite simple. For almost every method I have drawn a simple flow chart to make it easier to follow.

Later, I plan to share a source code walkthrough of BlockingQueue, which will round off the thread pool series.

Before that, I may share an analysis of common Spring Cloud configuration code, a best-practice manual, and similar material that is handy to use at work and also serves as a summary of the source code I have read so far. Coming soon!

Tags: Java Spring

Posted on Sun, 31 May 2020 20:23:03 -0700 by mattw