Understanding Netty Concurrency Tool from Source-Promise

premise

Recently, I have been watching Netty-related content and writing a lightweight RPC framework to practice. I found many highlights in Netty's source code. Some implementations can even be described harshly.In addition, Netty provides excellent tools that can be used out of the box.Here's an analysis of your favorite areas, Promise, a Netty tool module for concurrency.

Environment version:

  • Netty:4.1.44.Final
  • JDK1.8

Introduction to Promise

Promise, translated in Chinese as a promise or promise, means something that one person has a certain vision for another person, and it is generally possible.

io.netty.util.concurrent.Promise has only one sentence in the comment: the special writable io.netty.util.concurrent.Future (the Promise interface is a subinterface of io.netty.util.concurrent.Future).And io.netty.util.concurrent.Future is java An extension of.util.concurrent.Future that represents the result of an asynchronous operation.We know that Futures in JDK concurrent packages are not writable and do not provide a listenable entry (no observer mode is applied), and Promise makes up for both.On the other hand, from an inheritance perspective, DefaultPromise is the final implementation class for these interfaces, so you need to focus on the DefaultPromise class when analyzing the source code.In general, the functions provided by a module are defined by interfaces. Here is a list of the functions of two interfaces:

  • io.netty.util.concurrent.Promise
  • io.netty.util.concurrent.Future

First look at the io.netty.util.concurrent.Future interface:

public interface Future<V> extends java.util.concurrent.Future<V> {

    // Is the I/O operation performed successfully
    boolean isSuccess();

    // Mark whether I/O operations can be canceled by the following cancel(boolean mayInterruptIfRunning)
    boolean isCancellable();

    // Returns an exception instance of an I/O operation - this method returns null if the I/O operation itself is successful
    Throwable cause();

    // Add a listener for the current Future instance to listen for the completion of the Future operation - all listener instances will be callback after the isDone() method is activated
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    
    // Remove listeners that listen for Future operation completion for current Future
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    // Synchronization waits for Future to complete for the final result (success) or throws an exception (failure) in response to an interruption
    Future<V> sync() throws InterruptedException;

    // Synchronization waits for Future to complete for the final result (success) or throws an exception (failure) without responding to an interrupt
    Future<V> syncUninterruptibly();

    // Wait for Future to complete, response interrupted
    Future<V> await() throws InterruptedException;

    // Wait for Future to finish without responding to interrupts
    Future<V> awaitUninterruptibly();

    // Waiting for Future to complete with timeout, response interrupted
    boolean await(long timeout, TimeUnit unit) throws InterruptedException;
    boolean await(long timeoutMillis) throws InterruptedException;
    
    // Wait for Future to finish with timeout, no interrupt response
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);
    boolean awaitUninterruptibly(long timeoutMillis);

    // Non-blocking returns the result of Future immediately, which must return null if Future is not complete; in some scenarios, if Future successfully obtains a result of null, it needs to double-check if the isDone() method is true
    V getNow();

    // Cancels the execution of the current Future instance and throws a CancellationException exception if the cancellation succeeds
    @Override
    boolean cancel(boolean mayInterruptIfRunning);
}

The sync() method is similar to the await() method except that sync() checks for exceptional execution, throws out the exception instance wrapper as soon as it finds an execution exception, and the await() method is not aware of the exception.

Next, look at the io.netty.util.concurrent.Promise interface:

public interface Promise<V> extends Future<V> {
   
    // Marks the current Future as successful, sets the result, informs all listeners if it is successful, and throws IllegalStateException if Future has succeeded or failed
    Promise<V> setSuccess(V result);

    // Mark the current Future as successful, set the result, if successful, notify all listeners and return true, otherwise return false
    boolean trySuccess(V result);

    // Marking the current Future failed, setting the result to an exception instance, notifying all listeners if the setting succeeded, and throwing IllegalStateException if the Future succeeded or failed
    Promise<V> setFailure(Throwable cause);

    // Marking the current Future failed, setting the result to an exception instance, if set successfully, notify all listeners and return true, otherwise return false
    boolean tryFailure(Throwable cause);
    
    // Mark the current Promise instance as irrevocable, set successfully returns true, otherwise returns false
    boolean setUncancellable();

    // The following method is essentially identical to the method in io.netty.util.concurrent.Future except that the return type is modified to Promise

    @Override
    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    Promise<V> await() throws InterruptedException;

    @Override
    Promise<V> awaitUninterruptibly();

    @Override
    Promise<V> sync() throws InterruptedException;

    @Override
    Promise<V> syncUninterruptibly();
}

At this point, all the functions of the Promise interface are analyzed, and then the implementation of Promise is analyzed in detail from the source point of view.

Promise Source Implementation

The implementation class of Promise is io.netty.util.concurrent.DefaultPromise (there are actually many subclasses of DefaultPromise, some of which are extensions to customize specific scenarios), and DefaultPromise inherits from io.netty.util.concurrent.AbstractFuture:

public abstract class AbstractFuture<V> implements Future<V> {

    // Permanently block methods waiting to get results
    @Override
    public V get() throws InterruptedException, ExecutionException {
        // Call the permanent wait method for response interruption to block
        await();
        // After waking up from permanent blocking, determine if Future performs an exception
        Throwable cause = cause();
        if (cause == null) {
            // Exception is empty indicating successful execution, call getNow() method to return results
            return getNow();
        }
        // Exception is empty but not empty, where a specific cancel exception is distinguished and converted to a CancellationException throw
        if (cause instanceof CancellationException) {
            throw (CancellationException) cause;
        }
        // All other exceptions that are not undo exceptions are wrapped to execute exceptions ExecutionException thrown
        throw new ExecutionException(cause);
    }
    
    // Method with timeout blocking waiting to get results
    @Override
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        // Call response interrupt wait method with timeout to block
        if (await(timeout, unit)) {
             // After waking up from a blocking with timeout, determine if Future is executing an exception
            Throwable cause = cause();
            if (cause == null) {
                // Exception is empty indicating successful execution, call getNow() method to return results
                return getNow();
            }
            // Exception is empty but not empty, where a specific cancel exception is distinguished and converted to a CancellationException throw
            if (cause instanceof CancellationException) {
                throw (CancellationException) cause;
            }
            // All other exceptions that do not cancel exceptions are wrapped to execute exceptions ExecutionException without waiting for a timeout
            throw new ExecutionException(cause);
        }
        // Method Step In Here Explains Wait Timeout throws TimeoutException
        throw new TimeoutException();
    }
}

AbstractFuture only implements get() and get(long timeout, TimeUnit unit), which are implemented in much the same way as in java.util.concurrent.FutureTask.

DefaultPromise has a lot of source code, so read it separately, starting with its properties and constructors:

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

    // Normal log handle, InternalLogger is Netty's internally encapsulated log interface
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);

    // Log handle at the time the task refuses to execute - Promise needs to be committed to the thread as a task to execute, if the task refuses to execute use this log handle to print the log
    private static final InternalLogger rejectedExecutionLogger =
            InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");

    // The maximum stack depth for the listener, which is 8 by default, to prevent memory overflow due to excessive stack depth during nested callback calls. An example of its use will be given later
    private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
            SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
    
    // Result Updater, used by CAS to update the value of the result result result result
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
    
    // The value used to populate the result, Promise executes successfully when the result result result is set to pass in null, and this value is used to represent the successful result
    private static final Object SUCCESS = new Object();
    
    // The value used to populate the result, indicating that Promise cannot be cancelled
    private static final Object UNCANCELLABLE = new Object();
    
    // Holder of the CancellationException instance, used to determine the Promise cancel state and throw the CancellationException
    private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
            new CancellationException(), DefaultPromise.class, "cancel(...)"));
    
    // Array of exception stack information elements for CANCELLATION_CAUSE_HOLDER
    private static final StackTraceElement[] CANCELLATION_STACK = CANCELLATION_CAUSE_HOLDER.cause.getStackTrace();
    
    // True result object, using Object type, may end up null, true result instance, SUCCESS, UNCANCELLABLE or CANCELLATION_CAUSE_HOLDER, etc.
    private volatile Object result;
    
    // Event Executor, which does not expand at this time, can be understood as a single dispatch thread
    private final EventExecutor executor;
    
     // A collection of listeners, either a single GenericFutureListener instance or a DefaultFutureListeners instance
    private Object listeners;
    
    // Number of threads waiting to get results
    private short waiters;

    // Mark if the listener is being callback
    private boolean notifyingListeners;

    // Constructor depends on EventExecutor
    public DefaultPromise(EventExecutor executor) {
        this.executor = checkNotNull(executor, "executor");
    }

    protected DefaultPromise() {
        // only for subclasses - This constructor is reserved for subclasses
        executor = null;
    }

    // ...omit other code...

    // Private static internal class for holding Throwable instances, which are the reason instances for holding exceptions
    private static final class CauseHolder {
        final Throwable cause;
        CauseHolder(Throwable cause) {
            this.cause = cause;
        }
    }

    // Private static internal class that overrides the stack information of CancellationException for the previously defined CANCELLATION_STACK and the full name of the class that toString() returns CancellationException
    private static final class LeanCancellationException extends CancellationException {
        private static final long serialVersionUID = 2794674970981187807L;

        @Override
        public Throwable fillInStackTrace() {
            setStackTrace(CANCELLATION_STACK);
            return this;
        }

        @Override
        public String toString() {
            return CancellationException.class.getName();
        }
    }
    // ...omit other code...
}

Promise currently supports two types of listeners:

  • GenericFutureListener: Supports generic FutureListeners.
  • GenericProgressiveFutureListener: It is a subclass of GenericFutureListener that supports progress representation and generic FutureListeners (some scenarios require multiple steps, similar to the progress bar).
// GenericFutureListener
public interface GenericFutureListener<F extends Future<?>> extends EventListener {

    void operationComplete(F future) throws Exception;
}

// GenericProgressiveFutureListener
public interface GenericProgressiveFutureListener<F extends ProgressiveFuture<?>> extends GenericFutureListener<F> {
    
    void operationProgressed(F future, long progress, long total) throws Exception;
}

To enable Promise to support multiple listeners, Netty adds a DefaultFutureListeners class decorated with a default modifier to hold an array of listener instances:

// DefaultFutureListeners
final class DefaultFutureListeners {

    private GenericFutureListener<? extends Future<?>>[] listeners;
    private int size;
    private int progressiveSize; // the number of progressive listeners
    
    // This construction is relatively special in order for listeners (Object type) instances in Promise to be converted from a single GenericFutureListener instance to a DefaultFutureListeners type
    @SuppressWarnings("unchecked")
    DefaultFutureListeners(GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) {
        listeners = new GenericFutureListener[2];
        listeners[0] = first;
        listeners[1] = second;
        size = 2;
        if (first instanceof GenericProgressiveFutureListener) {
            progressiveSize ++;
        }
        if (second instanceof GenericProgressiveFutureListener) {
            progressiveSize ++;
        }
    }

    public void add(GenericFutureListener<? extends Future<?>> l) {
        GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
        final int size = this.size;
        // Notice here that each expansion array is twice as long as the original
        if (size == listeners.length) {
            this.listeners = listeners = Arrays.copyOf(listeners, size << 1);
        }
        // Add the current GenericFutureListener to the array
        listeners[size] = l;
        // Total number of listeners plus 1
        this.size = size + 1;
        // If GenericProgressiveFutureListener, add 1 to the total number of listeners with progress indication
        if (l instanceof GenericProgressiveFutureListener) {
            progressiveSize ++;
        }
    }

    public void remove(GenericFutureListener<? extends Future<?>> l) {
        final GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
        int size = this.size;
        for (int i = 0; i < size; i ++) {
            if (listeners[i] == l) {
                // Calculate subscripts for monitors that need to be moved
                int listenersToMove = size - i - 1;
                if (listenersToMove > 0) {
                    // The elements behind listenersToMove all move to the front of the array
                    System.arraycopy(listeners, i + 1, listeners, i, listenersToMove);
                }
                // The last location of the current total number of listeners is set to null, minus 1
                listeners[-- size] = null;
                this.size = size;
                // If the listener is GenericProgressiveFutureListener, the total number of listeners with progress indication is reduced by 1
                if (l instanceof GenericProgressiveFutureListener) {
                    progressiveSize --;
                }
                return;
            }
        }
    }
    
    // Returns an array of listener instances
    public GenericFutureListener<? extends Future<?>>[] listeners() {
        return listeners;
    }
    
    // Total number of returned listeners
    public int size() {
        return size;
    }
    
    // Total number of listeners for return zone progress indication
    public int progressiveSize() {
        return progressiveSize;
    }
}

Next, looking at the remaining method implementations of DefaultPromise, the author feels that the implementation of DefaultPromise method is artistic in terms of code order.First look at several ways to determine the status of Promise execution:

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

    // ...omit other code...

    @Override
    public boolean setUncancellable() {
        // Update resultto UNCANCELLABLE with the result updater CAS, expecting the old value to be null, the updated value to be the UNCANCELLABLE attribute, and returning true if successful
        if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
            return true;
        }
        Object result = this.result;
        // Step in here to indicate that the current value of result is not null, isDone0() and isCancelled0() are both final states, and return false if the final state is hit here
        //(Author's note: Actually, it can be said that the result cannot be null here, if it is not final, it can only be an instance of UNCANCELLABLE attribute)
        return !isDone0(result) || !isCancelled0(result);
    }

    @Override
    public boolean isSuccess() {
        Object result = this.result;
        // If successful, the result is not null, is not UNCANCELLABLE, and is not of CauseHolder type
        //(Author's note: Actually, if Promise is successful, then result can only be a developer-defined instance or a SUCCESS attribute instance)
        return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
    }

    @Override
    public boolean isCancellable() {
        // If cancellable, result null indicates that Promise is in the initialization state and has not been executed, it is considered cancellable
        return result == null;
    }

    @Override
    public Throwable cause() {
        // Get the Throwable instance from the current result
        return cause0(result);
    }

    private Throwable cause0(Object result) {
        // result is not CauseHolder type, returns null directly
        if (!(result instanceof CauseHolder)) {
            return null;
        }
        // If the result is CANCELLATION_CAUSE_HOLDER (persistence of static CancellationException)
        if (result == CANCELLATION_CAUSE_HOLDER) {
            // Create a new custom LeanCancellationException instance
            CancellationException ce = new LeanCancellationException();
            // Return if CAS update result result result result is a new instance of LeanCancellationException
            if (RESULT_UPDATER.compareAndSet(this, CANCELLATION_CAUSE_HOLDER, new CauseHolder(ce))) {
                return ce;
            }
            // Go here and say result is a custom CauseHolder instance of non-CANCELLATION_CAUSE_HOLDER
            result = this.result;
        }
        // Back to CauseHolder's cause
        return ((CauseHolder) result).cause;
    }
      
    // Static method to determine whether Promise is cancelled based on the fact that the result must be of type CauseHolder and the cause in CauseHolder must be of type CancellationException or its subclasses
    private static boolean isCancelled0(Object result) {
        return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
    }
    
    // Static method to determine if Promise is complete based on result not null and not an instance of the UNCANCELLABLE property
    private static boolean isDone0(Object result) {
        return result != null && result != UNCANCELLABLE;
    }

    // Determine whether the Promise instance is cancelled
    @Override
    public boolean isCancelled() {
        return isCancelled0(result);
    }
    
    // Determine if the Promise instance is complete
    @Override
    public boolean isDone() {
        return isDone0(result);
    }
    // ...omit other code...
}    

Next, look at how listeners are added and removed (which also includes logic to notify the listener):

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

    // ...omit other code...
    @Override
    public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        // Enter non-empty checks
        checkNotNull(listener, "listener");
        // Lock, the object being locked is the Promise instance itself
        synchronized (this) {
            // Add listener
            addListener0(listener);
        }
        // If the Promise instance has been executed, notify the listener to call back
        if (isDone()) {
            notifyListeners();
        }
        return this;
    }

    @Override
    public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
        // Enter non-empty checks
        checkNotNull(listeners, "listeners");
        // Lock, the object being locked is the Promise instance itself
        synchronized (this) {
            // Traverse through parameter groups to add listeners, empty elements jump out directly
            for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
                if (listener == null) {
                    break;
                }
                addListener0(listener);
            }
        }
        // If the Promise instance has been executed, notify the listener to call back
        if (isDone()) {
            notifyListeners();
        }

        return this;
    }

    @Override
    public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) {
        // Enter non-empty checks
        checkNotNull(listener, "listener");
        // Lock, the object being locked is the Promise instance itself
        synchronized (this) {
            // Remove listener
            removeListener0(listener);
        }
        return this;
    }

    @Override
    public Promise<V> removeListeners(final GenericFutureListener<? extends Future<? super V>>... listeners) {
        // Enter non-empty checks
        checkNotNull(listeners, "listeners");
        // Lock, the object being locked is the Promise instance itself
        synchronized (this) {
            // Traverse through parameter groups to remove listeners, empty elements jump out
            for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
                if (listener == null) {
                    break;
                }
                removeListener0(listener);
            }
        }
        return this;
    }

    private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
        // If the Promise instance holds listeners as null, it is set directly to participate in listeners
        if (listeners == null) {
            listeners = listener;
        } else if (listeners instanceof DefaultFutureListeners) {
             // If the current Promise instance holds listeners of type DefaultFutureListeners, call its add() method to add them
            ((DefaultFutureListeners) listeners).add(listener);
        } else {
            // Step in here to show that the current Promise instance holds listeners as a single GenericFutureListener instance and needs to be converted to a DefaultFutureListeners instance
            listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
        }
    }

    private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {
        // If the current Promise instance holds listeners of type DefaultFutureListeners, call its remove() method to remove them
        if (listeners instanceof DefaultFutureListeners) {
            ((DefaultFutureListeners) listeners).remove(listener);
        } else if (listeners == listener) {
            // If the current Promise instance holds listeners that are not of type DefaultFutureListeners, that is, a single GenericFutureListener and are the same as the incoming listener,
            // Promise instance holds listeners set to null
            listeners = null;
        }
    }

    private void notifyListeners() {
        EventExecutor executor = executor();
        // The current execution thread is an event loop thread, so direct synchronization calls are simply the same thread that calls the notifyListeners() method as EventExecutor
        if (executor.inEventLoop()) {
            // The following ThreadLocal and listenerStackDepth are related to call stack deep protection, and the blog will start a new section dedicated to this, which you can ignore for the moment
            final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
            final int stackDepth = threadLocals.futureListenerStackDepth();
            if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
                threadLocals.setFutureListenerStackDepth(stackDepth + 1);
                try {
                    notifyListenersNow();
                } finally {
                    threadLocals.setFutureListenerStackDepth(stackDepth);
                }
                return;
            }
        }
        // If the current execution thread is not an event loop thread, wrap notifyListenersNow() as a Runnable instance and place it in EventExecutor to execute
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                notifyListenersNow();
            }
        });
    }
    
    // Using EventExecutor for task execution, exceptions thrown by the execute() method are printed using the rejectedExecutionLogger handle
    private static void safeExecute(EventExecutor executor, Runnable task) {
        try {
            executor.execute(task);
        } catch (Throwable t) {
            rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
        }
    }
   
    // Notify all listeners immediately for callback
    private void notifyListenersNow() {
        Object listeners;
        // Lock here, set the value of notifyingListeners under lock protection, if multiple threads call the notifyListenersNow() method of the same Promise instance
        // Threads that hit notifyingListeners can return directly
        synchronized (this) {
            // Only proceed if there are listeners to notify and we are not already notifying listeners.
            if (notifyingListeners || this.listeners == null) {
                return;
            }
            notifyingListeners = true;
            // The temporary variable listeners holds the instantaneous listener instance, making it easier to set the listeners of the Promise instance to null next
            listeners = this.listeners;
            // Reset listeners of current Promise instance to null
            this.listeners = null;
        }
        for (;;) {
            if (listeners instanceof DefaultFutureListeners) {
                // Notifications in multiple listener scenarios
                notifyListeners0((DefaultFutureListeners) listeners);
            } else {
                // Notification in the case of a single listener
                notifyListener0(this, (GenericFutureListener<?>) listeners);
            }
            synchronized (this) {
                if (this.listeners == null) {
                    // Since there is no possibility of exception throwing, instead of writing in the finally block, reset notifyingListeners to false and return to jumping out of the loop
                    notifyingListeners = false;
                    return;
                }
                  // The temporary variable listeners holds the instantaneous listener instance, and the callback operation judges that it is based on the temporary instance - here another thread may have updated the value of listeners
                listeners = this.listeners;
                // Reset the listeners of the current Promise instance to null, ensuring that the listener is only called back once, and that the next time it jumps out of the for dead loop
                this.listeners = null;
            }
        }
    }
   
    // Traverse through the listeners array in DefaultFutureListeners and call the static method notifyListener0()
    private void notifyListeners0(DefaultFutureListeners listeners) {
        GenericFutureListener<?>[] a = listeners.listeners();
        int size = listeners.size();
        for (int i = 0; i < size; i ++) {
            notifyListener0(this, a[i]);
        }
    }
    
    // This static method is the final listener callback, which simply calls GenericFutureListener#operationComplete() and passes in the current Promise instance, capturing all exceptions and printing the warn log
    @SuppressWarnings({ "unchecked", "rawtypes" })
    private static void notifyListener0(Future future, GenericFutureListener l) {
        try {
            l.operationComplete(future);
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
            }
        }
    }
}    

Then look at the wait() and sync() method systems:

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

    // ...omit other code...

    @Override
    public Promise<V> await() throws InterruptedException {
        // If Promise finishes executing, return directly
        if (isDone()) {
            return this;
        }
        // Throw InterruptedException directly if current thread is interrupted
        if (Thread.interrupted()) {
            throw new InterruptedException(toString());
        }
        // Deadlock detection
        checkDeadLock();
        // Lock, the lock object is the current Promise instance
        synchronized (this) {
            // A dead loop is set here, terminating if isDone() is true
            while (!isDone()) {
                // Number of waiting threads plus 1
                incWaiters();
                try {
                    // This calls the Object#wait() method to block and throws an InterruptedException if the thread is interrupted
                    wait();
                } finally {
                    // Number of waiting threads minus 1 after unblocking
                    decWaiters();
                }
            }
        }
        return this;
    }

    @Override
    public Promise<V> awaitUninterruptibly() {
        // If Promise finishes executing, return directly
        if (isDone()) {
            return this;
        }
        // Deadlock detection
        checkDeadLock();
        boolean interrupted = false;
        // Lock, the lock object is the current Promise instance
        synchronized (this) {
            // A dead loop is set here, terminating if isDone() is true
            while (!isDone()) {
                 // Number of waiting threads plus 1
                incWaiters();
                try {
                    // This calls the Object#wait() method to block and catch the InterruptedException exception if the InterruptedException is thrown to record the thread's interrupt state to interrupted
                    wait();
                } catch (InterruptedException e) {
                    // Interrupted while waiting.
                    interrupted = true;
                } finally {
                    // Number of waiting threads minus 1 after unblocking
                    decWaiters();
                }
            }
        }
        // If the thread is interrupted and jumps out of the waiting block, clear the thread's interrupt flag bit
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return this;
    }

    // The next few wait() methods with timeout are calls to await0()

    @Override
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return await0(unit.toNanos(timeout), true);
    }

    @Override
    public boolean await(long timeoutMillis) throws InterruptedException {
        return await0(MILLISECONDS.toNanos(timeoutMillis), true);
    }

    @Override
    public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
        try {
            return await0(unit.toNanos(timeout), false);
        } catch (InterruptedException e) {
            // Should not be raised at all.
            throw new InternalError();
        }
    }

    @Override
    public boolean awaitUninterruptibly(long timeoutMillis) {
        try {
            return await0(MILLISECONDS.toNanos(timeoutMillis), false);
        } catch (InterruptedException e) {
            // Should not be raised at all.
            throw new InternalError();
        }
    }
    
    // Check for deadlocks, where it is determined that the wait thread is an event loop thread and throws the BlockingOperationException exception directly
    // Simply put, the execution thread of Promise and the thread waiting for the result cannot be the same thread, otherwise the dependencies will loop
    protected void checkDeadLock() {
        EventExecutor e = executor();
        if (e != null && e.inEventLoop()) {
            throw new BlockingOperationException(toString());
        }
    }

    @Override
    public Promise<V> sync() throws InterruptedException {
        // Sync Permanent Blocking Wait
        await();
        // Blocking waits to be released and throws if there is an exception to execution
        rethrowIfFailed();
        return this;
    }

    @Override
    public Promise<V> syncUninterruptibly() {
        // Synchronization Permanent Blocking Wait-Response Interrupt
        awaitUninterruptibly();
        // Plug waits to be released and throws if there is an exception to execution
        rethrowIfFailed();
        return this;
    }
    
    // Waters plus 1, throws IllegalStateException if it exceeds Short.MAX_VALUE
    private void incWaiters() {
        if (waiters == Short.MAX_VALUE) {
            throw new IllegalStateException("too many waiters: " + this);
        }
        ++waiters;
    }
    
    // waiters minus 1
    private void decWaiters() {
        --waiters;
    }
    
    // cause throws if it is not null
    private void rethrowIfFailed() {
        Throwable cause = cause();
        if (cause == null) {
            return;
        }
        PlatformDependent.throwException(cause);
    }

    private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
        // If Promise finishes executing, return directly
        if (isDone()) {
            return true;
        }
        // Returns the result of isDone() if the timeout is less than 0
        if (timeoutNanos <= 0) {
            return isDone();
        }
        // If an interrupt is allowed and the interrupt flag bit of the current thread is true, an InterruptedException is thrown
        if (interruptable && Thread.interrupted()) {
            throw new InterruptedException(toString());
        }
        // Deadlock detection
        checkDeadLock();
        // Record current nanosecond timestamp
        long startTime = System.nanoTime();
        // The length of the wait time in nanoseconds
        long waitTime = timeoutNanos;
        // Records whether the thread was interrupted
        boolean interrupted = false;
        try {
            // Dead cycle
            for (;;) {
                synchronized (this) {
                    // If Promise finishes executing, return directly to true - this step is a priori judgment, hit without blocking and waiting
                    if (isDone()) {
                        return true;
                    }
                    // Number of waiting threads plus 1
                    incWaiters();
                    try {
                        // The Object#wait() method with timeout is called to block
                        wait(waitTime / 1000000, (int) (waitTime % 1000000));
                    } catch (InterruptedException e) {
                        // Thread is interrupted and externally allowed to be interrupted, then throw InterruptedException directly
                        if (interruptable) {
                            throw e;
                        } else {
                            // Otherwise only interrupted status is recorded
                            interrupted = true;
                        }
                    } finally {
                        // Number of waiting threads minus 1 after unblocking
                        decWaiters();
                    }
                }
                // After unblocking, return to true if Promise finishes executing
                if (isDone()) {
                    return true;
                } else {
                    // Step in here to indicate that Promise has not finished executing, recalculate the number of wait intervals (corrections), and go to the next cycle if greater than 0
                    waitTime = timeoutNanos - (System.nanoTime() - startTime);
                    if (waitTime <= 0) {
                        return isDone();
                    }
                }
            }
        } finally {
            // If the thread is interrupted and jumps out of the waiting block, clear the thread's interrupt flag bit
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
    // ...omit other code...
}

Finally, there are several ways to set the results and get them:

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

    // ...omit other code...
    @Override
    public Promise<V> setSuccess(V result) {
        // Set successful results and return current Promise instance if set successfully
        if (setSuccess0(result)) {
            return this;
        }
        // Setup failure indicates multiple settings, Promise has finished executing and throws an exception
        throw new IllegalStateException("complete already: " + this);
    }

    @Override
    public boolean trySuccess(V result) {
        // Set the success result and return a Boolean value indicating success or failure
        return setSuccess0(result);
    }

    @Override
    public Promise<V> setFailure(Throwable cause) {
        // Set failed result, return current Promise instance if set successfully
        if (setFailure0(cause)) {
            return this;
        }
        // Setup failure indicates multiple settings, Promise has finished executing and throws an exception
        throw new IllegalStateException("complete already: " + this, cause);
    }

    @Override
    public boolean tryFailure(Throwable cause) {
        // Set the failure result and return a Boolean value indicating success or failure
        return setFailure0(cause);
    }

    @SuppressWarnings("unchecked")
    @Override
    public V getNow() {
        // Returns null if the result is CauseHolder type, SUCCESS attribute instance, or UNCANCELLABLE implementation instance, otherwise returns the result value after the conversion type
        // Unaware of exceptions, this method still returns null if CauseHolder wraps the exception
        Object result = this.result;
        if (result instanceof CauseHolder || result == SUCCESS || result == UNCANCELLABLE) {
            return null;
        }
        return (V) result;
    }

    @SuppressWarnings("unchecked")
    @Override
    public V get() throws InterruptedException, ExecutionException {
        // Permanent Blocking Get Results
        Object result = this.result;
        // Permanent blocking wait if Promise is not finished
        if (!isDone0(result)) {
            await();
            // Update Result Temporary Variable
            result = this.result;
        }
        // Return null directly when result is an instance of SUCCESS or UNCANCELLABLE property
        if (result == SUCCESS || result == UNCANCELLABLE) {
            return null;
        }
        // If the result is of type CauseHolder, get the cause property held in it, and possibly null
        Throwable cause = cause0(result);
        if (cause == null) {
            // Return of result value after converting type on the premise of successful execution
            return (V) result;
        }
        // Cancelation, throw CancellationException
        if (cause instanceof CancellationException) {
            throw (CancellationException) cause;
        }
        // All remaining cases are encapsulated as ExecutionException exceptions
        throw new ExecutionException(cause);
    }

    @SuppressWarnings("unchecked")
    @Override
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        // Blocking with timeout to get results
        Object result = this.result;
        // Blocking wait with timeout if Promise is not finished
        if (!isDone0(result)) {
            if (!await(timeout, unit)) {
                // Wait timeout throws TimeoutException directly
                throw new TimeoutException();
            }
            // Update Result Temporary Variable
            result = this.result;
        }
        // Return null directly when result is an instance of SUCCESS or UNCANCELLABLE property
        if (result == SUCCESS || result == UNCANCELLABLE) {
            return null;
        }
        // If the result is of type CauseHolder, get the cause property held in it, and possibly null
        Throwable cause = cause0(result);
        if (cause == null) {
            // Return of result value after converting type on the premise of successful execution
            return (V) result;
        }
        // Cancelation, throw CancellationException
        if (cause instanceof CancellationException) {
            throw (CancellationException) cause;
        }
        // All remaining cases are encapsulated as ExecutionException exceptions
        throw new ExecutionException(cause);
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        // CAS updates result to CANCELLATION_CAUSE_HOLDER, the expected value of result must be null
        if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
            // Determine if notifications from waiting threads are required
            if (checkNotifyWaiters()) {
                // Notify the listener to call back
                notifyListeners();
            }
            return true;
        }
        return false;
    }

    private boolean setSuccess0(V result) {
        // Set the result of successful execution, select the SUCCESS property if the join result is null, otherwise use the result
        return setValue0(result == null ? SUCCESS : result);
    }

    private boolean setFailure0(Throwable cause) {
        // Set the result of execution failure, the input is of Throwable type, encapsulated as CauseHolder, and stored in CauseHolder instance's cause property
        return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
    }

    private boolean setValue0(Object objResult) {
        // CAS updates the result to include objResult, and the expected value of the result must be null or UNCANCELLABLE to update successfully
        if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
            // Determine if notifications from waiting threads are required
            if (checkNotifyWaiters()) {
                // Notify the listener to call back
                notifyListeners();
            }
            return true;
        }
        return false;
    }
    
    // Determining whether a notification from a waiting thread is required - actually determining whether a notification listener callback is required
    private synchronized boolean checkNotifyWaiters() {
        // Call Object#notifyAll() to wake up all waiting threads if the number of waiting threads is greater than 0
        if (waiters > 0) {
            notifyAll();
        }
        // Return true if listeners are not empty (that is, there are listeners)
        return listeners != null;
    }
    // ...omit other code...
}    

Basic use of Promise

To use Netty's Proise module, you don't need to introduce all of Netty's dependencies, just netty-common:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-common</artifactId>
    <version>4.1.44.Final</version>
</dependency>

With respect to EventExecutor selection, Netty has prepared a GlobalEventExecutor for global event processing, which can be chosen directly (or implemented by itself or with other implementation classes of EventExecutor, of course):

EventExecutor executor = GlobalEventExecutor.INSTANCE;
Promise<String> promise = new DefaultPromise<>(executor);

Here's a scenario: download a linked resource asynchronously to disk, asynchronously notify the downloaded disk file path when the download is complete, and print the download results to the console when notified.

public class PromiseMain {

    public static void main(String[] args) throws Exception {
        String url = "http://xxx.yyy.zzz";
        EventExecutor executor = GlobalEventExecutor.INSTANCE;
        Promise<DownloadResult> promise = new DefaultPromise<>(executor);
        promise.addListener(new DownloadResultListener());
        Thread thread = new Thread(() -> {
            try {
                System.out.println("Start downloading resources,url:" + url);
                long start = System.currentTimeMillis();
                // Analog Download Time-consuming
                Thread.sleep(2000);
                String location = "C:\\xxx\\yyy\\z.md";
                long cost = System.currentTimeMillis() - start;
                System.out.println(String.format("Successful download of resources,url:%s,Save to:%s,time consuming:%d ms", url, location, cost));
                DownloadResult result = new DownloadResult();
                result.setUrl(url);
                result.setFileDiskLocation(location);
                result.setCost(cost);
                // Notify Result
                promise.setSuccess(result);
            } catch (Exception ignore) {

            }
        }, "Download-Thread");
        thread.start();
        Thread.sleep(Long.MAX_VALUE);
    }

    @Data
    private static class DownloadResult {

        private String url;

        private String fileDiskLocation;

        private long cost;
    }

    private static class DownloadResultListener implements GenericFutureListener<Future<DownloadResult>> {

        @Override
        public void operationComplete(Future<DownloadResult> future) throws Exception {
            if (future.isSuccess()) {
                DownloadResult downloadResult = future.getNow();
                System.out.println(String.format("Download Completion Notification,url:%s,File Disk Path:%s,time consuming:%d ms", downloadResult.getUrl(),
                        downloadResult.getFileDiskLocation(), downloadResult.getCost()));
            }
        }
    }
}

Execute the console output:

Start downloading resources, url:http://xxx.yyy.zzz
 Successfully downloaded resource, url:http://xxx.yyy.z Z z, saved to: C:\xxx\yy\z.md, time consuming: 2000 ms
 Download Completion Notification, url:http://xxx.yyyy.z Z z, File Disk Path: C:\xxx\yyy\z.md, Time-consuming: 2000 ms

Promise can be used in many scenarios, and can be used for synchronous calls in addition to asynchronous notifications. It is designed to be much more flexible than JUC's Future, which extends many new features based on Future and can be used directly by introducing this dependency separately if needed.

Problem with Promise listener stack depth

Sometimes, due to encapsulation or human encoding abnormalities, callbacks from listeners may have chains based on multiple Promise s (reference) Issue-5302 a promise listener chain), so it is possible that the recursive call depth is too large to cause stack overflow, so a threshold to limit the maximum stack depth of recursive calls, temporarily referred to as the stack depth protection threshold, is 8 and can be overridden by the system parameter io.netty.defaultPromise.maxListenerStackDepth.The code block mentioned earlier is pasted here:

private void notifyListeners() {
    EventExecutor executor = executor();
    // Event executors must be event loop types, that is, when executor.inEventLoop() is true, to enable recursive stack depth protection
    if (executor.inEventLoop()) {
        // Gets an instance of InternalThreadLocalMap bound by the current thread, similar to ThreadLocal here
        final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
        // Gets the listener call stack depth for the current thread
        final int stackDepth = threadLocals.futureListenerStackDepth();
        // Listener call stack depth not exceeding threshold MAX_LISTENER_STACK_DEPTH
        if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
            // Set listener call stack depth + 1 before calling notifyListenersNow()
            threadLocals.setFutureListenerStackDepth(stackDepth + 1);
            try {
                notifyListenersNow();
            } finally {
                // Set the listener call stack depth after the call to notifyListenersNow() to the value before the call, that is, to restore the thread's listener call stack depth
                threadLocals.setFutureListenerStackDepth(stackDepth);
            }
            return;
        }
    }
    // If the listener call stack depth exceeds the threshold MAX_LISTENER_STACK_DEPTH, notify the listener directly each time as a new asynchronous task to process
    safeExecute(executor, new Runnable() {
        @Override
        public void run() {
            notifyListenersNow();
        }
    });
}

If we want to simulate an example that triggers listener call stack depth protection, we just need to find a way to recursively call the notifyListeners() method in the same EventLoop type thread.

The most typical example is the setSuccess() that triggered the next Promise listener in the previous Promise listener callback method (simply understood as a doll), draw a picture to understand:

Test code:

public class PromiseListenerMain {

    private static final AtomicInteger COUNTER = new AtomicInteger(0);

    public static void main(String[] args) throws Exception {
        EventExecutor executor = ImmediateEventExecutor.INSTANCE;
        // root
        Promise<String> root = new DefaultPromise<>(executor);
        Promise<String> p1 = new DefaultPromise<>(executor);
        Promise<String> p2 = new DefaultPromise<>(executor);
        Promise<String> p3 = new DefaultPromise<>(executor);
        Promise<String> p4 = new DefaultPromise<>(executor);
        Promise<String> p5 = new DefaultPromise<>(executor);
        Promise<String> p6 = new DefaultPromise<>(executor);
        Promise<String> p7 = new DefaultPromise<>(executor);
        Promise<String> p8 = new DefaultPromise<>(executor);
        Promise<String> p9 = new DefaultPromise<>(executor);
        Promise<String> p10 = new DefaultPromise<>(executor);
        p1.addListener(new Listener(p2));
        p2.addListener(new Listener(p3));
        p3.addListener(new Listener(p4));
        p4.addListener(new Listener(p5));
        p5.addListener(new Listener(p6));
        p6.addListener(new Listener(p7));
        p7.addListener(new Listener(p8));
        p8.addListener(new Listener(p9));
        p9.addListener(new Listener(p10));
        root.addListener(new Listener(p1));
        root.setSuccess("success");
        Thread.sleep(Long.MAX_VALUE);
    }

    private static class Listener implements GenericFutureListener<Future<String>> {

        private final String name;
        private final Promise<String> promise;

        public Listener(Promise<String> promise) {
            this.name = "listener-" + COUNTER.getAndIncrement();
            this.promise = promise;
        }

        @Override
        public void operationComplete(Future<String> future) throws Exception {
            System.out.println(String.format("Monitor[%s]Callback succeeded...", name));
            if (null != promise) {
                promise.setSuccess("success");
            }
        }
    }
}

Since safeExecute() is executed around the bottom, all Promise s above will be callback, where IDEA's advanced breakpoint functionality can be used to add additional logs where breakpoints are entered, with the following output:

MAX_LISTENER_STACK_DEPTH(notifyListenersNow) execution - -
The listener [listener-9] callback succeeded...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow) execution - -
The listener [listener-0] callback succeeded...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow) execution - -
The listener [listener-1] callback succeeded...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow) execution - -
The listener [listener-2] callback succeeded...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow) execution - -
The listener [listener-3] callback succeeded...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow) execution - -
The listener [listener-4] callback succeeded...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow) execution - -
The listener [listener-5] callback succeeded...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow) execution - -
The listener [listener-6] callback succeeded...
safeExecute(notifyListenersNow) Execution ------------------------------------------------------------------------------------------------------
The listener [listener-7] callback succeeded...
safeExecute(notifyListenersNow) Execution ------------------------------------------------------------------------------------------------------
The listener [listener-8] callback succeeded...

The author is a little confused here. If the call stack depth is greater than 8, the excess will be packaged as a Runnable instance and submitted to the event executor for execution. Does not it turn the potential for recursive stack overflow into the potential for memory overflow (because asynchronous tasks may also be backed up, unless task submission is rejected, depending on the implementation of EventExecutor)?

Summary

The Promise tool provided by Netty has been analyzed both in source and in usage. The design concept and code are very useful for reference, and can be used out of the box. It can be directly introduced into the daily code to reduce the labor and risk of repeating wheel making.

Personal Blog

(e-a-20200123 c-3-d)

145 original articles published. 10% praised. 10,000 visits+
Private letter follow

Tags: Netty Attribute Java P4

Posted on Sat, 25 Jan 2020 02:00:57 -0800 by lehara