[JDK source code analysis] through source code analysis, CyclicBarrier

Preface

CyclicBarrier what is it? A synchronization helper class that allows a group of threads to wait for each other until they reach a common barrier point. It's similar to a meeting between friends in the middle of the day, when all the friends arrive, they start drinking and eating.

Source code

CyclicBarrier properties and constructors

public class CyclicBarrier {
    // mutex
    private final ReentrantLock lock = new ReentrantLock();
    // Conditional wait
    private final Condition trip = lock.newCondition();
    // Number of participants
    private final int parties;
    // Tasks to be performed when the fence is released
    private final Runnable barrierCommand;
    // Used to indicate whether the fence is released or reset
    private Generation generation = new Generation();
    // Number of parameters waiting
    private int count;

    // structure parties Fences for participants
    public CyclicBarrier(int parties) {
        this(parties, null);
    }
    // structure parties Fences for participants,And set the tasks to be performed when the fence is released
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }   
}    

CyclicBarrier method

1. await method

Wait until all participants have called the await method on this fence or timeout

    public public int int await() await() throws InterruptedException, BrokenBarrierException {
        throws InterruptedException, BrokenBar try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

    // Set timeout
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }   

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        // Assign a value to the fence object lock
        final ReentrantLock lock = this.lock;
        // Lock up
        lock.lock();
        try {
            final Generation g = generation;
            // Barrier state
            if (g.broken)
                throw new BrokenBarrierException();
            // Thread interrupt
            if (Thread.interrupted()) {
                // Set fence status to corrupt, reset remaining waiting threads, and wake up waiting threads
                breakBarrier();
                throw new InterruptedException();
            }
            // Every call await Reduce once count number
            int index = --count;
            if (index == 0) {  // tripped
                // All participants have reached the fence
                // Initialize the execution task state. It can be seen that the execution task set by the constructor is executed by the last thread that reaches the fence position
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    // Reset fence and wake all threads
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        // command Execution error
                        // Task execution error set fence state to corrupt, reset remaining waiting threads and wake waiting threads
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    // Timeout not set
                    if (!timed)
                        // Thread enters condition queue and waits
                        trip.await();
                    else if (nanos > 0L)
                        // Thread enters timeout condition queue and waits
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    // Returns the order in which the current thread arrives. The larger the number, the earlier it arrives
                    return index;

                if (timed && nanos <= 0L) {
                    // Timeout sets fence state to corrupt, resets remaining waiting threads, and wakes waiting threads
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

In my article, signalAll called by breakBarrier method and trip.await and trip.awaitNanos in dowait method [JDK source code analysis] analyze AbstractQueuedSynchronizer in depth through source code Blog has analysis, no more details here.

   // Set fence status to corrupt, reset remaining waiting threads, and wake up waiting threads
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        // Wake all threads
        trip.signalAll();
    }
    // Reset the fence and use it again next time
    private void nextGeneration() {
        // Wake all threads
        trip.signalAll();
        count = parties;
        generation = new Generation();
    }

2. reset method

Reset the fence to its original state and open a new fence

    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

This is the end of the source code analysis of CyclicBarrier

usage

For example, Zhang Guo and sister-in-law Zhang set up an appointment to eat hot pot in the hot pot shop. After the two arrived, they asked the boss to serve. Here's an example:

        final CyclicBarrier cyclicBarrier = new CyclicBarrier(2,
                () -> System.out.println(Thread.currentThread().getName() + "Shout: boss, we are all here. Serve the dishes!"));

        new Thread(() ->{

            System.out.println("Zhang Guo is on his way to the hot pot shop...");
            try {
                Thread.sleep(System.currentTimeMillis() % 1000);
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }, "Zhang Guo").start();

        new Thread(() ->{
            System.out.println("Sister Zhang is on her way to the hot pot shop...");
            try {
                Thread.sleep(System.currentTimeMillis() % 1000);
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }, "Zhang sister").start();

 

Print results

Zhang Guo is on his way to the hot pot shop
 Sister Zhang is on her way to the hot pot shop
 Zhang Guo yells: boss, we're all here. Let's serve!

summary

When multiple threads need to wait for each other, using the CyclicBarrier tool is a better choice.

Tags: Java IE JDK

Posted on Fri, 31 Jan 2020 08:18:23 -0800 by LawsLoop