Disruptor - Performance comparison with ArrayBlockingQueue

Both tests use the same queue capacity, the same number of producer threads, the same number of items produced per thread, and a single consumer thread.

Test results:

 

Tested and proven:

  1. Disruptor performs better once the capacity reaches a certain threshold.
  2. The size of the RingBuffer and the consumer's processing speed both directly affect the overall elapsed time.

Test Entry

package com.lmax.disruptor.noob;

import java.time.Instant;
import java.time.format.DateTimeFormatter;

/**
 * Entry point of the comparison. Holds the settings shared by both benchmarks and a small
 * timestamped logging helper.
 */
public class CompareTest {
	/** Number of producer threads per benchmark; {@code 2 << 10} evaluates to 2048. */
	public static int THREAD = 2 << 10;
	/** Number of items each producer thread publishes; {@code 2 << 10} evaluates to 2048. */
	public static int PER = 2 << 10;
	/** Capacity of the queue / ring buffer under test. */
	public static int CAP = 256;

	/**
	 * Prints the settings, then launches each benchmark on its own thread. The two threads are
	 * started back to back, so the benchmarks run concurrently.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		println("Number of production lines:" + THREAD + " Single Thread Production: " + PER + " Maximum capacity:" + CAP);
		Thread queueRun = new Thread(ArrayBlockingQueueTest::execute);
		Thread disruptorRun = new Thread(DisruptorTest::execute);
		queueRun.start();
		disruptorRun.start();
	}

	/**
	 * Writes {@code msg} to standard output, prefixed with the current instant in ISO-8601
	 * format and two spaces.
	 *
	 * @param msg text to print
	 */
	public static void println(String msg) {
		String timestamp = DateTimeFormatter.ISO_INSTANT.format(Instant.now());
		System.out.println(timestamp + "  " + msg);
	}
}

ArrayBlockingQueue test case

package com.lmax.disruptor.noob;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Benchmark that moves {@code CompareTest.THREAD * CompareTest.PER} strings through an
 * {@link ArrayBlockingQueue} of capacity {@code CompareTest.CAP}, using
 * {@code CompareTest.THREAD} producer threads and one consumer thread.
 */
public class ArrayBlockingQueueTest {

	/**
	 * Runs the benchmark and returns once every element has been produced and consumed.
	 * <p>
	 * Three timings (milliseconds since this method started) are printed through
	 * {@code CompareTest.println}: when the last element has been enqueued, when the last
	 * element has been dequeued, and when all worker threads have terminated.
	 * <p>
	 * If the calling thread is interrupted while waiting, its interrupt flag is restored and
	 * the method returns without printing the total time.
	 */
	public static void execute() {
		// Widen before multiplying so large settings cannot overflow int.
		final long total = (long) CompareTest.THREAD * CompareTest.PER;
		final ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(CompareTest.CAP);
		final AtomicLong produced = new AtomicLong();
		final long startTime = System.currentTimeMillis();

		final Thread[] producers = new Thread[CompareTest.THREAD];
		for (int i = 0; i < producers.length; i++) {
			final int m = i;
			producers[i] = new Thread(() -> {
				try {
					for (int j = 0; j < CompareTest.PER; j++) {
						// put() waits for space; offer() would return false and drop the element
						// whenever the queue is full, so the workload would not be fully produced.
						queue.put("i" + m + "j" + j);
						// Test the value returned by the increment itself: exactly one thread
						// observes the final count, so the message is printed exactly once.
						if (produced.incrementAndGet() == total) {
							CompareTest.println("ArrayBlockingQueue Production time:" + (System.currentTimeMillis() - startTime));
						}
					}
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt();
				}
			});
			producers[i].start();
		}

		final Thread consumer = new Thread(() -> {
			try {
				// take() waits for an element, so a momentarily empty queue does not end the
				// consumer early; it stops only after the whole workload has been dequeued.
				for (long taken = 0; taken < total; taken++) {
					queue.take();
				}
				CompareTest.println("ArrayBlockingQueue Consumption time:" + (System.currentTimeMillis() - startTime));
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
			}
		});
		consumer.start();

		// Block on join() instead of spinning on flags, so the waiting thread burns no CPU
		// that the measured threads could use.
		try {
			for (Thread producer : producers) {
				producer.join();
			}
			consumer.join();
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			return;
		}
		CompareTest.println("ArrayBlockingQueue Total time consumed:" + (System.currentTimeMillis() - startTime));
	}
}

Disruptor Test Case

Verification Points

  1. ringBuffer.next() blocks the producer while it claims a sequence whenever the RingBuffer has no free slot. If the number of slots is a power of two, the slot index is cheaper for a binary computer to calculate.
    (Proofreading note: powers of two are written 1000, 100, 10, 1 in binary, so sequence & (array length - 1) = array index.
    For example, with 8 slots, 3 & (8 - 1) = 3. HashMap locates array elements in the same way, which is faster than a modulo operation.)
  2. Disruptor consumption and production are completely asynchronous.
  3. If we use RingBuffer.next() to claim an event slot, we must publish the corresponding event. If the event is never published, the Disruptor's state becomes corrupted.
  4. The Disruptor's event handlers must be bound before it is started. handleEventsWith calls Disruptor.checkNotStarted(), which raises an error if the Disruptor has already been started.
  5. handleEventsWith wraps each EventHandler in an EventProcessor (a Runnable); at start(), each BatchEventProcessor is handed to the default BasicExecutor, which runs it on a thread created by the supplied ThreadFactory.
  6. The consumer thread was created by the threadFactory specified when the Disruptor was initialized
package com.lmax.disruptor.noob;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;

/**
 * Benchmark that publishes {@code CompareTest.THREAD * CompareTest.PER} events into a
 * Disruptor ring buffer of size {@code CompareTest.CAP} from {@code CompareTest.THREAD}
 * producer threads, handled by a single {@code DataEventHandler}.
 */
public class DisruptorTest {

	/**
	 * Starts the Disruptor, runs all producer threads to completion, then shuts the Disruptor
	 * down so its consumer thread terminates.
	 * <p>
	 * If the calling thread is interrupted while waiting for the producers, its interrupt flag
	 * is restored and the method returns without shutting the Disruptor down, because
	 * producers may still be publishing.
	 */
	public static void execute() {
		int bufferSize = CompareTest.CAP;

		// Assigning the lambda to a ThreadFactory-typed local keeps the constructor call
		// unambiguous. The factory creates the thread that runs the event processor.
		ThreadFactory consumerThreads = eventProcessor -> {
			CompareTest.println("EventProcessor wrapper");
			Thread thread = new Thread(eventProcessor);
			thread.setName("EventProcessorWrapper");
			return thread;
		};
		Disruptor<DataEvent> disruptor = new Disruptor<DataEvent>(new DataEventFactory(), bufferSize,
				consumerThreads);

		// Handlers must be registered before start(); registering afterwards is rejected.
		disruptor.handleEventsWith(new DataEventHandler());
		disruptor.start();
		CompareTest.println("disruptor start success!");

		RingBuffer<DataEvent> ringBuffer = disruptor.getRingBuffer();
		// Publishing could equally go through DataEventProducerWithTranslator.onData(start).
		DataProducer producer = new DataProducer(ringBuffer);
		long start = System.currentTimeMillis();

		Thread[] producers = new Thread[CompareTest.THREAD];
		for (int l = 0; l < producers.length; l++) {
			producers[l] = new Thread(() -> {
				for (int m = 0; m < CompareTest.PER; m++) {
					producer.onData(start);
				}
			});
			producers[l].start();
		}

		try {
			for (Thread thread : producers) {
				thread.join();
			}
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			return;
		}
		// All events are published at this point. shutdown() waits for the handler to process
		// the backlog and then halts the consumer thread, which would otherwise keep the JVM alive.
		disruptor.shutdown();
	}
}

Event

package com.lmax.disruptor.noob;

/**
 * Event instance stored in a ring buffer slot; the data transfer object handed from
 * producer to consumer. It carries a single {@code long} start time.
 *
 * @author admin
 */
public class DataEvent {
	/** Start time carried by this event; {@code 0} until a setter call assigns it. */
	private long startedAt;

	/**
	 * Stores the start time in this event.
	 *
	 * @param startTime value to store
	 */
	public void setStartTime(long startTime) {
		startedAt = startTime;
	}

	/**
	 * @return the start time last stored in this event
	 */
	public long getStartTime() {
		return startedAt;
	}

}
---

package com.lmax.disruptor.noob;
import com.lmax.disruptor.EventFactory;

/**
 * Factory that creates the {@link DataEvent} instances used to pre-fill every slot of the
 * ring buffer when it is initialized.
 * <p>
 * The interface is parameterized with {@code DataEvent} rather than used as a raw type, so
 * the factory can be passed to {@code Disruptor<DataEvent>} without an unchecked conversion.
 */
public class DataEventFactory implements EventFactory<DataEvent> {

	/**
	 * @return a new, empty {@link DataEvent}
	 */
	@Override
	public DataEvent newInstance() {
		return new DataEvent();
	}

}

Producer

package com.lmax.disruptor.noob;

import com.lmax.disruptor.RingBuffer;

/**
 * Publishes {@link DataEvent}s by claiming and publishing ring buffer sequences by hand.
 */
public class DataProducer {
	private final RingBuffer<DataEvent> ringBuffer;

	/**
	 * @param ringBuffer ring buffer that events are published into
	 */
	public DataProducer(RingBuffer<DataEvent> ringBuffer) {
		this.ringBuffer = ringBuffer;
	}

	/**
	 * Publishes exactly one event per call, on the calling (producer) thread. The argument is
	 * handed to the consumer through the event.
	 *
	 * @param data value stored in the event via {@code setStartTime}
	 */
	public void onData(long data) {
		// Claim the next slot of the ring buffer; this waits when no slot is free.
		final long claimed = ringBuffer.next();
		try {
			// Fill the pre-allocated event that lives at the claimed sequence.
			ringBuffer.get(claimed).setStartTime(data);
		} finally {
			// A claimed sequence must always be published, even if filling the event failed.
			ringBuffer.publish(claimed);
		}
	}
}

Claim the next event slot and publish the event. Use try/finally to guarantee that the event is always published; better still, pass the data directly to a Translator, which populates the DataEvent and leaves the final publish step to the RingBuffer.

package com.lmax.disruptor.noob;

import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;

/**
 * Producer that publishes through an event translator instead of claiming and publishing
 * sequences by hand.
 * <p>
 * A slot claimed with {@code RingBuffer.next()} must always be published (hence try/finally
 * in hand-written producers). An unpublished slot corrupts the Disruptor's state; with
 * multiple producers in particular, the consumers stall and the application has to be
 * restarted to recover.
 *
 * @author admin
 */
public class DataEventProducerWithTranslator {

	/**
	 * Event initializer invoked by {@code publishEvent}: copies the supplied start time into
	 * the event.
	 */
	private static final EventTranslatorOneArg<DataEvent, Long> TRANSLATOR =
			(event, sequence, startTime) -> event.setStartTime(startTime);

	private final RingBuffer<DataEvent> ringBuffer;

	/**
	 * @param ringBuffer ring buffer that events are published into
	 */
	public DataEventProducerWithTranslator(RingBuffer<DataEvent> ringBuffer) {
		this.ringBuffer = ringBuffer;
	}

	/**
	 * Publishes one event carrying {@code bb}; runs on the calling (producer) thread.
	 *
	 * @param bb start time to store in the event
	 */
	public void onData(Long bb) {
		ringBuffer.publishEvent(TRANSLATOR, bb);
	}
}

Handler

package com.lmax.disruptor.noob;

import java.util.concurrent.atomic.AtomicLong;

import com.lmax.disruptor.EventHandler;

/**
 * Consumer-side handler: counts the events it receives and reports the elapsed time once
 * the whole workload of {@code CompareTest.THREAD * CompareTest.PER} events has been handled.
 */
public class DataEventHandler implements EventHandler<DataEvent> {
	/** Number of events handled so far. */
	public AtomicLong count = new AtomicLong(0);

	/**
	 * Counts one event and, on the last event of the workload, prints the time elapsed since
	 * the start time carried by that event.
	 *
	 * @param event      the published event
	 * @param sequence   ring buffer sequence of the event
	 * @param endOfBatch whether this is the last event of the current batch (unused)
	 */
	@Override
	public void onEvent(DataEvent event, long sequence, boolean endOfBatch) throws Exception {
		// Widen before multiplying so large settings cannot overflow int.
		final long expected = (long) CompareTest.THREAD * CompareTest.PER;
		// Increment exactly once per event; a second increment would advance the counter by
		// two per event and make the report fire after only half the intended events.
		if (count.incrementAndGet() == expected) {
			CompareTest.println("Handled sequence: " + sequence + "  Disruptor Total time consumed:"
					+ (System.currentTimeMillis() - event.getStartTime()));

		}
	}

}

Tags: Programming Java

Posted on Wed, 28 Aug 2019 20:59:18 -0700 by millercj