Java concurrency package (JUC): PriorityBlockingQueue

PriorityBlockingQueue is, as the name says, a blocking queue ordered by priority. Internally it is a min-heap stored in an array, which guarantees that the smallest element in the queue (smallest according to the comparator, or to natural ordering when there is none) is removed each time. The array itself is not sorted, however. The heap only guarantees that the first slot of the array holds the smallest element, that is, the element with the highest priority is taken out first.

The figure below is a schematic diagram of a min-heap. It is a binary tree in which every parent node is less than or equal to its child nodes; that is the only condition a min-heap has to satisfy. The nodes are then placed into the array level by level, starting from the root, from top to bottom and from left to right.
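To make the array layout concrete, here is a minimal sketch of the index arithmetic that maps the tree onto an array. The class and method names (MinHeapLayout, parent, left, right, isMinHeap) are made up for this illustration and are not part of the JDK; the parent formula is the same `(k - 1) >>> 1` that appears in the JDK code later in this article.

```java
// Hypothetical helper for illustration only; not JDK code.
class MinHeapLayout {
    // Index of the parent of the node stored at index k (valid for k > 0).
    static int parent(int k) { return (k - 1) >>> 1; }

    // Indexes of the left and right children of the node stored at index k.
    static int left(int k) { return (k << 1) + 1; }
    static int right(int k) { return (k << 1) + 2; }

    // True if every node is greater than or equal to its parent.
    static boolean isMinHeap(int[] a) {
        for (int k = 1; k < a.length; k++) {
            if (a[parent(k)] > a[k]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // A valid min-heap that is clearly not sorted (3 comes before 2).
        int[] heap = {1, 3, 2, 7, 4, 5};
        System.out.println("is min-heap: " + isMinHeap(heap));
        System.out.println("children of index 1: " + left(1) + ", " + right(1));
    }
}
```

The array {1, 3, 2, 7, 4, 5} satisfies the heap condition even though it is not in ascending order, which is exactly why only the first slot is guaranteed to hold the smallest element.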

Reference: Diagram of min-heap formation in an array

1, Internal code structure

    /** Default initial capacity of the array */
    private static final int DEFAULT_INITIAL_CAPACITY = 11;

    /** Maximum array size; expansion is capped at this value */
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    /** Array that stores the elements as a heap */
    private transient Object[] queue;

    /** Number of elements currently in the queue */
    private transient int size;

    /** Comparator; null means the elements' natural ordering is used */
    private transient Comparator<? super E> comparator;

    /** Reentrant lock guarding all public operations */
    private final ReentrantLock lock;

    /** Condition on which consumers wait while the queue is empty */
    private final Condition notEmpty;

    /** Spin lock for expansion, acquired via CAS; 1 means a thread is allocating a new array */
    private transient volatile int allocationSpinLock;

2, Enqueue

As the following code shows, add() and put() both delegate to offer(), which inserts the element while holding the reentrant lock. The backing array can grow up to Integer.MAX_VALUE - 8, so the queue is effectively unbounded: the blocking method put() never blocks waiting for free space, although a caller may still have to wait for the lock or for an expansion to finish. Every insertion preserves the min-heap structure.

	public boolean add(E e) {
        // Call the offer method
        return offer(e);
    }

	public void put(E e) {
        offer(e); // never need to block
    }

	public boolean offer(E e) {
        // Null elements are not allowed
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        // Enqueueing requires the lock
        lock.lock();
        int n, cap;
        Object[] array;
        // Expand if the number of nodes is greater than or equal to the array capacity
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
            // Sift up using natural ordering or the comparator, depending on whether a comparator was supplied
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            // Wake up one thread that is waiting to take an element
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }
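The behavior described above can be observed from the outside with the public API. The following is a small sketch, not JDK code; the class name PutNeverBlocksDemo and its helper methods are invented for the example, while PriorityBlockingQueue, put(), offer() and remainingCapacity() are the real API.

```java
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class; only PriorityBlockingQueue itself comes from the JDK.
class PutNeverBlocksDemo {
    // Starts with room for a single element and inserts `count` elements.
    static int fill(int count) {
        PriorityBlockingQueue<Integer> q = new PriorityBlockingQueue<>(1);
        for (int i = count; i > 0; i--) {
            q.put(i); // never waits for free space; the array is expanded as needed
        }
        return q.size();
    }

    // True if offering null is rejected with a NullPointerException.
    static boolean rejectsNull() {
        try {
            new PriorityBlockingQueue<Integer>().offer(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("size after fill: " + fill(1000));
        System.out.println("null rejected: " + rejectsNull());
        // The queue reports itself as not capacity constrained.
        System.out.println("unbounded: "
                + (new PriorityBlockingQueue<Integer>().remainingCapacity() == Integer.MAX_VALUE));
    }
}
```

Because put() declares no InterruptedException on PriorityBlockingQueue, the loop needs no try/catch, which is another visible sign that the method does not block for capacity.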

	/** Queue expansion method */
	private void tryGrow(Object[] array, int oldCap) {
        // Release the main lock before allocating and re-acquire it afterwards: expansion is slow, and holding the lock would block other threads, for example threads dequeuing
        lock.unlock(); // must release and then re-acquire main lock
        Object[] newArray = null;
        // The CAS on allocationSpinLock acts as a lightweight lock, so only one thread at a time allocates the new array
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                // Grow to 2 * oldCap + 2 while the array is small (< 64), otherwise to 1.5 * oldCap
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                // Here we can see that the maximum capacity of the queue is limited to Integer.MAX_VALUE - 8
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                // Allocate only if the capacity really grows and no other thread has already replaced the array
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                allocationSpinLock = 0;
            }
        }
        // If newArray is null, another thread is doing the expansion, so give up the CPU time slice
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();
        // Re-acquire the main lock before replacing the array
        lock.lock();
        // Check again that this thread allocated a new array and that the array has not been replaced in the meantime, which keeps the expansion thread-safe
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }
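The capacity calculation in tryGrow() is easier to follow in isolation. The sketch below copies only the arithmetic into a standalone method; the class GrowthPolicy and the method nextCapacity are hypothetical names for this illustration, and the locking and array copying are deliberately left out.

```java
// Hypothetical standalone version of the capacity arithmetic in tryGrow(); not JDK code.
class GrowthPolicy {
    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    // Returns the capacity the array would grow to from oldCap.
    static int nextCapacity(int oldCap) {
        int newCap = oldCap + ((oldCap < 64)
                ? (oldCap + 2)     // small arrays: roughly double
                : (oldCap >> 1));  // larger arrays: grow by 50%
        if (newCap - MAX_ARRAY_SIZE > 0) { // also catches int overflow
            int minCap = oldCap + 1;
            if (minCap < 0 || minCap > MAX_ARRAY_SIZE) {
                throw new OutOfMemoryError();
            }
            newCap = MAX_ARRAY_SIZE;
        }
        return newCap;
    }

    public static void main(String[] args) {
        // Print the first few capacities, starting from the default of 11.
        int cap = 11;
        StringBuilder sb = new StringBuilder().append(cap);
        for (int i = 0; i < 5; i++) {
            cap = nextCapacity(cap);
            sb.append(" -> ").append(cap);
        }
        System.out.println(sb);
    }
}
```

Starting from the default capacity of 11, the sequence is 24, 50, 102, 153, 229: the array roughly doubles until it reaches 64 slots and grows by half after that.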

	/** Sift-up used when there is no comparator: elements are compared by their natural ordering */
	private static <T> void siftUpComparable(int k, T x, Object[] array) {
        // The element must implement Comparable, otherwise this cast throws ClassCastException
        Comparable<? super T> key = (Comparable<? super T>) x;
        // Compare upward against the parents until the right insertion point is found
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (key.compareTo((T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = key;
    }

	/** Sift-up with a comparator: same logic as the method above, but comparisons go through the comparator */
    private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
                                       Comparator<? super T> cmp) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (cmp.compare(x, (T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = x;
    }
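Which of the two sift-up methods runs depends only on whether a comparator was passed to the constructor. The following sketch shows the effect from the caller's side; OrderingDemo and its methods are hypothetical names, and the initial capacity of 11 is just the default value repeated explicitly.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class; only the JDK types it uses are real.
class OrderingDemo {
    // No comparator: elements are ordered by Comparable, smallest first.
    static int firstWithNaturalOrdering() {
        PriorityBlockingQueue<Integer> q = new PriorityBlockingQueue<>();
        q.offer(5);
        q.offer(1);
        q.offer(3);
        return q.poll();
    }

    // Reversed comparator: the largest value now counts as the "smallest" element.
    static int firstWithReversedComparator() {
        PriorityBlockingQueue<Integer> q =
                new PriorityBlockingQueue<>(11, Comparator.<Integer>reverseOrder());
        q.offer(5);
        q.offer(1);
        q.offer(3);
        return q.poll();
    }

    public static void main(String[] args) {
        System.out.println("natural ordering head: " + firstWithNaturalOrdering());
        System.out.println("reversed comparator head: " + firstWithReversedComparator());
    }
}
```

With natural ordering the head is 1; with the reversed comparator it is 5. The heap is still a min-heap in both cases, only the meaning of "smaller" changes.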

3, Dequeue

	public E poll() {
        final ReentrantLock lock = this.lock;
        // Acquire the lock
        lock.lock();
        try {
            // Remove and return the head; returns null if the queue is empty
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

	public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            // Wait until an element becomes available
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }
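The difference between the two methods above is that poll() returns null on an empty queue while take() waits on notEmpty until a producer signals it. A minimal sketch of that difference follows; the class TakeVsPollDemo, its method names, the value 42 and the 100 ms delay are all arbitrary choices for the example.

```java
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class; only the JDK types it uses are real.
class TakeVsPollDemo {
    // poll() on an empty queue returns null immediately.
    static Integer pollOnEmpty() {
        return new PriorityBlockingQueue<Integer>().poll();
    }

    // take() on an empty queue blocks until another thread inserts an element.
    static int takeWaitsForProducer() {
        PriorityBlockingQueue<Integer> q = new PriorityBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(100); // let the consumer reach take() first
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            q.put(42); // offer() signals notEmpty and wakes the consumer
        });
        producer.start();
        try {
            int value = q.take();
            producer.join();
            return value;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("poll on empty queue: " + pollOnEmpty());
        System.out.println("take after waiting: " + takeWaitsForProducer());
    }
}
```

Note that take() acquires the lock with lockInterruptibly(), so a thread blocked in take() can be interrupted, which is why the sketch has to handle InterruptedException.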

	/** peek also takes the lock; it returns the first element of the array without removing it */
	public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (size == 0) ? null : (E) queue[0];
        } finally {
            lock.unlock();
        }
    }

	private E dequeue() {
        int n = size - 1;
        if (n < 0)
            return null;
        else {
            Object[] array = queue;
            E result = (E) array[0];
            E x = (E) array[n];
            // Array last position set to null
            array[n] = null;
            Comparator<? super E> cmp = comparator;
            // Call the sift-down variant that matches whether a comparator is present
            if (cmp == null)
                siftDownComparable(0, x, array, n);
            else
                siftDownUsingComparator(0, x, array, n, cmp);
            size = n;
            return result;
        }
    }

	/** Sift-down: after the first node is taken out, the last element x is placed at position k and moved down while smaller children are lifted up */
	private static <T> void siftDownComparable(int k, T x, Object[] array,
                                               int n) {
        if (n > 0) {
            Comparable<? super T> key = (Comparable<? super T>)x;
            // In a binary heap the nodes from index n/2 onward are leaves, so only the first half needs to be visited
            int half = n >>> 1;           // loop while a non-leaf
            // At each level, lift the smaller child up until the key fits
            while (k < half) {
                int child = (k << 1) + 1; // assume left child is least
                Object c = array[child];
                int right = child + 1;
                if (right < n &&
                    ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                    c = array[child = right];
                if (key.compareTo((T) c) <= 0)
                    break;
                array[k] = c;
                k = child;
            }
            array[k] = key;
        }
    }
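To trace the sift-down step by hand, it helps to see it on a plain int array. The following is a simplified sketch of the same idea as dequeue() plus siftDownComparable(); IntMinHeap and removeMin are hypothetical names, and unlike the JDK code it leaves the stale value in the vacated last slot instead of clearing it.

```java
import java.util.Arrays;

// Hypothetical int-array version of dequeue() + sift-down; not JDK code.
class IntMinHeap {
    // Removes and returns the root of the min-heap stored in a[0..size).
    // Afterwards a[0..size-1) is a valid min-heap again.
    static int removeMin(int[] a, int size) {
        int result = a[0];
        int n = size - 1; // new size
        int x = a[n];     // last element, to be re-inserted from the root
        int k = 0;
        int half = n >>> 1; // indexes >= half are leaves
        while (k < half) {
            int child = (k << 1) + 1; // assume the left child is smaller
            int right = child + 1;
            if (right < n && a[child] > a[right]) {
                child = right;
            }
            if (x <= a[child]) {
                break; // x fits here
            }
            a[k] = a[child]; // lift the smaller child up
            k = child;
        }
        if (n > 0) {
            a[k] = x;
        }
        return result;
    }

    public static void main(String[] args) {
        int[] heap = {1, 3, 2, 7, 4, 5};
        int[] drained = new int[heap.length];
        for (int size = heap.length, i = 0; size > 0; size--, i++) {
            drained[i] = removeMin(heap, size);
        }
        System.out.println(Arrays.toString(drained));
    }
}
```

Removing the minimum repeatedly from the heap {1, 3, 2, 7, 4, 5} returns the values in ascending order, 1, 2, 3, 4, 5, 7, even though the array was never sorted.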

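4, Traversal

As the following traversal code shows, iterator() simply copies the backing array and hands the copy to the iterator object. Iteration therefore follows the order of the heap array rather than priority order, and it works on a snapshot taken when the iterator was created.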

 	public Iterator<E> iterator() {
        return new Itr(toArray());
    }

	public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return Arrays.copyOf(queue, size);
        } finally {
            lock.unlock();
        }
    }
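If the elements are needed in priority order, iterating is not the way to get them. Here is a short sketch of the usual alternative, draining with poll(), together with a check of the snapshot behavior; TraversalDemo and its methods are hypothetical names, and the snapshot check relies on the implementation shown above rather than on a documented guarantee.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class; only the JDK types it uses are real.
class TraversalDemo {
    // Repeated poll() yields the elements in priority order (this empties the queue).
    static List<Integer> drainInOrder(List<Integer> items) {
        PriorityBlockingQueue<Integer> q = new PriorityBlockingQueue<>(items);
        List<Integer> out = new ArrayList<>();
        Integer v;
        while ((v = q.poll()) != null) {
            out.add(v);
        }
        return out;
    }

    // Counts what an iterator sees when an element is added after it was created.
    static int snapshotCount() {
        PriorityBlockingQueue<Integer> q = new PriorityBlockingQueue<>(Arrays.asList(3, 1, 2));
        Iterator<Integer> it = q.iterator(); // copies the array at this point
        q.offer(0);                          // not part of the copy
        int count = 0;
        while (it.hasNext()) {
            it.next();
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println("drained: " + drainInOrder(Arrays.asList(5, 1, 4, 2, 3)));
        System.out.println("elements seen by the iterator: " + snapshotCount());
    }
}
```

When the queue must stay intact, another option is to sort the result of toArray(), since that array is already an independent copy.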

5, Delete

The deletion code combines the enqueue and dequeue principles explained above. Suppose, as shown in the figure below, that we want to delete node 26. The last node in the array, 14, is first put in the position of 26 and sifted down, following the dequeue principle. If it does not move down, it is then sifted up, following the enqueue principle, and in this example 14 ends up in the position of 16.

	public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        // Acquire the lock
        lock.lock();
        try {
            int i = indexOf(o);
            if (i == -1)
                return false;
            removeAt(i);
            return true;
        } finally {
            lock.unlock();
        }
    }

	private void removeAt(int i) {
        Object[] array = queue;
        int n = size - 1;
        if (n == i) // removed last element
            array[i] = null;
        else {
            E moved = (E) array[n];
            array[n] = null;
            Comparator<? super E> cmp = comparator;
            // First sift down so that the min-heap property holds below position i
            if (cmp == null)
                siftDownComparable(i, moved, array, n);
            else
                siftDownUsingComparator(i, moved, array, n, cmp);
            // If the element did not move down, sift up so that the min-heap property also holds above position i
            if (array[i] == moved) {
                if (cmp == null)
                    siftUpComparable(i, moved, array);
                else
                    siftUpUsingComparator(i, moved, array, cmp);
            }
        }
        // Queue size minus one
        size = n;
    }
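From the caller's point of view, the result of removeAt() is simply that the queue keeps returning elements in priority order after an arbitrary element has been removed. A small sketch of that check follows; RemoveDemo and its methods are hypothetical names, and the values are arbitrary and do not reproduce the figure.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class; only the JDK types it uses are real.
class RemoveDemo {
    // Removes `target` from a queue built from `items`, then drains the rest in priority order.
    static List<Integer> removeThenDrain(List<Integer> items, Integer target) {
        PriorityBlockingQueue<Integer> q = new PriorityBlockingQueue<>(items);
        q.remove(target); // remove(Object): indexOf + removeAt under the lock
        List<Integer> out = new ArrayList<>();
        Integer v;
        while ((v = q.poll()) != null) {
            out.add(v);
        }
        return out;
    }

    // remove(Object) returns false when the element is not in the queue.
    static boolean removeAbsent() {
        PriorityBlockingQueue<Integer> q = new PriorityBlockingQueue<>(Arrays.asList(1, 2, 3));
        return q.remove(Integer.valueOf(99));
    }

    public static void main(String[] args) {
        System.out.println(removeThenDrain(Arrays.asList(10, 14, 16, 26, 30, 20), 26));
        System.out.println("removing an absent element: " + removeAbsent());
    }
}
```

Keep in mind that indexOf() scans the array linearly, so remove(Object) costs O(n) for the search plus O(log n) for restoring the heap.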

6, Summary

PriorityBlockingQueue is rarely used directly in business development, but understanding how it works is very helpful for studying the scheduled thread pool later, whose work queue is also heap-based.


Posted on Thu, 16 Jan 2020 06:11:46 -0800 by ragtek