A brief introduction and application of Disruptor

Preface

Recently I have been busy at work. In my project I noticed that many people had each built their own mechanism for processing data tasks. The result felt disorganized and made follow-up maintenance harder for everyone else. So I settled on a single data-processing pattern (producer, buffer queue, consumer) to unify everyone's implementation logic.

The following demonstrates the basic usage of the Disruptor. First, add the dependency:

<dependency>
  <groupId>com.lmax</groupId>
  <artifactId>disruptor</artifactId>
  <version>3.4.2</version>
</dependency>

Terminology

  • Ring Buffer

    The ring buffer. Before version 3.0 it was considered the main component of the Disruptor. From 3.0 onwards, the Ring Buffer is only responsible for storing and updating the data (events) that move through the Disruptor, and in some advanced use cases it can be replaced entirely by a user-defined implementation.

  • Sequence

    The Disruptor uses a Sequence to track how far a particular component has progressed. Each consumer (EventProcessor) maintains its own Sequence, as does the Disruptor itself. Most of the concurrent code relies on the movement of these Sequence values, so Sequence supports many of the same operations as AtomicLong. In fact, the only real difference between the two is that Sequence contains additional functionality (cache-line padding) to prevent false sharing between a Sequence and other values.

  • Sequencer

    The Sequencer is the real core of the Disruptor. Its two implementations (single producer and multi producer) implement all the concurrent algorithms for passing data quickly and correctly between producers and consumers.

  • Sequence Barrier

    The Sequence Barrier is produced by the Sequencer and holds references to the Sequencer's main published Sequence and to the Sequences of any consumers it depends on. It contains the logic that determines whether any events are available for the consumer to process.

  • Wait Strategy

    The wait strategy determines how a consumer waits for the events that a producer places into the Disruptor.

  • Event

    The unit of data passed from producer to consumer. There is no specific code representation of an event; it is defined entirely by the user.

  • EventProcessor

    The event processor owns the Sequence of a specific consumer and provides the event loop that invokes the event-handling implementation.

  • BatchEventProcessor

    BatchEventProcessor provides an efficient implementation of the event loop and calls back into the user-supplied implementation of the EventHandler interface.

  • EventHandler

    The event-handling interface defined by the Disruptor. The user implements it to handle events, so it is the actual implementation of a consumer.

  • Producer

    The user code that calls the Disruptor to publish events. The Disruptor does not define a specific interface or type for producers.
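To make the Ring Buffer and Sequence entries more concrete, here is a toy sketch in plain Java (it is not the Disruptor's own code) of how an ever-growing sequence number is mapped onto a fixed-size array. The Disruptor requires the ring buffer size to be a power of 2 so that it can compute the slot with a bit mask, `sequence & (bufferSize - 1)`, instead of a modulo. The class name `RingIndex` is hypothetical and used only for illustration.

```java
// Toy illustration of how a sequence maps to a slot; not the Disruptor's own class.
final class RingIndex {
    private final int indexMask;

    RingIndex(int bufferSize) {
        // A positive power of 2 has exactly one bit set
        if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.indexMask = bufferSize - 1;
    }

    // Sequences keep growing (they are longs); the slot index wraps around the array
    int slotOf(long sequence) {
        return (int) (sequence & indexMask);
    }

    public static void main(String[] args) {
        RingIndex ring = new RingIndex(8);
        for (long seq = 5; seq <= 10; seq++) {
            // Prints slots 5, 6, 7, 0, 1, 2
            System.out.println("sequence " + seq + " -> slot " + ring.slotOf(seq));
        }
    }
}
```

This is also why a buffer size such as 1000 is rejected: with a power of 2, the mask gives the same result as `sequence % bufferSize` for non-negative sequences, but more cheaply.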

[Figure: Disruptor architecture diagram]

A simple, practical Disruptor example

1 Define the event

An event is the data type exchanged through the Disruptor.

package com.disruptor;

public class Data {

    private long value;

    public long getValue() {
        return value;
    }

    public void setValue(long value) {
        this.value = value;
    }
}

2 Define the event factory

The event factory defines how to instantiate the event defined in step 1. The Disruptor uses the EventFactory to pre-create one Event instance for every slot in the RingBuffer.

Each Event instance acts as a data slot. Before publishing, the producer obtains an Event instance from the RingBuffer, writes data into it, and then publishes it to the RingBuffer. The consumer then obtains the same Event instance and reads the data from it.

package com.disruptor;

import com.lmax.disruptor.EventFactory;

public class DataFactory implements EventFactory<Data> {

    @Override
    public Data newInstance() {
        return new Data();
    }
}
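A simplified sketch of the pre-allocation idea, assuming a hypothetical `PreallocatedRing` class in plain Java, with `java.util.function.Supplier` standing in for `EventFactory` and a hypothetical `DemoData` class standing in for the `Data` event above. Every slot is created once up front and then overwritten on every lap of the ring, so sequence 3 and sequence 3 + bufferSize resolve to the same object.

```java
import java.util.function.Supplier;

// Toy ring of pre-allocated events; Supplier plays the role of EventFactory.
final class PreallocatedRing<E> {
    private final Object[] entries;
    private final int indexMask;

    PreallocatedRing(int bufferSize, Supplier<E> factory) {
        if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        entries = new Object[bufferSize];
        indexMask = bufferSize - 1;
        // Every slot is filled once, before any event is published
        for (int i = 0; i < bufferSize; i++) {
            entries[i] = factory.get();
        }
    }

    // Returns the (reused) event object that backs a sequence
    @SuppressWarnings("unchecked")
    E get(long sequence) {
        return (E) entries[(int) (sequence & indexMask)];
    }

    public static void main(String[] args) {
        PreallocatedRing<DemoData> ring = new PreallocatedRing<>(4, DemoData::new);
        ring.get(3).setValue(42);
        // Sequence 7 lands on the same slot as sequence 3 (7 & 3 == 3)
        System.out.println(ring.get(7) == ring.get(3)); // true
        System.out.println(ring.get(7).getValue());     // 42
    }
}

// Hypothetical stand-in with the same shape as the article's Data event
final class DemoData {
    private long value;

    long getValue() {
        return value;
    }

    void setValue(long value) {
        this.value = value;
    }
}
```

Because slots are reused, a handler should copy out any values it needs to keep after `onEvent` returns rather than holding on to the event object itself.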

3 Define the producer

package com.disruptor;

import com.lmax.disruptor.RingBuffer;

import java.nio.ByteBuffer;

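public class Producer {

    private final RingBuffer<Data> ringBuffer;

    public Producer(RingBuffer<Data> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void pushData(ByteBuffer byteBuffer) {
        // Claim the next sequence (slot) in the ring buffer
        long sequence = ringBuffer.next();

        try {
            // Fetch the pre-allocated event for that slot and fill it in
            Data event = ringBuffer.get(sequence);
            event.setValue(byteBuffer.getLong(0));
        } finally {
            // Always publish a claimed sequence, even if filling failed;
            // an unpublished sequence can leave consumers waiting forever
            ringBuffer.publish(sequence);
        }
    }
}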
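The `next()` / `get()` / `publish()` calls in `pushData` form a two-phase protocol: claim a slot, fill it, then make it visible. The hypothetical `ToySequencer` below sketches that protocol for a single producer in plain Java. It is a simplification under stated assumptions, not how the Disruptor's `SingleProducerSequencer` is written: a claimed but unpublished sequence is invisible to consumers, and the producer cannot claim a slot whose previous event has not been consumed yet.

```java
import java.util.concurrent.atomic.AtomicLong;

// Toy single-producer sequencer showing claim-then-publish; a simplification,
// not the Disruptor's SingleProducerSequencer.
final class ToySequencer {
    private final int bufferSize;
    // Last sequence claimed by the producer (only the producer thread touches it)
    private long claimed = -1;
    // Last sequence made visible to consumers
    private final AtomicLong cursor = new AtomicLong(-1);
    // Last sequence that consumers have finished with
    private final AtomicLong consumed = new AtomicLong(-1);

    ToySequencer(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    // Phase 1: claim the next sequence, waiting while its slot still holds an unconsumed event
    long next() {
        long next = claimed + 1;
        while (next - bufferSize > consumed.get()) {
            Thread.yield();
        }
        claimed = next;
        return next;
    }

    // Phase 2: make the filled slot visible to consumers
    void publish(long sequence) {
        cursor.set(sequence);
    }

    boolean isAvailable(long sequence) {
        return sequence <= cursor.get();
    }

    // Called by the consumer once it is done with a sequence, freeing its slot
    void markConsumed(long sequence) {
        consumed.set(sequence);
    }

    public static void main(String[] args) {
        ToySequencer seq = new ToySequencer(2);
        long s = seq.next();
        System.out.println("claimed " + s + ", visible: " + seq.isAvailable(s));   // false
        seq.publish(s);
        System.out.println("published " + s + ", visible: " + seq.isAvailable(s)); // true
    }
}
```

The wait in `next()` is the back-pressure that keeps a fast producer from overwriting events that consumers have not read yet.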

4 Define the consumer

package com.disruptor;

import com.lmax.disruptor.WorkHandler;

import java.text.MessageFormat;


public class Consumer implements WorkHandler<Data> {

    @Override
    public void onEvent(Data data) throws Exception {
        long result = data.getValue() + 1;

        System.out.println(MessageFormat.format("Data process : {0} + 1 = {1}", data.getValue(), result));
    }
}

5 Start the Disruptor

  • Test Demo
package com.disruptor;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;

import java.nio.ByteBuffer;
import java.util.concurrent.ThreadFactory;


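public class Main {

    // Number of consumers in the worker pool
    private static final int NUMS = 10;

    // Number of events to publish
    private static final int SUM = 1000000;

    public static void main(String[] args) {
        // Pause for 10 s before the run (this pause is not part of the measured time)
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        long start = System.currentTimeMillis();

        DataFactory factory = new DataFactory();

        // Ring buffer size; must be a power of 2
        int buffersize = 1024;

        // This constructor uses a multi-producer ring buffer with BlockingWaitStrategy by default
        Disruptor<Data> disruptor = new Disruptor<Data>(factory, buffersize, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r);
            }
        });

        Consumer[] consumers = new Consumer[NUMS];
        for (int i = 0; i < NUMS; i++) {
            consumers[i] = new Consumer();
        }

        // Worker pool: each event is processed by exactly one of the consumers
        disruptor.handleEventsWithWorkerPool(consumers);
        disruptor.start();

        RingBuffer<Data> ringBuffer = disruptor.getRingBuffer();
        Producer producer = new Producer(ringBuffer);

        // One 8-byte buffer is reused; pushData copies the long into the event
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long i = 0; i < SUM; i++) {
            bb.putLong(0, i);
            producer.pushData(bb);
            System.out.println("Success producer data : " + i);
        }
        // Taken before shutdown(), so events still being consumed are not counted
        long end = System.currentTimeMillis();

        // Blocks until all published events have been processed, then stops the consumers
        disruptor.shutdown();

        System.out.println("Total time : " + (end - start));
    }
}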

  • Results (partial output; MessageFormat prints numbers with thousands separators)
Data process : 999,987 + 1 = 999,988
Success producer data : 999995
Data process : 999,990 + 1 = 999,991
Data process : 999,989 + 1 = 999,990
Data process : 999,991 + 1 = 999,992
Data process : 999,992 + 1 = 999,993
Data process : 999,993 + 1 = 999,994
Data process : 999,995 + 1 = 999,996
Success producer data : 999996
Success producer data : 999997
Success producer data : 999998
Success producer data : 999999
Data process : 999,994 + 1 = 999,995
Data process : 999,996 + 1 = 999,997
Data process : 999,997 + 1 = 999,998
Data process : 999,998 + 1 = 999,999
Data process : 999,999 + 1 = 1,000,000
Total time : 14202

The interleaved output shows that events are produced and consumed concurrently.

Bonus

1 Event translator class

package com.mm.demo.disruptor.translator;

import com.lmax.disruptor.EventTranslatorOneArg;
import com.mm.demo.disruptor.entity.Data;

import java.text.MessageFormat;

// Copies the argument passed to publishEvent into the pre-allocated event
public class DataEventTranslator implements EventTranslatorOneArg<Data, Long> {

    @Override
    public void translateTo(Data event, long sequence, Long arg0) {
        System.out.println(MessageFormat.format("DataEventTranslator arg0 = {0}, seq = {1}", arg0, sequence));
        event.setValue(arg0);
    }
}

2 Consumers

2.1 Consumer demo 1

This consumer adds 1 to the event's value.

package com.mm.demo.disruptor.handler;

import com.lmax.disruptor.EventHandler;
import com.mm.demo.disruptor.entity.Data;

import java.text.MessageFormat;

public class D1DataEventHandler implements EventHandler<Data> {

    @Override
    public void onEvent(Data event, long sequence, boolean endOfBatch) throws Exception {
        long result = event.getValue() + 1;
        // Name of the thread that is actually running this handler
        String name = Thread.currentThread().getName();
        System.out.println(MessageFormat.format("consumer {0}: {1} + 1 = {2}", name, event.getValue(), result));
    }

}

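EventHandler is used here; the first example used WorkHandler instead. The difference is how events are distributed: every EventHandler registered with handleEventsWith receives every event, whereas WorkHandlers are registered as a pool with handleEventsWithWorkerPool and each event is processed by only one handler in the pool.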
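A toy, single-threaded model of the two distribution modes may help. `Distribution`, `broadcast` and `workerPool` are hypothetical names, and the round-robin assignment in `workerPool` is an assumption made only to keep the output deterministic; real pool workers compete for the next available sequence.

```java
import java.util.ArrayList;
import java.util.List;

// Toy, single-threaded model of how events are distributed; not Disruptor code.
final class Distribution {

    // Like handleEventsWith(h1, h2, ...): every handler sees every event
    static List<List<Long>> broadcast(long[] events, int handlers) {
        List<List<Long>> seen = newLists(handlers);
        for (long e : events) {
            for (List<Long> h : seen) {
                h.add(e);
            }
        }
        return seen;
    }

    // Like handleEventsWithWorkerPool(w1, w2, ...): each event goes to exactly one worker.
    // Round-robin is used here only for determinism.
    static List<List<Long>> workerPool(long[] events, int workers) {
        List<List<Long>> seen = newLists(workers);
        for (int i = 0; i < events.length; i++) {
            seen.get(i % workers).add(events[i]);
        }
        return seen;
    }

    private static List<List<Long>> newLists(int n) {
        List<List<Long>> lists = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            lists.add(new ArrayList<>());
        }
        return lists;
    }

    public static void main(String[] args) {
        long[] events = {0, 1, 2, 3};
        System.out.println("broadcast:   " + broadcast(events, 2));  // [[0, 1, 2, 3], [0, 1, 2, 3]]
        System.out.println("worker pool: " + workerPool(events, 2)); // [[0, 2], [1, 3]]
    }
}
```

In other words, use EventHandlers when each handler does a different job on every event, and a WorkHandler pool when several identical workers should share the load.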

2.2 Consumer demo 2

This consumer adds 2 to the event's value.

package com.mm.demo.disruptor.handler;

import com.lmax.disruptor.EventHandler;
import com.mm.demo.disruptor.entity.Data;

import java.text.MessageFormat;


public class D2DataEventHandler implements EventHandler<Data> {

    @Override
    public void onEvent(Data event, long sequence, boolean endOfBatch) throws Exception {
        long result = event.getValue() + 2;
        System.out.println(MessageFormat.format("consumer 2: {0} + 2 = {1}", event.getValue(), result));
    }
}

2.3 Serial processing

For each event, D1DataEventHandler runs before D2DataEventHandler.

package com.mm.demo.disruptor.process;

import com.lmax.disruptor.dsl.Disruptor;
import com.mm.demo.disruptor.entity.Data;
import com.mm.demo.disruptor.handler.D1DataEventHandler;
import com.mm.demo.disruptor.handler.D2DataEventHandler;

/**
 * Serial sequential calculation
 * @DateT: 2020-01-07
 */
public class Serial {

    public static void serial(Disruptor<Data> disruptor) {
        disruptor.handleEventsWith(new D1DataEventHandler()).then(new D2DataEventHandler());
        disruptor.start();
    }
}
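Under the hood, `then` makes the second handler wait on the first handler's Sequence through a sequence barrier. The hypothetical `SerialGate` below is a plain-Java sketch of that gating, with two threads and an `AtomicLong` standing in for D1's Sequence; it leaves out the ring buffer and the producer so that only the dependency is visible.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Toy model of handleEventsWith(d1).then(d2): d2 may only process sequence s
// once d1's progress sequence has reached s. Not the Disruptor's code.
final class SerialGate {

    static List<String> run(int events) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        AtomicLong d1Sequence = new AtomicLong(-1); // last sequence finished by d1

        Thread d1 = new Thread(() -> {
            for (long s = 0; s < events; s++) {
                log.add("d1:" + s);
                d1Sequence.set(s); // publish progress only after the work is done
            }
        });
        Thread d2 = new Thread(() -> {
            for (long s = 0; s < events; s++) {
                // Barrier: wait until d1 has finished sequence s
                while (d1Sequence.get() < s) {
                    Thread.yield();
                }
                log.add("d2:" + s);
            }
        });
        d2.start();
        d1.start();
        try {
            d1.join();
            d2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        // For every s, "d1:s" always appears before "d2:s"
        System.out.println(run(3));
    }
}
```

Note that d2 does not wait for d1 to finish all events, only the one it is about to handle, so the two handlers still overlap in time like a pipeline.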

2.4 Parallel processing

D1DataEventHandler and D2DataEventHandler process each event concurrently and independently of each other.

package com.mm.demo.disruptor.process;

import com.lmax.disruptor.dsl.Disruptor;
import com.mm.demo.disruptor.entity.Data;
import com.mm.demo.disruptor.handler.D1DataEventHandler;
import com.mm.demo.disruptor.handler.D2DataEventHandler;

/**
 * Parallel execution
 * @DateT: 2020-01-07
 */
public class Parallel {

    public static void parallel(Disruptor<Data> dataDisruptor) {
        dataDisruptor.handleEventsWith(new D1DataEventHandler(), new D2DataEventHandler());
        dataDisruptor.start();
    }
}

2.5 Test

package com.mm.demo.disruptor;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.mm.demo.disruptor.entity.Data;
import com.mm.demo.disruptor.process.Parallel;
import com.mm.demo.disruptor.process.Serial;
import com.mm.demo.disruptor.translator.DataEventTranslator;

import java.util.concurrent.Executors;


public class Main {

    // Ring buffer size; must be a power of 2
    private static final int BUFFER = 1024 * 1024;

    public static void main(String[] args) {

        DataFactory factory = new DataFactory();

        // Multi-producer ring buffer with the CPU-friendly BlockingWaitStrategy
        Disruptor<Data> disruptor = new Disruptor<Data>(factory, BUFFER, Executors.defaultThreadFactory(), ProducerType.MULTI, new BlockingWaitStrategy());

        // Choose one wiring: serial (D1 then D2) or parallel (D1 and D2 independently)
        Serial.serial(disruptor);
//        Parallel.parallel(disruptor);

        RingBuffer<Data> ringBuffer = disruptor.getRingBuffer();
        // The translator is stateless, so one instance can be reused for every publish
        DataEventTranslator translator = new DataEventTranslator();
        for (int i = 0; i < 2; i++) {
            ringBuffer.publishEvent(translator, (long) i);
        }
        // Waits until all published events have been processed, then stops the handlers
        disruptor.shutdown();
    }
}

Summary

Only serial and parallel wiring are demonstrated above, but the two can also be combined into more complex processing graphs; this just requires creating more EventHandlers.
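For example, a "diamond", where two handlers run in parallel and a third runs only after both, can be wired in the DSL as `disruptor.handleEventsWith(h1, h2).then(h3)`. The hypothetical `Diamond` class below sketches the underlying rule in plain Java (it is not the Disruptor's code): the downstream handler waits until the slowest of its dependencies has finished a sequence.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Toy model of a "diamond": h1 and h2 run independently, h3 runs after both.
// In the Disruptor DSL this shape is handleEventsWith(h1, h2).then(h3).
final class Diamond {

    static List<String> run(int events) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        AtomicLong h1Seq = new AtomicLong(-1);
        AtomicLong h2Seq = new AtomicLong(-1);

        Thread h1 = upstream("h1", events, log, h1Seq);
        Thread h2 = upstream("h2", events, log, h2Seq);
        Thread h3 = new Thread(() -> {
            for (long s = 0; s < events; s++) {
                // Barrier on the slowest dependency: both h1 and h2 must have finished s
                while (Math.min(h1Seq.get(), h2Seq.get()) < s) {
                    Thread.yield();
                }
                log.add("h3:" + s);
            }
        });
        Thread[] all = {h3, h1, h2};
        for (Thread t : all) {
            t.start();
        }
        try {
            for (Thread t : all) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    // An independent handler that records its progress in its own sequence
    private static Thread upstream(String name, int events, List<String> log, AtomicLong seq) {
        return new Thread(() -> {
            for (long s = 0; s < events; s++) {
                log.add(name + ":" + s);
                seq.set(s);
            }
        });
    }

    public static void main(String[] args) {
        System.out.println(run(3));
    }
}
```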

Appendix: wait strategies

  • BlockingWaitStrategy: the slowest strategy, but the most conservative in CPU usage, and it gives the most consistent performance across different deployment environments.
  • SleepingWaitStrategy: like BlockingWaitStrategy, it is conservative with CPU, but it waits in a loop with brief sleeps instead of a lock and condition variable. It has the least impact on the producer thread, at the cost of higher latency, which suits asynchronous processing such as logging.
  • YieldingWaitStrategy: busy-spins and calls Thread.yield() while waiting, trading CPU for low latency. It is recommended when very high performance is required and the number of event-handler threads is less than the number of logical CPU cores.
  • BusySpinWaitStrategy: the lowest latency, but it uses the most CPU; use it only when there are fewer event-handler threads than physical cores.
  • PhasedBackoffWaitStrategy: a combination of the above; it spins, then yields, then falls back to another wait strategy. Latency is higher, but it uses less CPU.
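The strategies above differ mainly in what a waiting consumer does between checks of the sequence it is waiting for. The hypothetical `BackoffWaiter` below sketches the phased idea in plain Java: spin a little, then yield, then park. The thresholds and park time are arbitrary assumptions, and the real PhasedBackoffWaitStrategy is time-based and falls back to a configurable wait strategy rather than parking directly.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import java.util.function.LongSupplier;

// Toy "spin, then yield, then park" waiter in the spirit of PhasedBackoffWaitStrategy.
// Not the Disruptor's implementation; the thresholds below are arbitrary assumptions.
final class BackoffWaiter {
    private static final int SPIN_TRIES = 100;
    private static final int YIELD_TRIES = 100;
    private static final long PARK_NANOS = 100_000L; // 100 microseconds

    // Blocks until cursor >= sequence and returns the cursor value it observed
    static long waitFor(long sequence, LongSupplier cursor) {
        int tries = 0;
        long available;
        while ((available = cursor.getAsLong()) < sequence) {
            if (tries < SPIN_TRIES) {
                tries++;                            // phase 1: busy spin, re-check immediately
            } else if (tries < SPIN_TRIES + YIELD_TRIES) {
                tries++;
                Thread.yield();                     // phase 2: let other threads run
            } else {
                LockSupport.parkNanos(PARK_NANOS);  // phase 3: sleep briefly to save CPU
            }
        }
        return available;
    }

    public static void main(String[] args) {
        AtomicLong cursor = new AtomicLong(-1);
        Thread producer = new Thread(() -> {
            LockSupport.parkNanos(20_000_000L); // roughly 20 ms
            cursor.set(5);
        });
        producer.start();
        System.out.println("observed cursor: " + waitFor(3, cursor::get)); // observed cursor: 5
    }
}
```

The trade-off is visible in the three branches: spinning reacts fastest but burns a core, while parking saves CPU but adds wake-up latency.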

References

This article draws on the Disruptor source code and some of the documentation on GitHub.

Demo source code

GitHub

  • Contact: 4272231@163.com
  • QQ:95472323
  • Wechat: ffj2000


Posted on Tue, 07 Jan 2020 05:55:55 -0800 by bakaneko