A quick understanding of Atomic (Atomic integer / Atomic reference / Atomic array / LongAdder)

Catalog

Preface

Ask: is int thread safe?

Demarcation line of basic chapter of Atomic

Atomic integer (base type)

Overall introduction

AtomicInteger

Improvement of the above int type example

Source code analysis

CAS

Atomic reference

Thread-unsafe implementation

Thread-safe implementation - using CAS

ABA problems and Solutions

Atomic array

Atomic advanced level chapter boundary

Source code analysis of LongAdder

LongAdder use

Comparative advantage with Atomic

Source code analysis

epilogue

Reference material

Preface

Most developers have read or written some concurrent code. Besides synchronized (if you are not familiar with it, see the earlier article: XXXX), there is another major branch: the Atomic classes. If you have not used them before, read the basic chapter first. If you have, skip ahead to the advanced chapter, which goes into the source code.

Ask: is int thread safe?

If you have read the synchronized article, you already know the answer: no. Let's verify this with code:

public class testInt {
    static int number = 0;

    public static void main(String[] args) throws Exception {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 100000; i++) {
                    number = number+1;
                }
            }
        };

        Thread t1 = new Thread(runnable);
        t1.start();
        Thread t2 = new Thread(runnable);
        t2.start();

        t1.join();
        t2.join();
        System.out.println("number:" + number);
    }
}

 

Operation result:

In the example above, we define a static variable number with an initial value of 0, then start two threads that each increment it 100000 times. If int were thread safe, the result would be 200000 once both threads finish. In practice the printed result is usually less than 200000, because number = number + 1 is a separate read, add and write, and the two threads can overwrite each other's updates.

As the earlier synchronized article explained, we can make this thread safe by wrapping the statement number = number + 1 in a synchronized block. But locking is relatively expensive, so let's look at another way to get thread safety: Atomic.
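For comparison, here is a minimal sketch of the synchronized fix mentioned above. The class name SyncCounterSketch, the LOCK object and the run helper are names chosen for this illustration and do not come from the earlier article.

```java
class SyncCounterSketch {
    static int number = 0;
    // Hypothetical lock object, introduced only for this illustration
    static final Object LOCK = new Object();

    // Starts two threads that each increment the shared counter `times` times
    static int run(int times) {
        number = 0;
        Runnable runnable = () -> {
            for (int i = 0; i < times; i++) {
                synchronized (LOCK) {   // only one thread at a time may run the increment
                    number = number + 1;
                }
            }
        };
        Thread t1 = new Thread(runnable);
        Thread t2 = new Thread(runnable);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return number;
    }

    public static void main(String[] args) {
        System.out.println("number:" + run(100000));
    }
}
```

Because every increment happens while holding the same lock, no update is lost and the total is 200000. The price is that the threads take turns acquiring the lock for every single increment.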

Demarcation line of basic chapter of Atomic

Atomic integer (base type)

Overall introduction

Atomic is the general name for the family of classes in the JDK package java.util.concurrent.atomic. The family includes atomic base types (AtomicInteger, AtomicLong, AtomicBoolean), atomic references (AtomicReference, AtomicStampedReference, AtomicMarkableReference), atomic arrays (AtomicIntegerArray, AtomicLongArray, AtomicReferenceArray) and field updaters (AtomicIntegerFieldUpdater, AtomicLongFieldUpdater, AtomicReferenceFieldUpdater).

AtomicInteger

AtomicInteger, AtomicBoolean and AtomicLong work in similar ways, so we use AtomicInteger as the main example for analyzing the atomic classes.

Let's first look at the APIs and what each one does:

import java.util.concurrent.atomic.AtomicInteger;

public class testInt {

    public static void main(String[] args) {
        //Define a variable of type AtomicInteger with a value of 1
        AtomicInteger i = new AtomicInteger(1);
        //incrementAndGet adds 1 first and then returns the value, so it prints 2. i is now 2
        System.out.println(i.incrementAndGet());
        //getAndIncrement returns the value first and then adds 1, so it prints 2. i is now 3
        System.out.println(i.getAndIncrement());
        //get returns the current value, so it prints 3. i is still 3
        System.out.println(i.get());
        //A positive argument adds. getAndAdd returns the value first and then adds 666, so it prints 3. i is now 669
        System.out.println(i.getAndAdd(666));
        //A negative argument subtracts. getAndAdd returns the value first and then subtracts 1, so it prints 669. i is now 668
        System.out.println(i.getAndAdd(-1));
        //A positive argument adds. addAndGet adds 666 first and then returns the value, so it prints 1334. i is now 1334
        System.out.println(i.addAndGet(666));
        //A negative argument subtracts. addAndGet subtracts 1 first and then returns the value, so it prints 1333. i is now 1333
        System.out.println(i.addAndGet(-1));
        //getAndUpdate takes an IntUnaryOperator, here a lambda that can hold any computation. It returns the old value, so it prints 1333. i is now 13331
        System.out.println(i.getAndUpdate(x -> (x * 10 + 1)));
        //Finally prints the current value of i, 13331
        System.out.println(i.get());
    }
}

 

Execution result (one value per line): 2, 2, 3, 3, 669, 1334, 1333, 1333, 13331
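AtomicInteger has a few more methods in the same style that the listing above does not show. The following is a small sketch of updateAndGet, accumulateAndGet, compareAndSet and getAndSet; the class name AtomicIntegerMoreSketch and the demo helper are made up for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

class AtomicIntegerMoreSketch {
    // Collects the return values of several calls into one comma-separated string
    static String demo() {
        AtomicInteger i = new AtomicInteger(1);
        StringBuilder out = new StringBuilder();
        // updateAndGet applies the lambda and returns the new value: 1 * 10 = 10
        out.append(i.updateAndGet(x -> x * 10)).append(',');
        // accumulateAndGet combines the current value with the argument: 10 + 5 = 15
        out.append(i.accumulateAndGet(5, Integer::sum)).append(',');
        // compareAndSet succeeds only if the current value equals the expected value (15)
        out.append(i.compareAndSet(15, 20)).append(',');
        // the value is now 20, so expecting 15 fails and nothing changes
        out.append(i.compareAndSet(15, 30)).append(',');
        // getAndSet returns the old value (20) and stores 0
        out.append(i.getAndSet(0));
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());   // prints 10,15,true,false,20
    }
}
```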

Improvement of the above int type example

public class testInt {
    //1. Define AtomicInteger type variable number with initial value of 0
    static AtomicInteger number = new AtomicInteger(0);

    public static void main(String[] args) throws Exception {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 100000; i++) {
                    //2. Call the incrementAndGet method to add 1
                    number.incrementAndGet();
                }
            }
        };

        Thread t1 = new Thread(runnable);
        t1.start();
        Thread t2 = new Thread(runnable);
        t2.start();

        t1.join();
        t2.join();
        System.out.println("number:" + number.get());
    }
}

 

The result is now the correct 200000, which shows that AtomicInteger is thread safe: the result stays correct even when several threads update it. There is still one catch, the ABA problem, which we will come back to in the atomic reference section.

Source code analysis

Take the incrementAndGet method as an example to see how it is implemented underneath. The incrementAndGet method in the AtomicInteger class calls the getAndAddInt method of the Unsafe class.

public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

Now look at getAndAddInt. It contains a loop: it reads the current value, then calls compareAndSwapInt, and the loop ends only when compareAndSwapInt returns true. This brings us to CAS (compare-and-swap), which is what makes the update safe under multithreading.

public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

        return var5;
}

CAS

Suppose thread 1 and thread 2 read the main memory value 0 at the same time. Thread 1 adds 1 and writes the result back, so main memory now holds 1. Thread 2 then finishes its own addition, still based on the stale value 0, and tries to write its result to main memory. This write must not go through, because it would overwrite thread 1's update. The process is as follows.

With CAS, when thread 2 tries to write to memory, the compare-and-set step (compareAndSet) finds that the current value in main memory is 1, which differs from the 0 it read at the start. Thread 2 therefore abandons this write, rereads the latest value from main memory, redoes its computation and tries to write again. If thread 1 has modified main memory again in the meantime, thread 2 again finds that the current value is not what it expected, so it abandons the write, rereads and retries. This is a cycle of repeated comparison: the write happens only when the value in main memory still equals the value the thread read, otherwise the thread keeps rereading and retrying. This is what the do-while loop above does.

CAS is implemented with a CPU instruction. If the platform supports CAS, the JVM uses the atomic CPU instruction directly. If it does not, the implementation falls back to some form of locking.
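The read, compare and retry cycle described above can also be written with the public AtomicInteger API instead of Unsafe. This is a sketch of the same idea, not the JDK's own code; the class name CasLoopSketch and the helpers getAndAdd and run are hypothetical names for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasLoopSketch {
    // Hand-written retry loop with the same shape as getAndAddInt
    static int getAndAdd(AtomicInteger target, int delta) {
        int expected;
        do {
            expected = target.get();   // read the current value
            // retry if another thread changed the value after we read it
        } while (!target.compareAndSet(expected, expected + delta));
        return expected;
    }

    // Two threads each add 1 `times` times through the hand-written loop
    static int run(int times) {
        AtomicInteger number = new AtomicInteger(0);
        Runnable runnable = () -> {
            for (int i = 0; i < times; i++) {
                getAndAdd(number, 1);
            }
        };
        Thread t1 = new Thread(runnable);
        Thread t2 = new Thread(runnable);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return number.get();
    }

    public static void main(String[] args) {
        System.out.println("number:" + run(100000));
    }
}
```

A failed compareAndSet never blocks the thread. It only means the thread goes around the loop once more with a fresh value.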

Atomic reference

In everyday code we need atomic operations not only on the basic types above but also on more complex types, which is what AtomicReference is for.

Thread-unsafe implementation

First, look at a thread-unsafe version that uses BigDecimal:

public class testReference {
    static BigDecimal number = BigDecimal.ZERO;

    public static void main(String[] args) throws Exception {

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 1000; i++) {
                   number=number.add(BigDecimal.ONE);
                }
            }
        };

        Thread t1 = new Thread(runnable);
        t1.start();

        Thread t2 = new Thread(runnable);
        t2.start();

        t1.join();
        t2.join();

        System.out.println(number);
    }
} 

 

Two threads each add 1 in a loop 1000 times, so the final result should be 2000, but the printed result can be less than 2000.

Thread-safe implementation - using CAS

public class testReference {
    //Define the AtomicReference type BigDecimal variable
    static AtomicReference<BigDecimal> number = new AtomicReference<BigDecimal>(BigDecimal.ZERO);

    public static void main(String[] args) throws Exception {

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 1000; i++) {
                    //Hand-written retry loop + CAS check
                    while(true){
                        BigDecimal pre=number.get();
                        //Compute the new value from the same snapshot that is passed to compareAndSet
                        BigDecimal next=pre.add(BigDecimal.ONE);
                      if(number.compareAndSet(pre,next))  {
                          break;
                      }
                    }
                }
            }
        };

        Thread t1 = new Thread(runnable);
        t1.start();

        Thread t2 = new Thread(runnable);
        t2.start();

        t1.join();
        t2.join();

        System.out.println(number.get());

    }
}

 

This version prints 2000.
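Since Java 8, AtomicReference also offers updateAndGet, which runs the same read, compute and compareAndSet retry loop internally. The sketch below is an alternative to the hand-written loop above, under the assumption that the update function has no side effects (it may be called more than once when a CAS fails). The class name AtomicReferenceUpdateSketch and the run helper are hypothetical.

```java
import java.math.BigDecimal;
import java.util.concurrent.atomic.AtomicReference;

class AtomicReferenceUpdateSketch {
    // Two threads each add 1 to the shared BigDecimal `times` times
    static BigDecimal run(int times) {
        AtomicReference<BigDecimal> number = new AtomicReference<>(BigDecimal.ZERO);
        Runnable runnable = () -> {
            for (int i = 0; i < times; i++) {
                // updateAndGet retries internally until its compareAndSet succeeds
                number.updateAndGet(current -> current.add(BigDecimal.ONE));
            }
        };
        Thread t1 = new Thread(runnable);
        Thread t2 = new Thread(runnable);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return number.get();
    }

    public static void main(String[] args) {
        System.out.println(run(1000));
    }
}
```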

ABA problems and Solutions

In the CAS process above, success is decided purely by comparing values. Suppose another thread first adds 1 and then subtracts 1: main memory holds the original value again, so a thread that read the value earlier can still update successfully. Logically this is wrong, because the value was modified in between and the stale thread should not be allowed to update directly.

Code:

public class testInt {

    static AtomicInteger number = new AtomicInteger(0);

    public static void main(String[] args) throws Exception {


        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                int a = number.get();
                System.out.println("start number:" + a);
                try {
                    Thread.sleep(5000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
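                //Try to change the value read 5 seconds ago to a + 1 (a++ would pass the unchanged value of a as the new value)
                System.out.println(number.compareAndSet(a, a + 1));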


            }
        });
        t1.start();
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("Start add operation");
                int a = number.incrementAndGet();
                System.out.println("current number:" + a);
                int b = number.decrementAndGet();
                System.out.println("current number:" + b);
            }
        });
        t2.start();

        t1.join();
        t2.join();
    }
} 

 

Thread t2 has already performed a series of operations on number, yet t1 still prints true at the end, meaning its update succeeded. This is clearly not what we want.

To fix this we can use AtomicStampedReference, which attaches a version number (stamp) to the value. When thread t1 first reads main memory it gets value 0 and stamp 0. Thread t2 then adds 1 and subtracts 1, increasing the stamp by 1 each time, so main memory now holds value 0 and stamp 2. When t1 finally tries to write with expected value 0 and expected stamp 0, the update fails because the stamps differ. Let's try it in code:

public class testInt {

    static AtomicStampedReference<Integer> number = new AtomicStampedReference<Integer>(0, 0);

    public static void main(String[] args) throws Exception {


        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                int a = number.getReference();
                int s = number.getStamp();
                System.out.println("start number:" + a + ",stamp:" + s);
                try {
                    Thread.sleep(5000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(number.compareAndSet(a, a + 1, s, s + 1));


            }
        });
        t1.start();
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("Start add operation");
                int a = number.getReference();
                int s = number.getStamp();
                number.compareAndSet(a, a + 1, s, s + 1);
                //Print the value and stamp actually stored after the update
                System.out.println("current number:" + number.getReference() + ",stamp:" + number.getStamp());
                a = number.getReference();
                s = number.getStamp();
                number.compareAndSet(a, a - 1, s, s + 1);
                System.out.println("current number:" + number.getReference() + ",stamp:" + number.getStamp());
            }
        });
        t2.start();

        t1.join();
        t2.join();
    }
} 

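Each operation updates the stamp (version number). The final compareAndSet compares the stamp as well as the value, so the stale update fails and false is printed. Note that AtomicStampedReference compares references with ==, so this example relies on small Integer values (-128 to 127) being cached when boxed.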
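The same effect can be reproduced without threads or sleeps by replaying the steps in order. The following sketch uses String values to stay clear of Integer boxing; the class name StampedAbaSketch and the helper staleUpdateSucceeds are hypothetical names for this illustration.

```java
import java.util.concurrent.atomic.AtomicStampedReference;

class StampedAbaSketch {
    // Replays the ABA sequence step by step and reports whether the stale update gets through
    static boolean staleUpdateSucceeds() {
        AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);

        // The "slow" party reads value and stamp together
        int[] stampHolder = new int[1];
        String seen = ref.get(stampHolder);
        int seenStamp = stampHolder[0];

        // Meanwhile another party changes A -> B -> A, bumping the stamp each time
        ref.compareAndSet("A", "B", 0, 1);
        ref.compareAndSet("B", "A", 1, 2);

        // The value is "A" again, but the stamp is now 2, so the stale stamp 0 is rejected
        return ref.compareAndSet(seen, "C", seenStamp, seenStamp + 1);
    }

    public static void main(String[] args) {
        System.out.println(staleUpdateSucceeds());   // prints false
    }
}
```

Reading value and stamp with get(int[]) returns both from one snapshot, whereas separate getReference() and getStamp() calls could observe two different states if another thread updates in between.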

Atomic array

AtomicIntegerArray, AtomicLongArray and AtomicReferenceArray are similar, so we take AtomicIntegerArray as the example. An AtomicIntegerArray can be thought of as an array of AtomicInteger elements; the underlying implementation is very similar, so we won't go into detail.

AtomicIntegerArray  array = new AtomicIntegerArray(10);
array.getAndIncrement(0);   // Increase element 0 by 1 atomically

 

AtomicInteger[]  array = new AtomicInteger[10];
array[0] = new AtomicInteger();  // Elements start as null and must be created before use
array[0].getAndIncrement();  // Increase element 0 by 1 atomically
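To see the per-element atomicity at work, here is a small sketch in which two threads update the slots of one AtomicIntegerArray. The class name AtomicArraySketch, the run helper and the array size of 4 are choices made for this illustration.

```java
import java.util.concurrent.atomic.AtomicIntegerArray;

class AtomicArraySketch {
    // Two threads each perform `times` increments, spread round-robin over 4 slots
    static int[] run(int times) {
        AtomicIntegerArray array = new AtomicIntegerArray(4);
        Runnable runnable = () -> {
            for (int i = 0; i < times; i++) {
                array.getAndIncrement(i % array.length());   // atomic update of a single slot
            }
        };
        Thread t1 = new Thread(runnable);
        Thread t2 = new Thread(runnable);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        // Copy the final values into a plain int[] for printing
        int[] result = new int[array.length()];
        for (int i = 0; i < result.length; i++) {
            result[i] = array.get(i);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run(1000)));
    }
}
```

With times = 1000, each thread adds 250 to every slot, so each slot ends at 500.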

Field updaters and atomic accumulators are relatively simple, so they are not covered here.

Atomic advanced level chapter boundary

Source code analysis of LongAdder

LongAdder use

LongAdder was added in JDK 1.8. Why was it added? We will answer that shortly; first let's see how to use it.

public class testLongAdder {
    public static void main(String[] args) throws Exception {
        LongAdder number = new LongAdder();
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                for (int j = 0; j < 10000; j++) {
                    number.add(1L);
                }
            }
        };
        Thread t1 = new Thread(runnable);
        Thread t2 = new Thread(runnable);
        t1.start();
        t2.start();

        t1.join();
        t2.join();

        System.out.println("number:" + number);
    }
}

As you can see, LongAdder is used in much the same way as AtomicLong. Two threads, t1 and t2, each add 1 to number 10000 times, and the final value is the correct 20000.
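Besides add, LongAdder has a handful of other methods. The sketch below shows increment, decrement, sum and sumThenReset in a single thread; the class name LongAdderApiSketch and the demo helper are hypothetical.

```java
import java.util.concurrent.atomic.LongAdder;

class LongAdderApiSketch {
    // Returns {sum before reset, value returned by sumThenReset, sum after reset}
    static long[] demo() {
        LongAdder adder = new LongAdder();
        adder.increment();                     // +1
        adder.add(10);                         // +10
        adder.decrement();                     // -1
        long sum = adder.sum();                // current total: 10
        long drained = adder.sumThenReset();   // returns 10 and resets the adder to 0
        long after = adder.sum();              // 0 after the reset
        return new long[]{sum, drained, after};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo()));   // prints [10, 10, 0]
    }
}
```

Unlike AtomicLong.get(), sum() is not an atomic snapshot: if other threads are adding while it runs, the result may miss some of their updates. That makes LongAdder a good fit for counters and statistics, and a poor fit where an exact value is needed mid-update.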

Comparative advantage with Atomic

That raises a question: since AtomicLong already updates number safely under multithreading, why do we need LongAdder? Let's run both with the same workload and compare their performance, given that both produce correct results.

public class testLongAdder {
    public static void main(String[] args) {
       //1 thread for 1 million auto increment operations
        test1(1,1000000);
      //10 threads, 1 million auto increment operations
        test1(10,1000000);
     //100 threads, 1 million auto increment operations
        test1(100,1000000);
    }

    static void test1(int threadCount,int times){
        long startTime=System.currentTimeMillis();
        AtomicLong number1=new AtomicLong();
        List<Thread> threads1=new ArrayList<>();
        for(int i=0;i<threadCount;i++) {
            threads1.add(new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < times; j++) {
                        number1.incrementAndGet();
                    }
                }
            }));
        }
        threads1.forEach(thread -> thread.start());
        threads1.forEach(thread ->{
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } );

        long endTime=System.currentTimeMillis();
        System.out.println("AtomicLong:"+number1+",time:"+(endTime-startTime));

        LongAdder number2=new LongAdder();
        List<Thread> threads2=new ArrayList<>();
        for(int i=0;i<threadCount;i++) {
            threads2.add(new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < times; j++) {
                        number2.add(1);
                    }
                }
            }));
        }
        threads2.forEach(thread -> thread.start());
        threads2.forEach(thread ->{
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } );

        System.out.println("LongAdder:"+number2+",time:"+(System.currentTimeMillis()-endTime));

    }
} 

The code above compares the time AtomicLong and LongAdder take when 1, 10 and 100 threads each perform 1000000 increments. The printed output shows that both number1 and number2 end up correct, and that LongAdder takes roughly an order of magnitude less time than AtomicLong when many threads contend.

Source code analysis

Why does this happen? We need to look at the source. Why is AtomicLong slow here? With many threads, especially under high concurrency, say 100 threads trying to update the object at the same time, only one thread's CAS succeeds; the other 99 fail and spin, rereading and retrying. After that update, the remaining 99 threads compete again, only one succeeds, and the other 98 keep spinning. In this way CAS wastes a lot of CPU time on spinning, which makes AtomicLong slower and slower as contention grows.

With AtomicLong, all threads operate on the same value field, so under contention they spin too much and performance degrades. Without contention, LongAdder behaves like AtomicLong and updates a single field, base. Once there is contention, it splits the work up and trades space for time: the value is spread across an array of cells. When several threads need to update at the same time, each thread uses its own hash (probe) value to map to an index in the cells array and increments the cell at that index. To read the result, the values of all cells plus base are added together.

Looking for the fields in LongAdder, we find it declares none of its own; they live in the inherited Striped64 class, which has the following four main variables.
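The splitting idea can be sketched in a few lines. This is a deliberately simplified illustration, not the LongAdder implementation: it has no base field, a fixed number of cells, no dynamic growth, and it picks the cell by hashing the thread id (an assumption of this sketch). The class name StripedCounterSketch and its methods are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLongArray;

class StripedCounterSketch {
    private final AtomicLongArray cells;
    private final int mask;

    // size must be a power of two so that (size - 1) can be used as a bit mask
    StripedCounterSketch(int size) {
        cells = new AtomicLongArray(size);
        mask = size - 1;
    }

    void add(long x) {
        // Map the current thread to one cell; different threads tend to hit different cells
        int index = Long.hashCode(Thread.currentThread().getId()) & mask;
        cells.addAndGet(index, x);
    }

    long sum() {
        // The total is the sum over all cells
        long total = 0;
        for (int i = 0; i < cells.length(); i++) {
            total += cells.get(i);
        }
        return total;
    }

    // threadCount threads each add 1 `times` times
    static long run(int threadCount, int times) {
        StripedCounterSketch counter = new StripedCounterSketch(8);
        Thread[] threads = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < times; i++) {
                    counter.add(1);
                }
            });
            threads[t].start();
        }
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return counter.sum();
    }

    public static void main(String[] args) {
        System.out.println(run(8, 10000));
    }
}
```

Threads that land on different cells no longer fight over one memory location, which is where the saving comes from. The cost is extra memory and a slower read, since sum has to visit every cell.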

    /** Number of CPUs; the cells array stops growing once its length reaches this bound */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * The cells array. Its length is always a power of 2 (2, 4, 8, 16, ...), which makes the later bit operations convenient
     */
    transient volatile Cell[] cells;

    /**
     * Base value, mainly used in the case of no competition, updated through CAS.
     */
    transient volatile long base;

    /**
     * The lock used to resize (enlarge) cells and create them.
     */
    transient volatile int cellsBusy;

 

Here is the start of the add method.

    public void add(long x) {
        //as: reference to the cells array
        //b: current value of base
        //v: current value of the hit cell (the expected value for CAS)
        //m: cells array length minus 1 (used as the index mask)
        //a: the cell hit by the current thread
        Cell[] as; long b, v; int m; Cell a;
        //Enter if cells is already initialized (some thread has seen contention before) or the CAS on base fails (threads are competing)
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            //true means no contention seen so far; set to false below if the CAS on the cell fails
            boolean uncontended = true;
            //as is null or empty (cells not initialized),
            //or the cell at index getProbe() & m is null (getProbe() returns the thread's hash value; & m maps it to an index),
            //or the CAS on that cell fails (uncontended becomes false, meaning there is contention)
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                //Reaching here means cells is uninitialized, the cell is null, or the CAS on the cell failed
                longAccumulate(x, null, uncontended);
        }
    }

 

 

 


 

LongAdder's add calls the longAccumulate method of Striped64, which handles initializing cells, expanding cells, and the case where several threads hit the same cell at once.

final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        //x: the value to add, fn: null for LongAdder, wasUncontended: false if the CAS on the cell already failed before this call (contention), true otherwise
        int h;//hash (probe) value of the thread
        //A probe of 0 means the thread's probe is not initialized yet, so ThreadLocalRandom.current() forces initialization and the probe is read again
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); 
            h = getProbe();
            wasUncontended = true;
        }
        //Expansion intent: false means do not expand; true means expand on the next failed attempt
        boolean collide = false; 
        //Infinite loop
        for (;;) {
            //as: reference to cells array
            //a: cell hit by current thread
            //n: Length of cells
            //v: The value of the cell hit by the current thread
            Cell[] as; Cell a; int n; long v;
            //cells not empty
            if ((as = cells) != null && (n = as.length) > 0) {
                //The cell hit by the current thread is empty. The following logic is to add a new cell
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                //The CAS before this call already failed (contention): reset the flag and retry after rehashing
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                //No competition, try to modify the cell value corresponding to the current thread, and successfully jump out of the loop
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                //If n has reached the number of CPUs, or cells has been replaced by another thread, do not expand
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                //Lock acquired: double the capacity (the length stays a power of 2)
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];//Shift left one bit operator, double the number
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = advanceProbe(h);
            }
            //cells is null and the lock is acquired: initialize the array, release the lock, and exit the loop on success
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }
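One detail worth isolating is the index computation as[(n - 1) & h]. Because the length n is always a power of two, masking with n - 1 keeps the low bits of h, which gives the same result as a non-negative remainder and stays valid when the array doubles. The sketch below demonstrates this; the class name CellIndexSketch, the helper index and the sample value of h are made up for this illustration.

```java
class CellIndexSketch {
    // Same index computation as as[(n - 1) & h] in longAccumulate; n must be a power of two
    static int index(int h, int n) {
        return (n - 1) & h;
    }

    public static void main(String[] args) {
        int h = 0x9E3779B9;   // arbitrary example probe value (hypothetical, happens to be negative as an int)
        // Walk through the table sizes 2, 4, 8, 16 that doubling produces
        for (int n = 2; n <= 16; n <<= 1) {
            System.out.println("n=" + n
                    + " mask index=" + index(h, n)
                    + " floorMod=" + Math.floorMod(h, n));
        }
    }
}
```

The two columns agree for every power-of-two n, including negative values of h, which is why the source can use a cheap bit mask instead of a remainder operation.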

 

epilogue

That's all. This article covered how to use the Atomic classes, including AtomicInteger, AtomicLong, AtomicBoolean and AtomicReference, as well as LongAdder (added in 1.8), its advantages and its source code. Along the way it introduced CAS and the ABA problem and how to solve it.

If you found this useful, please give it a like. Your recognition is what keeps me writing!

If you think something is wrong, please comment.

All right, bye.

Reference material

Advanced Java multithreading (17) -- the atomic framework of J.U.C: LongAdder

CAS principle

Java 8 Performance Improvements: LongAdder vs AtomicLong

In depth understanding of AtomicInteger

Detailed explanation of atomic operation class AtomicInteger

Play with Java concurrency tools, master JUC, and become a concurrency generalist

 


Tags: less Java Programming JDK

Posted on Sun, 15 Mar 2020 22:38:25 -0700 by rkm11