Building distributed seckill system: Disruptor high performance queue

In the continuous optimization of seckill architecture, it is inevitable to make progress based on your own cognitive deficiencies. Please correct and make progress together.

I hope you can replace the ArrayBlockingQueue queue with a Disruptor. I heard that it's a good thing because I've been exposed to it before. I just took this opportunity to integrate it.

brief introduction

LMAX Disruptor is a high performance inter thread message library. It stems from LMAX's research on concurrency, performance, and nonblocking algorithms, and now forms the core of the Exchange infrastructure.

Disruptor is an open source concurrency framework, and won the 2011 Duke's program framework innovation award. It can realize the concurrent operation of Queue in the network without lock.

Disruptor is a high-performance asynchronous processing framework, or it can be considered as the fastest message framework (lightweight JMS), or it can be considered as an implementation of observer mode, or the implementation of event monitoring mode.

Here you can compare it with BlockingQueue queue queue, simply understand that it is an efficient "producer consumer" model, understand first and then go deep into the underlying principle.

core

Before you write a code case, you'd better understand the core concept of Disruptor, at least how it works.

1,Ring Buffer

As its name implies, a circular buffer. RingBuffer was once the most important object in the Disruptor, but since version 3.0, its responsibility has been simplified to only be responsible for storing and updating the data (events) exchanged through the Disruptor. In some more advanced application scenarios, Ring Buffer can be completely replaced by user-defined implementation.

2,Sequence Disruptor

The data (events) exchanged through the serial number management are numbered by the sequential increasing serial number. The processing of data (events) is always incremented one by one along the serial number. A Sequence is used to track the processing progress of a specific event handler (RingBuffer/Consumer). Although an AtomicLong can also be used to identify progress, there is another purpose to define Sequence to be responsible for this problem, that is, to prevent false sharing of CPU cache between different sequences.

**3,Sequencer **

Sequencer is the real core of Disruptor. This interface has two implementation classes, SingleProducerSequencer and MultiProducerSequencer, which define the concurrency algorithm for fast and correct data transfer between producers and consumers.

4,Sequence Barrier

Used to maintain a reference to the main published Sequence of RingBuffer and the Sequence of other consumers that the Consumer depends on. The Sequence Barrier also defines the logic that determines whether the Consumer still has events to process.

5,Wait Strategy

A policy that defines how consumers wait for the next event. (Note: the Disruptor defines a variety of different strategies and provides different performance for different scenarios.)

6,Event

In the semantics of Disruptor, the data exchanged between producers and consumers is called event. It is not a specific type defined by the Disruptor, but is defined and specified by the user of the Disruptor.

7,EventProcessor

The event processor holds the Sequence of a specific consumer and provides an event loop for invoking the event processing implementation.

8,EventHandler

The event handling interface defined by the Disruptor, which is implemented by the user and used to handle events, is the real implementation of the Consumer.

9,Producer

That is, the producer, which is just the user code that calls the interrupter to publish the event. The interrupter does not define a specific interface or type.

Use cases

Here we take the seckill in our system as an example, and there is a relatively complex scenario introduction later.

Define the second kill event object:

/**
 * Event object (seckill event)
 * Founder kebang.com
 */
public class SeckillEvent implements Serializable {
    private static final long serialVersionUID = 1L;
    private long seckillId;
    private long userId;

    public SeckillEvent(){

    }

    public long getSeckillId() {
        return seckillId;
    }

    public void setSeckillId(long seckillId) {
        this.seckillId = seckillId;
    }

    public long getUserId() {
        return userId;
    }

    public void setUserId(long userId) {
        this.userId = userId;
    }
}

In order for the Disruptor to pre assign these events to us, we need an EventFactory that will perform the construction:

/**
 * Event generation factory (used to initialize pre allocated event objects)
 * Founder kebang.com
 */
public class SeckillEventFactory implements EventFactory<SeckillEvent> {

    public SeckillEvent newInstance() {
        return new SeckillEvent();
    }
}

Then we need to create a consumer to handle these events:

/**
 * Consumer (seckill processor)
 * Founder kebang.com
 */
public class SeckillEventConsumer implements EventHandler<SeckillEvent> {
    //Business processing, which cannot be injected here, needs to be obtained manually. See the source code
    private ISeckillService seckillService = (ISeckillService) SpringUtil.getBean("seckillService");

    public void onEvent(SeckillEvent seckillEvent, long seq, boolean bool) throws Exception {
        seckillService.startSeckil(seckillEvent.getSeckillId(), seckillEvent.getUserId());
    }
}

Now that we have consumers, we will need the source of these second killing events:

/**
 * Producer by translator
 * Founder kebang.com
 */
public class SeckillEventProducer {

    private final static EventTranslatorVararg<SeckillEvent> translator = new EventTranslatorVararg<SeckillEvent>() {
        public void translateTo(SeckillEvent seckillEvent, long seq, Object... objs) {
            seckillEvent.setSeckillId((Long) objs[0]);
            seckillEvent.setUserId((Long) objs[1]);
        }
    };

    private final RingBuffer<SeckillEvent> ringBuffer;

    public SeckillEventProducer(RingBuffer<SeckillEvent> ringBuffer){
        this.ringBuffer = ringBuffer;
    }

    public void seckill(long seckillId, long userId){
        this.ringBuffer.publishEvent(translator, seckillId, userId);
    }
}

Finally, let's write a test class and run it (it doesn't work, you need to modify the consumer):

/**
 * Test class
 * Founder kebang.com
 */
public class SeckillEventMain {

    public static void main(String[] args) {
        producerWithTranslator();
    }
    public static void producerWithTranslator(){
        SeckillEventFactory factory = new SeckillEventFactory();
        int ringBufferSize = 1024;
        ThreadFactory threadFactory = new ThreadFactory() {
            public Thread newThread(Runnable runnable) {
                return new Thread(runnable);
            }
        };
        //Create disruptor
        Disruptor<SeckillEvent> disruptor = new Disruptor<SeckillEvent>(factory, ringBufferSize, threadFactory);
        //Connect consumer event method
        disruptor.handleEventsWith(new SeckillEventConsumer());
        //start-up
        disruptor.start();
        RingBuffer<SeckillEvent> ringBuffer = disruptor.getRingBuffer();
        SeckillEventProducer producer = new SeckillEventProducer(ringBuffer);
        for(long i = 0; i<10; i++){
            producer.seckill(i, i);
        }
        disruptor.shutdown();//When the disruptor is closed, the method will be blocked until all events are handled;
    }
}

Use scenario

Usage scenario:

  • PCP (producer consumer problem)

  • There are not many cases of actual combat in China searched on the Internet, which may be used by large factories

Here is a daily example of everyone, parking scene. When the car enters the parking lot (A), the system first records the car information (B). At the same time, it will also send a message to other systems to process related business (C), and finally send a message to inform the owner of the start of charging (D).

The event processing of producer A and three consumers B, C and D needs to be completed by B and C first. Then the model structure is as follows:

Under this structure, each consumer has its own event Sequence number, and there is no shared race among consumers.

SequenceBarrier1 listens for the sequence number cursor of RingBuffer, and consumers B and C wait for consumable events through SequenceBarrier1.

SequenceBarrier2 not only listens to the cursor, but also listens to the Sequence number of B and C, so as to return the minimum Sequence number to consumer D, thus realizing the logic that D depends on B and C.

Original link:

https://blog.51cto.com/itstyle/2121926

Source network, only for learning, if there is infringement, contact delete.

I have compiled the interview questions and answers into PDF documents, as well as a set of learning materials, including Java virtual machine, spring framework, java thread, data structure, design pattern, etc., but not limited to this.

Focus on the official account [java circle] for information, as well as daily delivery of quality articles.

Tags: Programming Java network Spring

Posted on Fri, 24 Apr 2020 21:58:08 -0700 by stuartc1