Future Mechanism Principle for Java Asynchronous Programming

1. Review of Runnable and Callable

Difference:

  • Callable defines the call() method, and Runnale defines the run () method.
  • The call () method can throw an exception, but the run () method cannot.
  • Callable has a return value, which is generic, passed in when it is created and returned after execution.
  • Callable can get the status of task execution through FutureTask when it executes a task.

Contact:

  • Callable's call method actually executes in Runnable's run method.
  • The Runnable instance object requires the Thread wrapper to start. Callable wraps through FutureTask (essentially Runnable) before executing the Thread wrapper.

2. Principles of Future Mechanism

Future cancels the execution result of a specific Runnable or Callable task, queries whether it is completed, and obtains the result.If necessary, the result of execution can be obtained through the get method, which will block until the task returns the result.

JDK's built-in Futures mainly use the Callable interface and the FutureTask class.The source code is parsed below:

Callable interface:

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

How to use Callable:

Typically used in conjunction with ExecutorService, three submit methods are declared in the ExecutorService interface to generate a Future object with a Callable instance or a Runnable instance parameter.

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

Inheritance:

public class FutureTask<V> implements RunnableFuture<V> {}
 
 
public interface RunnableFuture<V> extends Runnable, Future<V> { void run();}

Clearly related: The FutureTask class implements the RunnableFuture interface, which inherits both the RunnableFuture interface and the Future interface.So it can be executed by a thread as a Runnable, and it can also be used as a Future to get the return value of a Callable, with various methods provided by the Future interface.

Process runtime: Tasks are usually defined within the call method of the Callable interface, return values are generic, and a FutureTask object is regenerated. The FutureTask construction method internal parameters encapsulate the Callable instance and pass the object as a Runnable to the Thread wrapper for execution.

3. Future Source Parsing

Methods provided by the Future interface:

public interface Future<V> {
 
    //Cancel the task.Parameter: Whether to immediately interrupt the task execution, or wait for the task to end
    boolean cancel(boolean mayInterruptIfRunning);
 
    //Whether the task has been canceled or not, if cancelled, returns true
    boolean isCancelled();
 
    //Has the task been completed?Returns true if the task completes normally, throws an exception, or is cancelled
    boolean isDone();
 
    /*Waiting until the task finishes is blocked for results of type V.InterruptedException: Thread interrupted exception, ExecutionException: Task execution exception, CancellationException thrown if task is canceled*/
    V get() throws InterruptedException, ExecutionException;
 
    /*The parameter timeout specifies the timeout time, uint specifies the unit of time, and is defined in the enumeration class TimeUnit.TimeoutException will be thrown if calculation times out*/
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

FutureTask Source Parsing:

Construction method:

public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        //Status is NEW
        this.state = NEW;       // ensure visibility of callable
    }
 
public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

In fact, Callable = Runnable + result, see the implementation of Executors.callable (runnable, result):

public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        //Newa RunnableAdapter, returns Callable, indicating that RunnableAdapter implements Callable
        return new RunnableAdapter<T>(task, result);
    }

Convert Runnable to Callable using the RunnableAdapter adapter, and continue with the RunnableAdapter class:

static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            //Runnable task executed run()
            task.run();
            //T result returned
            return result;
        }
    }

Essentially, Runnable executes the run method by adding a result constant to accept the returned result.

Status value:

    /* Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
 
    private volatile int state;
    //Initialization state
    private static final int NEW          = 0;
    //Executing
    private static final int COMPLETING   = 1;
    //Complete normally
    private static final int NORMAL       = 2;
    //An exception occurred
    private static final int EXCEPTIONAL  = 3;
    //Cancelled
    private static final int CANCELLED    = 4;
    //Being interrupted
    private static final int INTERRUPTING = 5;
    //Interrupted
    private static final int INTERRUPTED  = 6;

run method of FutureTask:

public void run() {
        /*compareAndSwapObject(this, runnerOffset,]null, Thread.currentThread()))
         The first parameter is the object that needs to be changed, the second is the offset, the third parameter is the expected value, and the fourth is the updated value.
        */
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //The call() method is called by FutureTask, indicating that call() is not asynchronous
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    //Set exception
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            //Determine if it is interrupted
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

set method:

protected void set(V v) {
           // NEW -> COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            //Return results, including exceptions
            outcome = v;
            //COMPLETING -> NORMAL
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            //Wake up waiting threads
            finishCompletion();
        }
    }

Assign return values and status values back.

get method:

public V get() throws InterruptedException, ExecutionException {
        int s = state;
        //Is it incomplete or wait
        if (s <= COMPLETING)
            //Waiting process
            s = awaitDone(false, 0L);
        return report(s);
    }
 
    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

The run method parses, and the get method is used to get the return result.

Summary: Future only implements asynchronization, not callbacks. The main thread get s blocked and can poll to see if the asynchronous call is complete.However, Guava ListenableFuture implements asynchronous and non-blocking for the purpose of multi-task asynchronous execution. It obtains execution results through callbacks without polling the status of the task. In other words, it sets the listener, and automatic callbacks return results at the end of task execution without always blocking in waiting for the results.

96 original articles published. 3. 8467 visits
Private letter follow

Tags: JDK

Posted on Fri, 10 Jan 2020 19:34:17 -0800 by zkent