Trace and analyze FutureTask source code

Zero preparation

0 FBI WARNING

The article is extremely wordy and winding.

1 version

JDK version: OpenJDK 11.0.1

IDE : idea 2018.3

2 Introduction to ThreadLocal

FutureTask is the default Future implementation class in jdk, which is often combined with Callable to perform multithreaded concurrent operations.

3 Demo

import java.util.concurrent.*;

public class FutureTaskDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        
        //Create a thread pool
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try{
            //Create a Callable object to execute
            //In fact, Runnable objects can also be used here, but they usually do not
            Callable<String> task = () -> {
                //Three seconds dormancy
                TimeUnit.SECONDS.sleep(3);
                //Returns a string
                return "hello";
            };

            //Using FutureTask object to wrap Callable
            FutureTask<String> futureTask = new FutureTask<>(task);

            //Here, the FutureTask object is put into the thread pool
            pool.submit(futureTask);

            //Note that the futureTask here is essentially thrown into the pool as a Runnable
            //So you can also use the execute(...) method of the thread pool
            //pool.execute(futureTask)

            //Another more common way to execute is to use Thread directly
            //new Thread(futureTask).start();

            //Get results
            //Note that if you don't get it, the thread will block here until you get it
            String result = futureTask.get();

            //There is also a time limited strategy to obtain results
            //Exception will be thrown in case of timeout
            //String result = futureTask.get(1,TimeUnit.SECONDS);

            System.out.println(result);
        }finally {
            //Close connection pool
            pool.shutdown();
        }

    }
}

The creation of FutureTask

Go back to the create code in Demo:

FutureTask<String> futureTask = new FutureTask<>(task);

Constructor to track FutureTask:

//FutureTask.class
public FutureTask(Callable<V> callable) {
    //Validity judgment, cannot be empty
    if (callable == null)
        throw new NullPointerException();
    //Record the callable object
    this.callable = callable;
    //state is an object of type int, which is a
    //NEW = 0
    this.state = NEW;
}

Two run

FutureTask itself is a subclass of Runnable, which is also used as the implementation class of Runnable when consumed by ThreadPoolExecutor or or Thread object.

So its core logic must be in the run() method:

//FutureTask.class
public void run() {

    //Judge the status first. If the status is not NEW, it will return directly
    //RUNNER is a variable of type VarHandler, which points to the thread variable in FutureTask to store the current thread
    //But if the thread is not null, it will return directly here
    //Both of these return conditions mean that the run() method of this FutureTask has been executed
    if (state != NEW || !RUNNER.compareAndSet(this, null, Thread.currentThread()))
        return;

    try {
        //Get callable
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                //Execute the business logic of callable
                result = c.call();
                //ran is the success mark
                ran = true;
            } catch (Throwable ex) {
                //In case of error
                result = null;
                ran = false;
                //Save exception if unsuccessful
                setException(ex);
            }
            //If successful, it will operate here
            if (ran)
                set(result);
        }
    } finally {
        //Empty space
        runner = null;
        int s = state;
        if (s >= INTERRUPTING)
            //If the status of this FutreTask is interrupt, Thread.yield() will be called continuously here to idle
            handlePossibleCancellationInterrupt(s);
    }
}

There are two key methods here, setException(...) and set(...):

//FutureTask.class
protected void setException(Throwable t) {
    //Compare and update status value with CAS operation
    if (STATE.compareAndSet(this, NEW, COMPLETING)) {
        //outcome is an Object object Object that stores the return value of the callable
        //Because an error is reported here, the error object is stored
        outcome = t;
        //EXCEPTIONAL = 3
        STATE.setRelease(this, EXCEPTIONAL);
        //Finally, the cleaning work is mainly used to wake up the waiting thread and execute the callable
        finishCompletion();
    }
}

//FutureTask.class
protected void set(V v) {
    //The basic logic is the same as the setException(...) method, except that the stored values of STATE and outcome are different
    if (STATE.compareAndSet(this, NEW, COMPLETING)) {
        outcome = v;
        STATE.setRelease(this, NORMAL);
        finishCompletion();
    }
}

Let's look at the finishCompletion() method:

//FutureTask.class
private void finishCompletion() {
    //WaitNode is a static inner class of FutureTask
    //It is essentially a node representation class of one-way linked list, which is used to store the thread object of the thread that wants to get the return value of Callable but is blocked
    for (WaitNode q; (q = waiters) != null;) {
        if (WAITERS.weakCompareAndSet(this, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    //This method is empty
    done();
    //Empty callable
    callable = null;
}

As mentioned before, in the get(...) method of FutureTask, the thread will block until the Callable execution is completed and the return value can be obtained.

So the body of the finishCompletion() method is actually to wake up the blocked thread.

Three get

Go back to the create code in Demo:

String result = futureTask.get();

Trace get() method:

//step 1
//FutureTask.class
public V get() throws InterruptedException, ExecutionException {
    //Here, the state value is determined first. If it is not being completed, the awaitDone(...) method will be called to block the thread
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    //Return result
    return report(s);
}

//step 2
//FutureTask.class
private V report(int s) throws ExecutionException {
    //Get the object to be returned
    Object x = outcome;
    //If it ends normally, just return the object directly
    if (s == NORMAL)
        return (V)x;
    //Throw exception in case of error
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

Let's look at the awaitDone(...) method that blocks threads:

private int awaitDone(boolean timed, long nanos) throws InterruptedException {
    
    //Number of cycles
    long startTime = 0L;
    //Node object
    WaitNode q = null;
    //Linked list queue ID, which indicates whether the thread is added to the linked list or not. Initially, false indicates not added
    boolean queued = false;
    for (;;) {
        int s = state;
        if (s > COMPLETING) { //If Callable execution is complete
            if (q != null)
                q.thread = null;
            return s;
        }else if (s == COMPLETING) //The Callable implementation has just been completed, and the follow-up work has not been done
            Thread.yield();
        else if (Thread.interrupted()) {
            //If the thread is interrupted, an error will be thrown
            removeWaiter(q);
            throw new InterruptedException();
        } else if (q == null) { //The judgment entered here proves that the Callable has not been completed, so a waiting node will be created
            //The time passed in here is false and will not be returned here
            if (timed && nanos <= 0L)
                return s;
            q = new WaitNode(); //New node
        }else if (!queued)
            //queued is initially false. When you enter here, you will add the new q in the previous judgment condition to the first node of the list
            //And queued becomes true
            queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
        else if (timed) {
            //If this operation is time limited, then it needs to judge the time
            final long parkNanos;
            if (startTime == 0L) {
                startTime = System.nanoTime();
                if (startTime == 0L)
                    startTime = 1L;
                parkNanos = nanos;
            } else {
                long elapsed = System.nanoTime() - startTime;
                if (elapsed >= nanos) {
                    removeWaiter(q);
                    return state;
                }
                parkNanos = nanos - elapsed;
            }
            if (state < COMPLETING)
                //Thread is suspended here at parkNanos
                //In this case, the incoming is 0L, so it is permanently suspended
                LockSupport.parkNanos(this, parkNanos);
        }else
            //Thread permanently suspended
            LockSupport.park(this);
    }
}

Four little nagging

FutureTask, like ThreadLocal, is a small tool in the java.util.current package. It's easy to package and understand.

This article is only a personal study note. There may be some mistakes or unclear expressions, which should be supplemented

Tags: Java JDK

Posted on Mon, 02 Dec 2019 03:45:09 -0800 by ubersmuck