A look at the AQS blocking queue and condition queue

The previous article briefly introduced AQS. We now know that AQS is a framework that already implements most of the machinery (the queueing rules, waking up the threads held in nodes, and so on). To use AQS we only need to implement a few methods, such as tryAcquire. This time we mainly look at how threads enter the blocking queue in AQS, and at condition variables;

 

1, AQS entry rules

Let's analyze how AQS maintains the blocking queue: when a resource is acquired exclusively, how does a thread that loses the competition for the lock end up in the blocking queue?

Let's look at the acquire method. It first calls the tryAcquire method implemented by the subclass, which tries to modify state. If that fails, the thread has lost the competition for the lock, so execution moves on to the second half of the condition, which calls addWaiter and acquireQueued;

The addWaiter method wraps the current thread in a node of type Node.EXCLUSIVE and appends it to the blocking queue;
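To make the acquire path concrete, here is a minimal sketch of a non-reentrant mutex built on AbstractQueuedSynchronizer. The class name SimpleMutex and the demo() helper are assumptions made up for this illustration; only tryAcquire and tryRelease are written by hand, and the queueing is done entirely by AQS. The demo checks with isQueued that a thread whose tryAcquire fails really does land in the blocking queue.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// SimpleMutex is a hypothetical name; a minimal non-reentrant lock for illustration.
class SimpleMutex extends AbstractQueuedSynchronizer {

    // state 0 = free, 1 = held
    @Override
    protected boolean tryAcquire(int arg) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false; // acquire() then queues the calling thread
    }

    @Override
    protected boolean tryRelease(int arg) {
        if (getState() == 0) {
            throw new IllegalMonitorStateException();
        }
        setExclusiveOwnerThread(null);
        setState(0);
        return true; // lets AQS wake the next queued thread
    }

    // Returns true if a second thread was observed in the queue while the lock was held.
    static boolean demo() {
        SimpleMutex mutex = new SimpleMutex();
        mutex.acquire(1); // tryAcquire succeeds, nothing is queued
        Thread waiter = new Thread(() -> {
            mutex.acquire(1); // tryAcquire fails, so the thread is queued and parked
            mutex.release(1);
        });
        waiter.start();
        while (!mutex.isQueued(waiter)) {
            Thread.yield(); // wait until the waiter has entered the queue
        }
        boolean wasQueued = mutex.isQueued(waiter);
        mutex.release(1); // wakes the waiter
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return wasQueued && !mutex.hasQueuedThreads();
    }
}
```

Calling SimpleMutex.demo() should return true: the second thread is seen in the queue while the first holds the mutex, and the queue is empty again once both have released.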

 

The first time through, the blocking queue does not exist yet, so we end up in the enq method. Let's take a closer look at enq

 

When we first enter the enq() method, tail and head both point to null, as shown in Figure 1 below;

In the first loop iteration we reach Figure 2: t is assigned from tail, and we check whether t is null. If it is, head is set with CAS. This CAS reads as: if head is null, set head to a sentinel node (the sentinel is new Node()). tail is then pointed at head, which is Figure 3

 

In the second loop iteration we take the else branch and set the prev of the new node to the sentinel node;

 

Next comes the CAS on tail. This CAS reads as: if tail still points to the same node as t, point tail at node

 

Finally, set the next of t to node, as shown in the figure below. The new node is now fully linked into the queue
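The steps above can be sketched as a small model. This is not the JDK source: MiniSyncQueue and its Node are hypothetical names, and AtomicReference stands in for the CAS helpers that AQS uses internally. The loop follows the walkthrough: install a sentinel on the first pass, then set prev, CAS the tail, and set next.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the AQS blocking queue; not the JDK source.
class MiniSyncQueue {

    static final class Node {
        final Thread thread; // null for the sentinel node
        volatile Node prev;
        volatile Node next;

        Node(Thread thread) {
            this.thread = thread;
        }
    }

    final AtomicReference<Node> head = new AtomicReference<>();
    final AtomicReference<Node> tail = new AtomicReference<>();

    // Appends node to the queue and returns its predecessor.
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Empty queue: CAS head from null to a sentinel, then point tail at it
                if (head.compareAndSet(null, new Node(null))) {
                    tail.set(head.get());
                }
            } else {
                // Link backwards first, then CAS tail, then link forwards
                node.prev = t;
                if (tail.compareAndSet(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
}
```

Setting prev before the CAS means that a node reachable from tail always has a valid prev link, even if its predecessor's next has not been written yet.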

 

2, Use of AQS condition variables

What are condition variables? Recall from the introduction to AQS that it has another inner class, ConditionObject. Do you remember the park and unpark methods in Unsafe? ConditionObject builds on them to provide the await() and signal() methods, which are more flexible: one lock can create multiple condition variables, and each condition variable maintains its own condition queue (a singly linked list, which is what the nextWaiter field of Node is for);

Note: a condition queue is maintained in each condition variable

For example:

package com.example.demo.study;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class Study0201 {

    public static void main(String[] args) throws InterruptedException {
        // Create lock object
        ReentrantLock lock = new ReentrantLock();
        // Create condition variable
        Condition condition = lock.newCondition();
        // Create two threads below; each acquires the lock and releases it in its finally block
        Thread thread1 = new Thread(() -> {
            lock.lock();
            try {
                System.out.println("await begin");
                // Calling await() on the condition variable releases the lock and blocks the current thread in that condition variable's condition queue
                condition.await();
                System.out.println("await end");
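            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing the interruption
                Thread.currentThread().interrupt();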
            } finally {
                lock.unlock();
            }

        });

        Thread thread2 = new Thread(() -> {
            lock.lock();
            try {
                System.out.println("signal begin");
                // Move the first waiting thread from the condition queue to the blocking queue; it resumes after this thread unlocks
                condition.signal();
                System.out.println("signal end");
            } finally {
                lock.unlock();
            }
        });
        thread1.start();
        Thread.sleep(500);
        thread2.start();
    }
}

 

You can also create multiple condition variables, as shown below. Each condition variable maintains a condition queue:

package com.example.demo.study;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class Study0201 {

    public static void main(String[] args) throws InterruptedException {
        // Create lock object
        ReentrantLock lock = new ReentrantLock();
        // Create condition variable 1
        Condition condition1 = lock.newCondition();
        // Create condition variable 2
        Condition condition2 = lock.newCondition();
        
        // Create two threads below, in which the lock will be acquired and the lock will be released
        Thread thread1 = new Thread(() -> {
            lock.lock();
            try {
                System.out.println("await begin");//1
                condition1.await();
                System.out.println("await end");//5
                
                System.out.println("condition2---signal---start");//6
                condition2.signal();
                System.out.println("condition2---signal---end");//7
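            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing the interruption
                Thread.currentThread().interrupt();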
            } finally {
                lock.unlock();
            }

        });

        Thread thread2 = new Thread(() -> {

            lock.lock();
            try {
                System.out.println("signal begin");//2
                condition1.signal();
                System.out.println("signal end");//3
                
                System.out.println("condition2---await---start");//4
                condition2.await();
                System.out.println("condition2---await---end");//8
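            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing the interruption
                Thread.currentThread().interrupt();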
            } finally {
                lock.unlock();
            }

        });

        thread1.start();
        Thread.sleep(500);
        thread2.start();

    }

}

 

3, A closer look at condition variables

Let's look at how the condition variable above was obtained: Condition condition1 = lock.newCondition(). Following the newCondition method, we see that it ends up creating a ConditionObject instance. This class is an inner class of AQS, so it can access the internal fields and methods of AQS;

Note: before calling the await and signal methods, you must obtain the lock first
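A small check of this rule, as a sketch: the class and method names below are made up for the illustration, while ReentrantLock, Condition and IllegalMonitorStateException are the real JDK types. Calling signal() on a ReentrantLock condition without holding the lock is rejected, and the same call succeeds once the lock is held.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: shows that signal() requires the lock to be held.
class SignalWithoutLock {

    // Returns true if signal() without the lock was rejected.
    static boolean signalWithoutLockFails() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        try {
            condition.signal(); // the lock is not held here
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // Returns true if signal() completes normally while the lock is held.
    static boolean signalWithLockSucceeds() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        lock.lock();
        try {
            condition.signal(); // no waiters, so this is a no-op
            return true;
        } finally {
            lock.unlock();
        }
    }
}
```

The exception comes from the isHeldExclusively() check at the top of signal(), which appears in the source listing further down.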

 

Then let's take a look at the await method of the condition variable. As the code below shows, it starts by calling the addConditionWaiter() method

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //Create a new Node.CONDITION node and put it at the end of the condition queue
    Node node = addConditionWaiter();
    //Release the lock acquired by the current thread
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //Until signal() moves this node to the blocking queue, call park() to block and suspend the current thread
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //The node is now on the blocking queue: compete for the lock again and restore the saved state
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

 

private Node addConditionWaiter() {
    Node t = lastWaiter;
    //On the first call lastWaiter is null, so t == null and the if block is skipped
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    //Create a node of type Node.CONDITION. If the queue is empty it becomes firstWaiter, otherwise it is linked after the old last node; either way lastWaiter ends up pointing to it
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

 

By the way, take a look at the signal method:

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //Take the first node of the condition queue; doSignal removes it and moves it to the blocking queue, where its thread is woken to compete for the lock
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
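The bookkeeping done by addConditionWaiter() and signal() can be modelled without any threads. The sketch below is a toy, single-threaded model under stated assumptions: MiniConditionQueue is a hypothetical name, an ArrayDeque stands in for the AQS blocking queue, and cancellation, wait status and parking are left out entirely.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical single-threaded model of a condition queue; not the JDK source.
class MiniConditionQueue {

    static final class Node {
        final String name;
        Node nextWaiter; // singly linked, like Node.nextWaiter in AQS

        Node(String name) {
            this.name = name;
        }
    }

    Node firstWaiter;
    Node lastWaiter;
    // Stand-in for the AQS blocking queue that signalled nodes are moved to
    final Deque<Node> blockingQueue = new ArrayDeque<>();

    // Appends a new waiter at the end of the condition queue.
    Node addConditionWaiter(String name) {
        Node node = new Node(name);
        if (lastWaiter == null) {
            firstWaiter = node;
        } else {
            lastWaiter.nextWaiter = node;
        }
        lastWaiter = node;
        return node;
    }

    // Detaches the first waiter and moves it to the blocking queue; returns null if empty.
    Node signal() {
        Node first = firstWaiter;
        if (first == null) {
            return null;
        }
        firstWaiter = first.nextWaiter;
        if (firstWaiter == null) {
            lastWaiter = null;
        }
        first.nextWaiter = null;
        blockingQueue.addLast(first);
        return first;
    }
}
```

Waiters leave the condition queue in the order they arrived, which is why signal() always works on firstWaiter.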

  

Finally, how are the blocking queue and the condition queues in AQS related?

1. When multiple threads call lock.lock(), only one thread gets the lock. The other threads are wrapped in Node nodes and appended to the AQS blocking queue, where each one retries and then parks until it is its turn to acquire the lock;

2. When the thread holding the lock calls await() on a condition variable, it releases the lock, and the thread is wrapped in a Node and placed in the condition queue of that condition variable;

3. At this point another thread from the AQS blocking queue acquires the lock. If that thread also calls await() on the condition variable, step 2 repeats, and yet another thread from the blocking queue acquires the lock;

4. Then another thread calls signal() or signalAll() on the condition variable, which moves one or all nodes from the condition queue to the AQS blocking queue. The moved threads are woken with unpark so that they can compete for the lock again.
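The four steps above can be observed through the monitoring methods of ReentrantLock. The sketch below is one possible way to do it: the class name QueueTransferDemo and the ready flag are assumptions for the illustration, while getWaitQueueLength, hasQueuedThread and awaitUninterruptibly are existing JDK methods. The latch is counted down while the waiter still holds the lock, so the main thread can only acquire the lock after the waiter has entered the condition queue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: watches one thread move from the condition queue to the blocking queue.
class QueueTransferDemo {

    // Returns {condition queue length before signal, length after signal, 1 if the waiter is in the blocking queue}.
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        CountDownLatch waiterHoldsLock = new CountDownLatch(1);
        boolean[] ready = {false}; // guarded by lock

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                waiterHoldsLock.countDown();
                while (!ready[0]) {
                    condition.awaitUninterruptibly(); // releases the lock, joins the condition queue
                }
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        int before;
        int after;
        boolean queued;
        try {
            waiterHoldsLock.await();
            lock.lock(); // succeeds only after the waiter released the lock inside await
            try {
                before = lock.getWaitQueueLength(condition);
                ready[0] = true;
                condition.signal(); // moves the waiter's node to the blocking queue
                after = lock.getWaitQueueLength(condition);
                queued = lock.hasQueuedThread(waiter); // we still hold the lock, so it must wait
            } finally {
                lock.unlock(); // now the waiter can reacquire the lock and return from await
            }
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {before, after, queued ? 1 : 0};
    }
}
```

The waiter only returns from await after the signalling thread unlocks, which is why it is still visible in the blocking queue right after signal().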

A lock has one blocking queue but can have multiple condition variables, and each condition variable has its own condition queue. In both kinds of queue, each Node stores a thread and its status, as shown in the following figure:

Posted on Sun, 02 Feb 2020 09:56:53 -0800 by wvwisokee