Disruptor Practice (with Source Code) in Ant Golden Clothing Distributed Link Tracker Component SOFATracer

SOFAStack(Scalable Open Financial Architecture Stack)It is a financial cloud native architecture developed by Ant Golden Suit and contains the components needed to build a financial cloud native architecture. It is a best practice refined in a financial scenario.

SOFATracer Is a component for distributed system call tracking, which is unified TraceId The various network calls in the call link are logged to achieve the purpose of perspective network calls. These link data can be used for fast failure detection, service management, and so on.

SOFATracer: https://gitee.com/sofastack/sofa-tracer

Introduction to Disruptor

Disruptor is designed to provide low latency, high throughput workqueues in an asynchronous event processing architecture.It ensures that any data is owned by only one thread for write access, thereby reducing write contention compared to other structures.Currently, many well-known projects, including Apache Storm, Camel, Log4j 2, use Disruptor for high performance.

SOFATracer is also based on Disruptor SOFATracer provides two similar types of log printing, summary log and statistical log, where each call falls to the disk's log, statistical log: log with statistical output at regular intervals, and SOFATracer for either type of log outputHigh performance is required to reduce the impact on overall business process time-consuming.

Some principles of Disruptor analysis can be referred to: Disruptor .

A High Performance Inter-Thread Messaging Library High Performance Inter-Thread Messaging Library

case

Let's start with a small example of Disruptor; let's look at its constructor:

public Disruptor(
        final EventFactory<T> eventFactory,
        final int ringBufferSize,
        final ThreadFactory threadFactory,
        final ProducerType producerType,
        final WaitStrategy waitStrategy)
{
    this(
        RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
        new BasicExecutor(threadFactory));
}
  • EvetFactory: The factory that creates events in the ring buffer;
  • ringBufferSize: The size of the ring buffer must be a power of 2;
  • threadFactory: Used to create threads for processors;
  • producerType: Generator type to support RingBuffer creation using the correct sequencer and publisher; Enumeration type, SINGLE, MULTI two items.Corresponds to two Sequencers, SingleProducerSequencer and MultiProducerSequencer;
  • WatStrategy: Wait policy;

If we want to construct a disruptor, we need these components.From the perspective of eventFactory, a specific Event is also needed as a carrier of message events.[Following is a simple example of an official case change]

Message Event LongEvent, a data carrier that can be consumed

public class LongEvent {
    private long value;
    public void set(long value) {
        this.value = value;
    }
    public long getValue() {
        return value;
    }
}

factory for creating message events

public class LongEventFactory implements EventFactory<LongEvent> {
    @Override
    public LongEvent newInstance() {
        return new LongEvent();
    }
}

ConsumerThreadFactory

public class ConsumerThreadFactory implements ThreadFactory {
    private final AtomicInteger index = new AtomicInteger(1);
    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "disruptor-thread-" + index.getAndIncrement());
    }
}

OK, these above are enough to create a disruptor:

private int ringBufferCapacity = 8;
//Message Event Production Factory
LongEventFactory longEventFactory = new LongEventFactory();
//Execute Event Processor Thread Factory
ConsumerThreadFactory consumerThreadFactory = new ConsumerThreadFactory();
//Wait policy for ring buffers.
WaitStrategy waitStrategy = new BlockingWaitStrategy();

//Build disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(
    longEventFactory,
    ringBufferCapacity,
    longEventThreadFactory,
    ProducerType.SINGLE,
    waitStrategy);

Now that you have a disruptor, start it with: start:

//Start disruptor
 disruptor.start();

At this point, a disruptor has been built; however, how do you use it to publish and consume messages?

Publish a message

Here are five pieces of data published in the for loop:

RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
for (long l = 0; l < 5; l++)
{
    long sequence = ringBuffer.next();
    LongEvent event = ringBuffer.get(sequence);
    event.set(100+l);
    System.out.println("publish event :" + l);
    ringBuffer.publish(sequence);
    Thread.sleep(1000);
}

The message has been published and the current disruptor's consumer processor needs to be set below.There is already a LongEvent and EventFactory; in disruptor, messages are consumed through EventHandler.

Write consumer code

public class LongEventHandler implements EventHandler<LongEvent> {
    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("Event: " + event.getValue()+" -> " + Thread.currentThread().getName());
        Thread.sleep(2000);
    }
}

Set eventHandler on the processing chain of disruptor:

//Event handlers that will handle events - >Handlers for consumer events
LongEventHandler longEventHandler = new LongEventHandler();
disruptor.handleEventsWith(longEventHandler);

Run result (here)

publish event :0
Event: 0 -> disruptor-thread-1
-------------------------------->
publish event :1
Event: 1 -> disruptor-thread-1
-------------------------------->
publish event :2
Event: 2 -> disruptor-thread-1
-------------------------------->
publish event :3
Event: 3 -> disruptor-thread-1
-------------------------------->
publish event :4
Event: 4 -> disruptor-thread-1
-------------------------------->

Basic concepts and principles

Disruptor

The entire container of producer-consumer patterns based on ringBuffer implementations.Main attributes:

private final RingBuffer<T> ringBuffer;
private final Executor executor;
private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<>();
private final AtomicBoolean started = new AtomicBoolean(false);
private ExceptionHandler<? super T> exceptionHandler = new ExceptionHandlerWrapper<>();
  • RingBuffer: A RingBuffer object is held internally, and event publishing within Disruptor depends on this RingBuffer object to complete;
  • executor: the thread pool for consuming events;
  • consumerRepository: Provides a repository mechanism for associating EventHandler with EventProcessor;
  • Start: Used to indicate whether the current Disruptor has been started;
  • exceptionHandler: An exception handler used to handle uncaught exceptions in the BatchEventProcessor event cycle;

RingBuffer

Ring queues, which are implemented as an array, can be analogized to queues such as BlockingQueue. The use of ringBuffer causes memory to be recycled, reducing time-consuming operations such as memory allocation recycling expansion in some scenarios.

public final class RingBuffer<E> extends RingBufferFields<E> 
implements Cursored, EventSequencer<E>, EventSink<E>
  • E: Store implementations of data for sharing during the exchange or parallel coordination of events - > message events;

Sequencer

The top-level parent interface for producers in RingBuffer, which directly implements SingleProducerSequencer and MultiProducerSequencer; corresponding to SINGLE and MULTI enumeration values.

EventHandler

Event handler, changing the interface for external expansion to achieve specific consumption logic.As LongEventHandler in Demo above;

//Callback interface for handling events available in {@link RingBuffer}
public interface EventHandler<T> {
    void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;
}
  • Event: An event that RingBuffer has published;
  • Sequence: The sequence number of the event being processed;
  • endOfBatch: Used to identify if it is the last event in a batch from RingBuffer;

SequenceBarrier

Consumer roadblocks define how consumers can move down.In fact, the roadblock is a turn-by-turn lock.

final class ProcessingSequenceBarrier implements SequenceBarrier {
    //The wait policy when the need to wait (detect) is unavailable
    private final WaitStrategy waitStrategy;
    //Dependent on the serial number of other Consumer s, which is used for dependent consumption.
    //For example, A and B consumers, only A consumes, B can consume.
    private final Sequence     dependentSequence;
    private volatile boolean   alerted = false;
    //Write pointer for Ringbuffer
    private final Sequence     cursorSequence;
    //Sequencer for RingBuffer
    private final Sequencer    sequencer;
    //exclude method
}

WatStrategy determines what waiting strategies consumers use.

WaitStrategy

Strategy employed for making {[@link ]() EventProcessor}s wait on a cursor {[@link ]() Sequence}.

EventProcessor's wait strategy; there are eight implementations in disruptor:

The core of these different waiting strategies is how to implement waitFor s.

EventProcessor

Event processors, in fact, can be understood as the framework of the consumer model, which implements the run method of the thread Runnable, enclosing operations such as circular judgment.The interface has three implementation classes:

1,BatchEventProcessor

public final class BatchEventProcessor<T> implements EventProcessor {
    private final AtomicBoolean           running          = new AtomicBoolean(false);
    private ExceptionHandler<? super T>   exceptionHandler = new FatalExceptionHandler();
    private final DataProvider<T>         dataProvider;
    private final SequenceBarrier         sequenceBarrier;
    private final EventHandler<? super T> eventHandler;
    private final Sequence                sequence         = new Sequence(                                      Sequencer.INITIAL_CURSOR_VALUE);
    private final TimeoutHandler          timeoutHandler;
    //exclude method
}
  • ExceptionHandler: exception handler;
  • DataProvider: Data source, corresponding to RingBuffer;
  • EventHandler: Handles Event's callback object;
  • SequenceBarrier: corresponding sequence number barrier;
  • TimeoutHandler: Timeout handler, which is empty by default. If you want to set it, you only need to implement TimeOutHandler with the associated EventHandler;

If we choose to use EventHandler, the default is BatchEventProcessor, which corresponds to EventHandler one-to-one and is single-threaded.

If a RingBuffer has more than one BatchEventProcessor, then each BatchEventProcessor corresponds to one thread.

2,WorkProcessor

public final class WorkProcessor<T> implements EventProcessor {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    private final RingBuffer<T> ringBuffer;
    private final SequenceBarrier  sequenceBarrier;
    private final WorkHandler<? super T> workHandler;
    private final ExceptionHandler<? super T> exceptionHandler;
    private final Sequence workSequence;

    private final EventReleaser eventReleaser = new EventReleaser() {
            @Override
            public void release() {
                sequence.set(Long.MAX_VALUE);
            }
    };
    private final TimeoutHandler timeoutHandler;
}

Basically similar to BatchEventProcessor, except that the callback object used to process the Event is WorkHandler.

Schematic diagram

Without consumers, the producer keeps production, but remainingCapacity remains unchanged.

While writing Demo, you wanted to observe the changes in RingBuffer's available capacity by not setting consumers.However, during the validation process, the expected results have not been obtained. (Note: No consumers are set, only producers) First look at the results:

publish event :0
bufferSie:8
remainingCapacity:8
cursor:0
-------------------------------->
publish event :1
bufferSie:8
remainingCapacity:8
cursor:1
-------------------------------->
publish event :2
bufferSie:8
remainingCapacity:8
cursor:2
-------------------------------->
publish event :3
bufferSie:8
remainingCapacity:8
cursor:3
-------------------------------->
publish event :4
bufferSie:8
remainingCapacity:8
cursor:4
-------------------------------->
publish event :5
bufferSie:8
remainingCapacity:8
cursor:5
-------------------------------->
publish event :6
bufferSie:8
remainingCapacity:8
cursor:6
-------------------------------->
publish event :7
bufferSie:8
remainingCapacity:8
cursor:7
-------------------------------->
publish event :8
bufferSie:8
remainingCapacity:8
cursor:8
-------------------------------->
publish event :9
bufferSie:8
remainingCapacity:8
cursor:9
-------------------------------->

As a result, the value of remainingCapacity should decrease with the number of releases; however, it hasn't changed at all.

Take a look at the ringBuffer.remainingCapacity() method:

/**
 * Get the remaining capacity for this ringBuffer.
 *
 * @return The number of slots remaining.
 */
public long remainingCapacity()
{
    return sequencer.remainingCapacity();
}

This is calculated using the sequencer.remainingCapacity() method.The example above uses ProducerType.SINGLE, so let's look at the implementation of remainingCapacity inside SingleProducerSequencer.

@Override
public long remainingCapacity()
{
    //Sequence value of last application completed
    long nextValue = this.nextValue;
    //Calculate series values that are currently consumed
    long consumed = Util.getMinimumSequence(gatingSequences, nextValue);
    //Sequence value currently produced
    long produced = nextValue;
    return getBufferSize() - (produced - consumed);
}

To explain what this code means:

Assume that the bufferSize of the current ringBuffer is 8; the last applied serial number is 5, which means that the occupied serial number is 5; assuming that the current consumed serial number is 3, the remaining capacity is 8-(5-2) = 5.

Because here we can determine the values of bufferSize and produced, the result of remainingCapacity depends on the result of getMinimumSequence.

public static long getMinimumSequence(final Sequence[] sequences, long minimum)
{
    for (int i = 0, n = sequences.length; i < n; i++)
    {
        long value = sequences[i].get();
        minimum = Math.min(minimum, value);
    }
    return minimum;
}

This method is to get the minimum sequence from the Sequence array.If sequences is empty, minimum is returned.Go back to the previous step and see where the sequences array came from and where its values were set.

long consumed = Util.getMinimumSequence(gatingSequences, nextValue);

gatingSequences is a member variable in the SingleProducerSequencer parent AbstractSequencer:

protected volatile Sequence[] gatingSequences = new Sequence[0];

gatingSequences are managed in this method below.

/**
 * @see Sequencer#addGatingSequences(Sequence...)
 */
@Override
public final void addGatingSequences(Sequence... gatingSequences)
{
    SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
}

The call stack for this method goes back to these places:

WorkerPool manages multiple consumers; the hangdlerEventsWith method is also used to set up consumers.In the above test case, however, we wanted to observe the occupancy of the ring queue by not setting consumers to only set producers, so gatingSequences would always be empty, so the value of produced would be returned as a minimum in the calculation.Each calculation is then equivalent to:

return getBufferSize() - (produced - produced) === getBufferSize();

It also verifies why the remainingCapacity value remains unchanged without setting a consumer.

Disruptor Practice in SOFATracer

In SOFATracer, AsyncCommonDigestAppenderManager encapsulates Disruptor to handle Tracer summary logs for external components.This section uses the source code of AsyncCommonDigestAppenderManager to analyze how SOFATracer uses Disruptor.

SOFATracer uses two different event models, one is the StringEvent used internally by SOFATracer and the other is the SofaTacerSpanEvent used externally by Extensions.This is an event model such as SofaTacerSpanEvent.The StringEvent message event model corresponds to a disruptor encapsulated by the AsyncCommonAppenderManager class.

SofaTracerSpanEvent ( -> LongEvent)

Define the message event model, SofaTacerSpanEvent has the same basic structure as LongEvent in previous Demo, mainly because it has different message data internally. LongEvent has a long type of data, SofaTacerSpanEvent has a SofaTracerSpan.

public class SofaTracerSpanEvent {
    private volatile SofaTracerSpan sofaTracerSpan;
    public SofaTracerSpan getSofaTracerSpan() {
        return sofaTracerSpan;
    }
    public void setSofaTracerSpan(SofaTracerSpan sofaTracerSpan) {
        this.sofaTracerSpan = sofaTracerSpan;
    }
}

Consumer ( -> LongEventHandler)

Consumer is an internal class of AsyncCommonDigestAppenderManager; it implements the EventHandler interface, which exists as a consumer.

There is also one in AsyncCommonAppenderManager, where individuals feel they can pull it out to make the AsyncCommonDigestAppenderManager/AsyncCommonAppenderManager code look cleaner.

private class Consumer implements EventHandler<SofaTracerSpanEvent> {
       //Collection of log types, non-within which log types will not be processed
        protected Set<String> logTypes = Collections.synchronizedSet(new HashSet<String>());
        @Override
        public void onEvent(SofaTracerSpanEvent event, long sequence, boolean endOfBatch)
                                throws Exception {
            // Get specific message data sofaTracerSpan
            SofaTracerSpan sofaTracerSpan = event.getSofaTracerSpan();
            // If there is no data, do nothing
            if (sofaTracerSpan != null) {
                try {
                    String logType = sofaTracerSpan.getLogType();
                    // Verify that the current log type can be consumed by the current consumer
                    if (logTypes.contains(logType)) {
                        // Get Encoding Type
                        SpanEncoder encoder = contextEncoders.get(logType);
                        //Get appender
                        TraceAppender appender = appenders.get(logType);
                        // Encoding data
                        String encodedStr = encoder.encode(sofaTracerSpan);
                        if (appender instanceof LoadTestAwareAppender) {
                            ((LoadTestAwareAppender) appender).append(encodedStr,
                                TracerUtils.isLoadTest(sofaTracerSpan));
                        } else {
                            appender.append(encodedStr);
                        }
                        // Refresh buffer, log output
                        appender.flush();
                    }
                } catch (Exception e) {
                   // Exception omission
                }
            }
        }

        public void addLogType(String logType) {
            logTypes.add(logType);
        }
    }

SofaTracerSpanEventFactory (-> LongEventFactory)

Factory used to generate message events.

public class SofaTracerSpanEventFactory implements EventFactory<SofaTracerSpanEvent> {
    @Override
    public SofaTracerSpanEvent newInstance() {
        return new SofaTracerSpanEvent();
    }
}

ConsumerThreadFactory (-> LongEventThreadFactory )

Factory used to generate consumer threads.

public class ConsumerThreadFactory implements ThreadFactory {
    private String workName;
    public String getWorkName() {
        return workName;
    }
    public void setWorkName(String workName) {
        this.workName = workName;
    }
    @Override
    public Thread newThread(Runnable runnable) {
        Thread worker = new Thread(runnable, "Tracer-AsyncConsumer-Thread-" + workName);
        worker.setDaemon(true);
        return worker;
    }
}

Build Disruptor

Disruptor is built in the constructor of AsyncCommonDigestAppenderManager.

public AsyncCommonDigestAppenderManager(int queueSize, int consumerNumber) {
    // Use this calculation to ensure that realQueueSize is a power of 2 (returns the power of the smallest 2 that is currently greater than or equal to queueSize)
    int realQueueSize = 1 << (32 - Integer.numberOfLeadingZeros(queueSize - 1));
    //Build disruptor using ProducerType.MULTI
    //The wait policy is BlockingWaitStrategy
    disruptor = new Disruptor<SofaTracerSpanEvent>(new SofaTracerSpanEventFactory(),
        realQueueSize, threadFactory, ProducerType.MULTI, new BlockingWaitStrategy());
    //Consumer List
    this.consumers = new ArrayList<Consumer>(consumerNumber);
    
    for (int i = 0; i < consumerNumber; i++) {
        Consumer consumer = new Consumer();
        consumers.add(consumer);
        //Set exception handler
        disruptor.setDefaultExceptionHandler(new ConsumerExceptionHandler());
        //Bind consumers
        disruptor.handleEventsWith(consumer);
    }

    //Allow discarding, get from configuration file
    this.allowDiscard = Boolean.parseBoolean(SofaTracerConfiguration.getProperty(
        SofaTracerConfiguration.TRACER_ASYNC_APPENDER_ALLOW_DISCARD, DEFAULT_ALLOW_DISCARD));
    
    if (allowDiscard) {
        //Whether to record the number of lost logs
        this.isOutDiscardNumber = Boolean.parseBoolean(SofaTracerConfiguration.getProperty(
            SofaTracerConfiguration.TRACER_ASYNC_APPENDER_IS_OUT_DISCARD_NUMBER,
            DEFAULT_IS_OUT_DISCARD_NUMBER));
        //Whether to record TraceId and RpcId for lost logs
        this.isOutDiscardId = Boolean.parseBoolean(SofaTracerConfiguration.getProperty(
            SofaTracerConfiguration.TRACER_ASYNC_APPENDER_IS_OUT_DISCARD_ID,
            DEFAULT_IS_OUT_DISCARD_ID));
        //The number of lost logs reaches this threshold for one log output
        this.discardOutThreshold = Long.parseLong(SofaTracerConfiguration.getProperty(
            SofaTracerConfiguration.TRACER_ASYNC_APPENDER_DISCARD_OUT_THRESHOLD,
            DEFAULT_DISCARD_OUT_THRESHOLD));
        if (isOutDiscardNumber) {
            this.discardCount = new PaddedAtomicLong(0L);
        }
    }
}

Start Disruptor

Disruptor startup is delegated to the start method of AsyncCommonDigestAppenderManager.

public void start(final String workerName) {
    this.threadFactory.setWorkName(workerName);
    this.ringBuffer = this.disruptor.start();
}

Let's see where this start is called in SOFATracer:

  • CommonTracerManager: This holds a single object of the AsyncCommonDigestAppenderManager class and the start method is called in the static code block; this is used to output the normal log;
  • SofaTracerDigestReporterAsyncManager: This class also holds a single object of the AsyncCommonDigestAppenderManager class, and provides the getSofaTracerDigestReporterAsyncManager method to get the singleton, which calls the start method; this object is used to output summary logs;

Publish Events

In the previous Demo, events are published through a for loop. In SOFAracer, event publishing is triggered when a Tracer log needs to be output. The corresponding append operation is the log append operation, which appends the log to the ring buffer.

public boolean append(SofaTracerSpan sofaTracerSpan) {
    long sequence = 0L;
    //Allow discarding
    if (allowDiscard) {
        try {
            //Allow discarding to try the application sequence using tryNext without throwing an exception
            sequence = ringBuffer.tryNext();
        } catch (InsufficientCapacityException e) {
            //Whether to output TraceId and RpcId for lost logs
            if (isOutDiscardId) {
                SofaTracerSpanContext sofaTracerSpanContext = sofaTracerSpan
                    .getSofaTracerSpanContext();
                if (sofaTracerSpanContext != null) {
                    SynchronizingSelfLog.warn("discarded tracer: traceId["
                                              + sofaTracerSpanContext.getTraceId()
                                              + "];spanId[" + sofaTracerSpanContext.getSpanId()
                                              + "]");
                }
            }
             //Whether to output the number of lost logs
            if ((isOutDiscardNumber) && discardCount.incrementAndGet() == discardOutThreshold) {
                discardCount.set(0);
                if (isOutDiscardNumber) {
                    SynchronizingSelfLog.warn("discarded " + discardOutThreshold + " logs");
                }
            }

            return false;
        }
    } else {
        // next method if discarding is not allowed
        sequence = ringBuffer.next();
    }

    try {
        SofaTracerSpanEvent event = ringBuffer.get(sequence);
        event.setSofaTracerSpan(sofaTracerSpan);
    } catch (Exception e) {
        SynchronizingSelfLog.error("fail to add event");
        return false;
    }
    //Release
    ringBuffer.publish(sequence);
    return true;
}

Call logic for SOFATracer event publishing:

By tracing the process of the call, you know that when the current span calls finish or when reportSpan is called in SOFATracer, it is equivalent to publishing a message event.

Summary

This article makes a simple analysis of the SOFATracer's code for log output using Disruptor. You can look at the SOFATracer's code in more detail.SOFATracer, as a lower-level middleware component, is largely unaware in actual business development.But as a technology to learn, there are still many points to dig.

SOFATracer: https://gitee.com/sofastack/sofa-tracer

If there are small partners interested in middleware, please join our team and welcome to our team; those interested in SOFA technology system can pay attention to SOFAStack Community: https://www.sofastack.tech/community/

Tags: Programming network encoding Apache log4j

Posted on Tue, 17 Mar 2020 19:53:36 -0700 by jockey_jockey