FutureTask Source Complete Interpretation

1 Introduction

Last blog post " Introduction to Asynchronous Task Service "Brief introduction and analysis of FutureTask, here is another in-depth analysis of FutureTask (based on JDK1.8).

FutureTask implements both Future and Runnable interfaces, so it can be handed over to Executor to perform this task, or run methods can be executed directly by the calling thread.

Based on the execution status of the FutureTask.run method, it can be divided into the following three states

(1) Not started: The run method has not been executed, and FutureTask is not started.

(2) Started: FutureTask is started during the execution of the run method

(3) Completed: FutureTask is in the completed state when the run method returns normally or is cancelled or ends normally due to an exception thrown during execution.

Executing the FutureTask.get() method causes the calling thread to block when the FutureTask is not started or started; executing the FutureTask.get() method causes the calling thread to immediately return a result or throw an exception when the FutureTask is in the completed state.

When FutureTask is not started, executing FutureTask.cancel() will result in the task never being executed; when FutureTask is started, executing FutureTask.cancel(true) will attempt to stop the task by interrupting the thread executing the task; when FutureTask is started, executingThe FutureTask.cancel(false) method will not affect the threads that are performing this task (allowing the task being executed to run to completion); when the FutureTask is in the completed state, the FutureTask.cancel method returns false (completed tasks cannot be cancelled).

2 Example usage

FutureTask, because it inherits from the Runnable interface itself, can be handed over to the executor Executor for execution; it also represents the result of an asynchronous task and can return a FutureTask through ExecutorService.submit.FutureTask can also be used alone.To better understand FutureTask, a task cache is demonstrated below in conjunction with ConcurrentHashMap.There are multiple tasks in the cache that are executed using multiple threads. A task is consumed by at most one thread. If multiple threads attempt to execute this task, only one thread is allowed to execute, and other threads must wait for it to complete.

import java.util.concurrent.*;

public class FutureTaskTest {
    private final ConcurrentMap<String, Future<String>> taskCache = new ConcurrentHashMap<>();
    public  String executionTask(final String taskName)
            throws ExecutionException, InterruptedException {
        while (true) {
            Future<String> future = taskCache.get(taskName);// Get Tasks from Cache
            if (future == null) {//This task does not exist, build a new task into the cache, and start the task
                Callable<String> task = () ->{
                    System.out.println("The name of the task performed is"+taskName);
                    return taskName;
                } ; // 1.2 Create Task
                FutureTask<String> futureTask = new FutureTask<String>(task);
                future = taskCache.putIfAbsent(taskName, futureTask);// Try putting tasks in the cache
                if (future == null) {
                    future = futureTask;
                    futureTask.run();//Execute Tasks
                }
            }
            try { //If the task is in the cache, you can wait directly for the task to complete
                return future.get();// Waiting for task execution to complete
            } catch (CancellationException e) {
                taskCache.remove(taskName, future);
            }
        }
    }

    public static void main(String[] args)    {
     final   FutureTaskTest taskTest = new FutureTaskTest();
        for (int i = 0; i < 7; i++) {
            int finalI = i;
            new Thread(()->{
                try {
                    taskTest.executionTask("taskName" + finalI);
                } catch (ExecutionException | InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
            new Thread(()->{
                try {
                    taskTest.executionTask("taskName" + finalI);
                    taskTest.executionTask("taskName" + finalI);
                } catch (ExecutionException | InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

Print Output

Task name executed is taskName0

Task name executed is taskName5

The name of the task being executed is taskName4

The name of the task being executed is taskName6

The name of the task being executed is taskName3

Task name executed is taskName1

The name of the task being executed is taskName2

3 Implementation Principle

1) Member variables

It has a member variable state to represent the state

private volatile int state;

It has these possible values

private static final int NEW          = 0;//The initial state or task is running
private static final int COMPLETING   = 1;//Temporary state, task is about to end, setting results
private static final int NORMAL       = 2;//Task Completed Normally
private static final int EXCEPTIONAL  = 3;//End task by throwing an exception
private static final int CANCELLED    = 4;//Task Cancelled
private static final int INTERRUPTING = 5;//Task is being interrupted
private static final int INTERRUPTED  = 6;//Task interrupted(Final state of interrupt)

There are several possible state transitions for a state

/** NEW -> COMPLETING -> NORMAL State Transition Process at Normal End of Task
     * NEW -> COMPLETING -> EXCEPTIONAL state transition process when an exception is thrown during task execution
     * State transition process when NEW -> CANCELLED task is cancelled
     * NEW -> INTERRUPTING -> INTERRUPTED Status Transition Process when an interrupt occurs during task execution
     */

 

Other member variables

private Callable<V> callable;
private Object outcome; // non-volatile, protected by state reads/writes
private volatile Thread runner;
private volatile WaitNode waiters;

The member variable callable represents the task to be performed.

The member variable outcome represents the result of a task or an exception to the abnormal end of a task

The member variable runner represents the thread that executes the task

The member variable waiter represents the wait stack (the data structure is a one-way chain table) waiting for the results of a task to execute.WaitNode is a simple static interior, with one member variable threading representing the thread waiting for the result and another member variable next representing the next waiting node (thread).

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

2) Construction methods

The construction method of FutureTask initializes callable and state, and it has two construction methods that accept tasks of type Callable and Runnable to be executed, respectively.For Runnable type parameters, however, it calls Executors.callable to convert Runnable to Callable type instances for uniform processing.

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

The Executors.callable method is also simple, returning an object of type RunnableAdapter, an implementation class of Callable.

public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}

3) Major API s

(1) run and runAndReset

The run method is Funture's most important method, and everything about FutureTask starts with the run method, which is the way to perform the callable task.

public void run() {
        if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                        null, Thread.currentThread()))
            //Set the current thread as the thread to execute the task, CAS Fail and return directly
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();//Execute Tasks
                    ran = true;
                } catch (Throwable ex) {
                    //Runtime exception, set exception
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);//Set Results
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null; //state Is the final state, will not change, will runer Set to null,Prevent run Method is called concurrently
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state; //Empty Running Threads runner Retrieve later state,Prevent missed handling of interrupts
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

Its main logic is:

1. Check the status and set the thread to run the task

(2) Call the call method of callable to execute the task and catch possible exceptions in operation

(3) If the task completes normally, call set to set the result of the task, set state to NORMAL, save the result to the result, and wake up all threads waiting for the result

(4) If an exception occurs during task execution, call setException to set the exception, set state to EXCEPTIONAL, save the exception to the outcome, and wake up all threads waiting for the result

Finally, the running thread runner is emptied, and if the status may be a cancelled interrupt, the interrupt is processed.

 

The set and setException methods are used to set the results and set exceptions, respectively, but this is their main logic and they do other processing.

They set the results or exceptions to the member variable outcome, update the state, and finally call finishCompletion to remove and wake up all (node) threads from the waiting stack table (task completed, no need to wait, you can get the results directly and wait for the stack to no longer exist).

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}
protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

 

There is interrupt handling in the run method. Let's see how the handlePossibleCancellationInterrupt method handles interrupts.

This is simply to make the current thread give up the time slice and let other threads execute the task first, that is, thread comity.

private void handlePossibleCancellationInterrupt(int s) {
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt
    }

 

The runAndReset method is a protected-level method (called by a subclass) added by the FutureTask class itself that primarily performs tasks that can be executed multiple times without requiring results and returns true only if the task runs and resets successfully.The run method of the static internal ScheduledFutureTask of the timed task executor ScheduledThreadPoolExecutor calls this API.

The runAndSet method has roughly the same logic as the run method, except that the runAndSet does not set the result by calling the set method (it does not need the result itself, and it is also designed to prevent the state from being modified).

protected boolean runAndReset() {
        if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                        null, Thread.currentThread()))
            //Task started or CAS Setting the thread to run the task failed and returned directly false
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result  No call set(V)Method, no end set
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW; //The task ran successfully and state still NEW Return on true,Reverse Return false
    }

(2) get method

The get method is used to get the final result of a task and has two versions, one of which is the timeout version.The main difference between the two versions is that the non-timeout version can wait indefinitely for the result to return, and the non-timeout version does not throw a TimeoutException timeout exception.The basic logic of the get method timeout version is that if a task is not completed, it waits for the task to complete, and finally calls report to report the result, the report returns the result or throws an exception based on the status.

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);//awaitDone The first parameter is false,Indicates that you can wait indefinitely
    return report(s);
}
public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&//Not yet completed
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)//Waiting for completion
        throw new TimeoutException();//When the time limit is reached, the task is still incomplete and a timeout exception is thrown TimeoutException
    return report(s);//Report results
}

The core implementation of the get method is to call the awaitDone method, which is used to wait for the results of the task and will block the current thread if the task does not complete awaitDone.

The basic logic of the awaitDone method is as follows: 1. If an interrupt occurs while executing a task, an InterruptedException exception is thrown;(2) If the task is completed at this time, return to the latest state; (3) if the task is about to be completed, make the current thread give up the CPU time slice and let other threads execute first; (4) if the task is still executing, add the current thread to the wait stack, and then let the current thread sleep until it exceeds the time limit or when the task is completed, the run method calls finishCompletion to wake the thread (run method)FinishCompletion is called by set or setException of, and LockSupport.unpark is called by finishCompletion.

private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) { //Interrupted, waiting for the thread to be removed from the stack table and throwing an interrupt exception
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) { //Task completed, emptying the current thread from the waiting queue, returning to the latest state
            if (q != null)
                q.thread = null;
            return s;
        }
        //Task is about to be completed, current thread comity allows other threads to execute
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null) 
            //Initialize the node corresponding to the current thread
            q = new WaitNode();
        else if (!queued)
            //If previous stacking fails, try stacking again ( CAS Update), set the current node to the top of the waiting stack
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                    q.next = waiters, q);
        else if (timed) { //If timeout is set
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) { 
                //Remove the current node (thread) from the wait stack if the task has been executed longer than the given time
                removeWaiter(q);
                return state;
            }
            //Let the current thread sleep for a given time(Or wait run In the method set or setException call finishCompletion To wake up)
            LockSupport.parkNanos(this, nanos);
        }
        else//Timeout not set
      //Wait indefinitely for the current thread until the task is completed run In the method set or setException call finishCompletion To wake up this thread
            LockSupport.park(this);
    }
}

 

The above awaitDone method calls removeWaiter to remove breaks and timeout nodes waiting on the stack table.

Its internal implementation is not easy to understand, but the main logic is clear: traverse the list from beginning to end, remove interrupt/timeout nodes from the list, and restart the chain check and remove invalid nodes if there is thread competition.

private void removeWaiter(WaitNode node) {
    if (node != null) {
        node.thread = null; //First empty the thread corresponding to the node,Below"q.thread != null"You can tell if a node has timed out or broken.
        retry:
        for (;;) {          // restart on removeWaiter race
            //q Represents the node that is currently traversed, pred Express q The precursor node, s Express q Successor Nodes 
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {//Exit inner loop only after traversing the list of chains
                s = q.next;
                //q.thread!=null Indicates that this is not a timeout or break node, it is a valid node and cannot be removed from the stack table
                //(removeWaiter Nodes that start with a timeout or interrupt will begin with thread nulling,So node.thread=null Code)
                if (q.thread != null)
                    pred = q; //When we get the next loop q Precursor Node
                else if (pred != null) { //q.thread== null And pred!=null,Need to invalidate node q Remove from Stack Table
                    //take q The precursor and successor nodes are directly linked together. q Itself has been removed from the stack table
                    pred.next = s;
                    //This is a forward-backward traversal of the chain table. Without competition, it is not possible to fail to check that there are invalid nodes in front of the current node.
                    //Then there must be other threads modifying the current node q A pioneer, sometimes with threads competing, that needs to go through the checks again from the head of the chain table
                    if (pred.thread == null) // check for race
                        continue retry;
                }
                // pred==null And q.thread=null
                //q The precursor node is empty, indicating that q Is the head node of the chain table
                //q.thread==null,Indicate q Is Invalid Node
                //Invalid node cannot be the head node of the chain table, so to update the head node, set the q Successor Nodes s As a new header node in the list of chains
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,  //CAS Update Head Node
                        q, s))
                    //CAS Update failed, retry
                    continue retry;
            }
            break;
        }
    }
}

 

The get method needs to call the report method to report the results, and the basic logic of the report method is simple: return the results of the task if the task ends normally, throw the CancellationException exception if the task is cancelled, and wrap the exception into ExecutionException if it occurs during the execution of the task.

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

(3) cancel method

The cancel method is used to cancel a task, so let's see how the cancel(boolean) method works

public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW &&
            UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                    mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        //①No NEW Status, indicating that the task is at least COMPLETING(Coming to an end)Status, Return false
        //②CAS To update state by INTERRUPTING or CANCELLED Failed, returned false
        //only state Status updates are successful to cancel tasks (prevent concurrent calls)
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {//Allow interrupt to set interrupt flag
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();//Set interrupt flag
            } finally { // final state Set the final state of the interrupt
                //INTERRUPTING -> INTERRUPTED ,take state Update from Interrupting to Interrupting has been interrupted
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        //Wake up from the wait stack and remove all threads(node)
        finishCompletion();
    }
    return true;
}

Its basic logic:

1 Task ended or cancelled, returning false

(2) If mayInterruptIfRunning is true, call interrupt to set the interrupt flag, set state to INTERRUPTED, and if mayInterruptIfRunning is false, set state to CANCELLED.

(3) Call finishCompletion to wake up and remove all threads in the waiting stack

 

finishCompletion() mainly handles the end-of-task sweeping. Its main logic is to empty the waiting stack waiters, wake up and remove all nodes (threads) on the waiting stack, and finally empty the task callable.

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        //After the task is cancelled, wait for the stack table to no longer exist, wait for the stack waiters Assign as null,
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) { //Traverse the list (stack), remove all nodes, and wake up the thread corresponding to the node
                Thread t = q.thread;
                if (t != null) {
                //Empty threads on nodes
                    q.thread = null; 
                    LockSupport.unpark(t);//Wake up this thread
                }
                WaitNode next = q.next;
                if (next == null)//End of list, exit traversal
                    break;
                q.next = null; // unlink to help gc Will node next Property emptying for garbage collection
                q = next;//Move a node backward
            }
            break;
        }
    }
    done();//Empty method, reserved for subclass overrides
    callable = null; //Empty, less trace       // to reduce footprint 
}
 

(4) Other auxiliary methods

The isCancelled method returns the Boolean value of whether the task was canceled

The isDone method returns a Boolean value of whether the task completed or not (even if it ended abnormally)

isCancelled and isDone determine the status of the task directly from the state.

public boolean isCancelled() {
    return state >= CANCELLED;
}

public boolean isDone() {
    return state != NEW;
}

Tags: Java less

Posted on Thu, 12 Mar 2020 11:45:06 -0700 by jj33