Java Concurrency Power Tool: AQS (AbstractQueuedSynchronizer)

AbstractQueuedSynchronizer

  • AQS stands for AbstractQueuedSynchronizer, a class in the java.util.concurrent.locks package.

  • The core idea of AQS: if the requested shared resource is idle, the requesting thread becomes the active worker thread and the resource is marked as locked. If the resource is already occupied, a mechanism is needed to block the waiting threads and to hand the lock over when they are woken up. AQS implements this mechanism with a variant of the CLH queue lock: threads that temporarily fail to acquire the lock are added to the queue.

  • The CLH (Craig, Landin, and Hagersten) queue is a virtual doubly linked queue. "Virtual" means that there is no queue object as such, only the links between nodes.

  • Put simply, AQS is built on the CLH queue. It keeps the shared variable state in a volatile field, and threads change state through CAS. A thread whose CAS succeeds has acquired the lock; a thread whose CAS fails enters the waiting queue and waits to be woken up.

  • A quick primer on CAS (Compare and Swap)

    Taken literally, it means "compare first, then replace".

    A CAS operation takes three important parameters: the value in memory to be compared, the expected value, and the new value. If the value in memory equals the expected value, the new value is written to memory; otherwise nothing is changed.

    The pseudo code is as follows:

    Object value;    // Current value in memory (the value to be compared)
    Object expected; // Expected value
    Object update;   // New value to write
    if (value == expected) {
        value = update;
    }
    
    

    CAS (compare and swap) is carried out at the CPU instruction level as a single atomic operation, so it is very fast. CAS also avoids asking the operating system to arbitrate a lock: the operation is resolved directly in the CPU.

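    Performing CAS in Java:

    // Unsafe.getUnsafe() throws SecurityException when called from application code,
    // so the instance is read reflectively from the private "theUnsafe" field.
    java.lang.reflect.Field f = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
    f.setAccessible(true);
    sun.misc.Unsafe unsafe = (sun.misc.Unsafe) f.get(null);
    Demo old = new Demo(); // Demo is assumed to declare an int field named "id"
    long offset = unsafe.objectFieldOffset(Demo.class.getDeclaredField("id"));
    // If old.id is 1, change it to 2
    return unsafe.compareAndSwapInt(old, offset, 1, 2);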
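
Application code rarely touches Unsafe directly. As a minimal sketch of the same compare-and-swap semantics, the example below uses java.util.concurrent.atomic.AtomicInteger instead; the class name CasDemo and the helper increment are made up for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CasDemo {
    // Atomically adds 1 using a CAS retry loop and returns the new value.
    static int increment(AtomicInteger counter) {
        for (;;) {
            int expected = counter.get();   // value we believe is current
            int update = expected + 1;      // value we want to store
            if (counter.compareAndSet(expected, update)) {
                return update;              // CAS succeeded
            }
            // CAS failed: another thread changed the value, so re-read and retry
        }
    }

    public static void main(String[] args) {
        AtomicInteger id = new AtomicInteger(1);
        System.out.println(id.compareAndSet(1, 2)); // true: the value was 1 and is now 2
        System.out.println(id.compareAndSet(1, 3)); // false: the value is 2, not 1
        System.out.println(increment(id));          // 3
    }
}
```

The retry loop in increment is the same pattern that a CAS-based synchronizer uses whenever several threads may update one variable at the same time.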
    
    
  • AQS defines two resource sharing modes: Exclusive (only one thread can execute at a time, such as ReentrantLock) and Share (multiple threads can execute at the same time, such as Semaphore/CountDownLatch).

    • Exclusive lock API

      • isHeldExclusively(): whether the current thread holds the resource exclusively. It only needs to be implemented if a Condition is used.

        AQS does not implement this method (the default throws UnsupportedOperationException). Implement it to report, as a boolean, whether the current thread has acquired the resource.

      • tryAcquire(int): exclusive mode. Tries to acquire the resource; returns true on success and false on failure.

        AQS does not implement this method, so it must be overridden.

      • tryRelease(int): exclusive mode. Tries to release the resource; returns true on success and false on failure.

        AQS does not implement this method, so it must be overridden to release the resource.

    • Shared lock API

      • tryAcquireShared(int): shared mode. Tries to acquire the resource. A negative number means failure; 0 means success with no resources remaining; a positive number means success with resources remaining.

        AQS does not implement this method, so it must be overridden.

      • tryReleaseShared(int): shared mode. Tries to release the resource. Returns true if the release allows waiting nodes to be woken up, and false otherwise. AQS does not implement this method, so it must be overridden to release the resource.

    • The difference between the exclusive API and the shared API

      • When an AQS subclass implements exclusive behavior, as ReentrantLock does, accessibility is defined as follows: if the AQS state variable is not 0 and the thread holding the lock is not the current thread, the resource cannot be accessed.
      • When an AQS subclass implements shared behavior, as CountDownLatch does, accessibility is defined as follows: if the AQS state variable is not 0, the resource cannot be accessed.
      • In ReentrantReadWriteLock, the read lock is a shared lock and the write lock is an exclusive lock.
    • Exclusive lock example

      • Take ReentrantLock (a reentrant exclusive lock) as an example. state is initialized to 0, which means unlocked. When thread A calls lock(), it calls tryAcquire() to take the lock exclusively and sets state to state+1. After that, tryAcquire() fails for other threads until thread A calls unlock() and state returns to 0; only then do other threads get a chance to acquire the lock. Before releasing the lock, thread A can acquire it again (state keeps accumulating); this is what reentrancy means. Note: the lock must be released as many times as it was acquired so that state returns to 0.
    • Shared lock example

      • Take CountDownLatch as an example. A task is split across N worker threads, and state is initialized to N. The N threads run in parallel; each one calls countDown() once when it finishes, which decrements state by 1 through CAS. When all N threads have finished and state=0, the main calling thread is unpark()ed, returns from await(), and continues with its remaining work.
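
The two examples above can be observed from the outside with the standard JDK classes. The following is a small sketch, not a look into their internals: the class name StateDemo and both helper methods are hypothetical, and the hold count and latch count are used as visible stand-ins for the AQS state.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

public class StateDemo {
    // Acquires the lock twice on one thread and reports the hold count seen inside.
    static int reentrantHolds() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();                      // reentry by the owning thread
        try {
            return lock.getHoldCount();   // number of holds by the current thread
        } finally {
            lock.unlock();
            lock.unlock();                // one unlock per lock
        }
    }

    // Starts n workers that each count down once; returns the count after await().
    static long runWorkers(int n) {
        CountDownLatch latch = new CountDownLatch(n);
        for (int i = 0; i < n; i++) {
            new Thread(latch::countDown).start();
        }
        try {
            latch.await();                // blocks until the count reaches 0
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return latch.getCount();
    }

    public static void main(String[] args) {
        System.out.println(reentrantHolds()); // 2
        System.out.println(runWorkers(3));    // 0
    }
}
```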

    Generally speaking, a custom synchronizer is either exclusive or shared, so it only needs to implement one pair: tryAcquire/tryRelease or tryAcquireShared/tryReleaseShared. However, AQS also supports synchronizers that implement both the exclusive and the shared mode, such as ReentrantReadWriteLock. With acquire() and acquireShared(), a thread in the waiting queue ignores interrupts. acquireInterruptibly() and acquireSharedInterruptibly() respond to interrupts.
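
As an illustration of a synchronizer that implements only the shared pair, here is a hedged sketch of a counting semaphore built on AQS. It is not the source of java.util.concurrent.Semaphore; the names SimpleSemaphore, permits and tryOnce are invented for this example. It follows the return-value convention of tryAcquireShared described above and offers an interruptible acquire.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class SimpleSemaphore {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) { setState(permits); }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // Negative: fail without touching state. Otherwise CAS to the new
                // count; the result (0 or positive) is the number of permits left.
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                if (compareAndSetState(current, current + releases)) {
                    return true;   // tell AQS to wake up waiting threads
                }
            }
        }

        boolean tryOnce() { return tryAcquireShared(1) >= 0; }
        int permits() { return getState(); }
    }

    private final Sync sync;

    public SimpleSemaphore(int permits) { sync = new Sync(permits); }

    // Blocks until a permit is available; responds to interrupts.
    public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }

    // Takes a permit only if one is available right now.
    public boolean tryAcquire() { return sync.tryOnce(); }

    public void release() { sync.releaseShared(1); }

    public int availablePermits() { return sync.permits(); }
}
```

Only tryAcquireShared and tryReleaseShared are overridden; queueing, parking and waking are inherited from AQS.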

Note: AQS relies on spinning. A thread waiting to be woken up retries in a loop (while(!cas())) until it acquires the lock. Between attempts AQS parks the thread instead of busy-waiting, so it is not a pure spin lock.
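
For reference, the while(!cas()) retry pattern on its own looks roughly like the sketch below. This is a plain spin lock written for illustration, not AQS code; the class name SpinLock is made up, and Thread.yield() is used as a simple backoff.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class SpinLock {
    private final AtomicBoolean locked = new AtomicBoolean(false);

    public void lock() {
        // Retry the CAS until this thread flips the flag from false to true.
        while (!locked.compareAndSet(false, true)) {
            Thread.yield();   // give other threads a chance to run while waiting
        }
    }

    // Single CAS attempt without waiting.
    public boolean tryLock() {
        return locked.compareAndSet(false, true);
    }

    public void unlock() {
        locked.set(false);
    }
}
```

A lock like this keeps a waiting thread on the CPU, which is only reasonable when the lock is held for a very short time.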

A simple AQS exclusive lock demo:

private static class Sync extends AbstractQueuedSynchronizer {
    // Reports whether the lock is currently held
    protected boolean isHeldExclusively() {
        return getState() == 1;
    }

    // Tries to acquire the resource and returns immediately: true on success, false otherwise.
    public boolean tryAcquire(int acquires) {
        assert acquires == 1; // Only a quantity of 1 is supported here
        if (compareAndSetState(0, 1)) { // state is set to 1 only if it is 0, so the lock is not reentrant
            setExclusiveOwnerThread(Thread.currentThread()); // Record the current thread as the exclusive owner
            return true;
        }
        return false;
    }

    // Tries to release the resource and returns immediately: true on success, false otherwise.
    protected boolean tryRelease(int releases) {
        assert releases == 1; // Only a quantity of 1 is supported here
        if (getState() == 0) // Releasing implies the lock is held; this check is an extra safeguard
            throw new IllegalMonitorStateException();
        setExclusiveOwnerThread(null);
        setState(0); // Release the resource and give up ownership
        return true;
    }
}
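
A Sync class like this is normally hidden behind a small wrapper. The sketch below shows one possible wrapper under stated assumptions: the outer class Mutex, the helpers held and tryOnce, and the count method are all hypothetical, and the Sync body is repeated only so that the example compiles on its own.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class Mutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean isHeldExclusively() { return getState() == 1; }

        @Override
        protected boolean tryAcquire(int acquires) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (getState() == 0) throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        boolean held() { return isHeldExclusively(); }
        boolean tryOnce() { return tryAcquire(1); }
    }

    private final Sync sync = new Sync();

    public void lock() { sync.acquire(1); }          // queues the thread if tryAcquire fails
    public boolean tryLock() { return sync.tryOnce(); }
    public void unlock() { sync.release(1); }        // calls tryRelease, then wakes a waiter
    public boolean isLocked() { return sync.held(); }

    // Increments a shared counter from several threads under the mutex.
    static int count(int threads, int perThread) {
        Mutex mutex = new Mutex();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try { counter[0]++; } finally { mutex.unlock(); }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try { t.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return counter[0];
    }
}
```

Callers only see lock(), tryLock() and unlock(); acquire(1) and release(1) are the inherited AQS entry points that call tryAcquire and tryRelease.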

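AQS shared lock demo:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Copied from the RocketMQ source code: a shared lock based on a boolean
public class BooleanMutex {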

    private Sync sync;

    public BooleanMutex() {
        sync = new Sync();
        set(false);
    }

    public BooleanMutex(Boolean mutex) {
        sync = new Sync();
        set(mutex);
    }

    /**
     * Blocks until the Boolean is true.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public void get() throws InterruptedException {
        sync.innerGet();
    }

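    /**
     * Blocks until the Boolean is true, or until the timeout expires.
     *
     * @param timeout maximum time to wait
     * @param unit    time unit of the timeout
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws TimeoutException     if the Boolean is still false when the timeout expires
     */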
    public void get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        sync.innerGet(unit.toNanos(timeout));
    }

    /**
     * Sets the Boolean mutex to the given value.
     *
     * @param mutex the new value
     */
    public void set(Boolean mutex) {
        if (mutex) {
            sync.innerSetTrue();
        } else {
            sync.innerSetFalse();
        }
    }

    public boolean state() {
        return sync.innerState();
    }

    /**
     * Synchronization control for BooleanMutex. Uses AQS sync state to
     * represent run status
     */
    private final class Sync extends AbstractQueuedSynchronizer {

        private static final long serialVersionUID = 2559471934544126329L;
        /**
         * State value representing TRUE
         */
        private static final int TRUE = 1;
        /**
         * State value representing FALSE
         */
        private static final int FALSE = 2;

        private boolean isTrue(int state) {
            return (state & TRUE) != 0;
        }

        /**
         * Implements the AQS hook that decides whether the shared lock can be acquired
         */
        @Override
        protected int tryAcquireShared(int state) {
            // If the value is true, the lock can be acquired immediately
            // If the value is false, the thread enters the waiting queue until it is woken up
            return isTrue(getState()) ? 1 : -1;
        }

        /**
         * Implements the AQS hook that decides whether the shared lock can be released
         */
        @Override
        protected boolean tryReleaseShared(int ignore) {
            // Always returns true, which means the release is allowed
            return true;
        }

        boolean innerState() {
            return isTrue(getState());
        }

        void innerGet() throws InterruptedException {
            acquireSharedInterruptibly(0);
        }

        void innerGet(long nanosTimeout) throws InterruptedException, TimeoutException {
            if (!tryAcquireSharedNanos(0, nanosTimeout)) throw new TimeoutException();
        }

        void innerSetTrue() {
            for (; ; ) {
                int s = getState();
                if (s == TRUE) {
                    return; // Already TRUE, nothing to do
                }
                if (compareAndSetState(s, TRUE)) { // CAS the state so that concurrent updates to TRUE do not collide
                    releaseShared(0); // Release the lock and wake up the blocked threads
                    return;
                }
            }
        }

        void innerSetFalse() {
            for (; ; ) {
                int s = getState();
                if (s == FALSE) {
                    return; // Already FALSE, nothing to do
                }
                if (compareAndSetState(s, FALSE)) { // CAS the state so that concurrent updates to FALSE do not collide
                    return;
                }
            }
        }

    }

}
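
To show how such a boolean gate behaves at run time, the following sketch uses a condensed stand-in rather than the full class above. Everything in it is hypothetical (GateDemo, Gate, open, close, await, demo), and it encodes open as state 1 and closed as state 0 instead of the TRUE/FALSE constants.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class GateDemo {
    // Condensed stand-in for BooleanMutex: state 1 = open (true), 0 = closed (false).
    static final class Gate extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignore) { return getState() == 1 ? 1 : -1; }

        @Override
        protected boolean tryReleaseShared(int ignore) { return true; }

        void open() { setState(1); releaseShared(0); }   // mark open, then wake the waiters
        void close() { setState(0); }

        // Waits until the gate is open; returns false if the timeout expires first.
        boolean await(long timeout, TimeUnit unit) throws InterruptedException {
            return tryAcquireSharedNanos(0, unit.toNanos(timeout));
        }
    }

    // Returns {result of a timed wait on a closed gate, result seen by a waiter after open()}.
    static boolean[] demo() {
        Gate gate = new Gate();
        boolean[] results = new boolean[2];
        try {
            results[0] = gate.await(10, TimeUnit.MILLISECONDS);   // gate is still closed
            Thread waiter = new Thread(() -> {
                try {
                    results[1] = gate.await(5, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            waiter.start();
            gate.open();      // the waiter passes whether it arrived before or after this call
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return results;
    }
}
```

Because tryAcquireShared never changes the state, any number of threads can pass once the gate is open, which is the point of using shared mode here.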

Conclusion:

AQS supports two modes: exclusive lock and shared lock.

  • Exclusive lock

    Only one thread at a time can succeed in acquire(int arg); the other threads are added to the waiting queue and proceed one after another.

    release() wakes up the next thread in the queue.

    tryAcquire(int arg) returns a boolean value.

  • Shared lock

    Multiple threads can compete in acquireShared(int arg) and hold the resource at the same time.

    When releaseShared() succeeds, the threads waiting in the queue are woken up.

    releaseShared() returns only after the release has completed.

    tryAcquireShared(int arg) returns a value of type int.

Tags: Programming Java

Posted on Sat, 21 Mar 2020 10:10:49 -0700 by k994519