Source code analysis of DelayQueue take()

  • I use DelayQueue at work and was curious about how it is implemented, so I studied how it works. This post walks through the source code of its take() method.

1. The source code of take() is as follows:

public E take() throws InterruptedException {
        // Acquire the lock (interruptibly) so that access to the queue is thread-safe
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                // peek() returns the head of the queue (the first element) without removing it
                E first = q.peek();
                if (first == null)
                    // The queue is empty: wait with available.await(), which makes the current thread
                    // release the lock and join the condition's wait queue
                    available.await();
                else {
                    // The head element is not null: get its remaining delay (getDelay() is implemented by
                    // the element class, typically from the custom delay time set on the message)
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        // A delay of 0 or less means the element has expired: poll() removes and returns it
                        return q.poll();
                    // A delay greater than 0 means the thread has to wait and then loop again,
                    // so first is set to null to avoid holding a reference to the head element while waiting
                    first = null; 
                    if (leader != null)
                        // This is the Leader/Followers pattern (worth looking up if you are interested)
                        // A non-null leader means another thread is already doing a timed wait for the head element
                        // The current thread releases the lock and waits on the condition, i.e. it becomes a follower
                        available.await();
                    else {
                        // No leader: no thread is currently doing a timed wait for the head element,
                        // so the current thread becomes the leader
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            // Wait at most `delay` nanoseconds, i.e. until the head element expires
                            // (the wait can end earlier if the thread is signalled)
                            available.awaitNanos(delay);
                        } finally {
                            // Give up the leader role, but only if this thread is still the leader
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // No thread is leader and the queue still has elements: a waiting thread (if any) should take over
            if (leader == null && q.peek() != null)
                // Wake up one waiting thread so it can become the new leader
                available.signal();
            //Release lock
            lock.unlock();
        }
    }
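
To see take() from the caller's side, here is a minimal sketch rather than anything taken from the JDK source. The element class DelayedMessage, its fields, and the helper takeAll() are hypothetical names made up for this illustration; only DelayQueue, Delayed, and TimeUnit come from java.util.concurrent. It shows getDelay() supplying the remaining delay that take() reads, and take() blocking until the head element has expired.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayQueueTakeDemo {

    // Hypothetical message type (an assumption for this sketch, not part of the JDK).
    static final class DelayedMessage implements Delayed {
        private final String body;
        private final long expireAtNanos;

        DelayedMessage(String body, long delayMillis) {
            this.body = body;
            this.expireAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        String body() {
            return body;
        }

        // take() calls this with NANOSECONDS; a value <= 0 means the element has expired.
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        // Orders elements so that the one expiring soonest is at the head of the queue.
        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Puts two messages in "wrong" order and returns the bodies in the order take() hands them out.
    static List<String> takeAll() {
        DelayQueue<DelayedMessage> queue = new DelayQueue<>();
        queue.put(new DelayedMessage("second", 200));
        queue.put(new DelayedMessage("first", 50));

        List<String> bodies = new ArrayList<>();
        try {
            bodies.add(queue.take().body()); // blocks for roughly 50 ms
            bodies.add(queue.take().body()); // blocks until roughly 200 ms after creation
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return bodies;
    }

    public static void main(String[] args) {
        System.out.println(takeAll()); // prints [first, second]
    }
}
```

The message with the shorter delay comes out first even though it was inserted second, because the queue orders elements with compareTo() and take() only returns the head once its getDelay() is 0 or less.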

Note: at first I didn't understand why first is set to null. Later I found an explanation online: if first is not set to null, it can cause a memory leak. The reasoning is as follows:

  • Suppose first is not set to null. Thread A arrives, the head element has not yet reached its dequeue time, and thread A becomes the leader.
  • Thread B arrives and blocks because the leader is not null. Subsequent threads do the same. Each of them still holds a reference to the head element in its local variable first.
  • When thread A's wait ends and it successfully takes the head element, that element should become eligible for garbage collection once it is no longer in use. But threads B, C, ... still hold it in first while they are blocked, so the GC cannot reclaim it for as long as they stay blocked. That is the memory leak.
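
The situation in the note, with several threads blocked inside take() at the same time, can be reproduced from the outside with a small sketch. The class LeaderFollowerDemo, the element type Task, and the helper runConsumers() are hypothetical names assumed for this illustration. The leader field itself is private to DelayQueue, so the sketch cannot show which thread is the leader; it only shows that three concurrent consumers each end up with exactly one element.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class LeaderFollowerDemo {

    // Hypothetical element type, assumed only for this sketch.
    static final class Task implements Delayed {
        final String name;
        final long expireAtNanos;

        Task(String name, long delayMillis) {
            this.name = name;
            this.expireAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Starts three consumers that all block in take(), then adds three delayed tasks.
    static List<String> runConsumers() {
        DelayQueue<Task> queue = new DelayQueue<>();
        List<String> taken = new CopyOnWriteArrayList<>();

        Thread[] consumers = new Thread[3];
        for (int i = 0; i < consumers.length; i++) {
            consumers[i] = new Thread(() -> {
                try {
                    // With an empty queue this waits in available.await(); once elements arrive,
                    // one thread does the timed wait and the others wait to be signalled.
                    taken.add(queue.take().name);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumers[i].start();
        }

        queue.put(new Task("a", 50));
        queue.put(new Task("b", 100));
        queue.put(new Task("c", 150));

        try {
            for (Thread consumer : consumers) {
                consumer.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return taken;
    }

    public static void main(String[] args) {
        System.out.println(runConsumers());
    }
}
```

While the consumers are blocked, none of them needs a reference to the head element: each one calls peek() again after waking up. That is why the JDK code can safely set first to null before waiting.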


Posted on Thu, 23 Apr 2020 10:05:32 -0700 by kweathe