On skywalking's trace element service client

This article refers to the original- http://bjbsair.com/2020-03-22/tech-info/5102/ order

This paper focuses on the trace element service client of skywalking

TracingContextListener

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContextListener.java

public interface TracingContextListener {  
    void afterFinished(TraceSegment traceSegment);  
}
  • Tracecontextlistener defines the afterFinished method, whose parameter is traceinstance

TraceSegment

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/TraceSegment.java

public class TraceSegment {  
​  
    private ID traceSegmentId;  
​  
    private List<TraceSegmentRef> refs;  
​  
    private List<AbstractTracingSpan> spans;  
​  
    private DistributedTraceIds relatedGlobalTraces;  
​  
    private boolean ignore = false;  
​  
    private boolean isSizeLimited = false;  
​  
    private final long createTime;  
​  
    public TraceSegment() {  
        this.traceSegmentId = GlobalIdGenerator.generate();  
        this.spans = new LinkedList<AbstractTracingSpan>();  
        this.relatedGlobalTraces = new DistributedTraceIds();  
        this.relatedGlobalTraces.append(new NewDistributedTraceId());  
        this.createTime = System.currentTimeMillis();  
    }  
​  
    public void ref(TraceSegmentRef refSegment) {  
        if (refs == null) {  
            refs = new LinkedList<TraceSegmentRef>();  
        }  
        if (!refs.contains(refSegment)) {  
            refs.add(refSegment);  
        }  
    }  
​  
    public void relatedGlobalTraces(DistributedTraceId distributedTraceId) {  
        relatedGlobalTraces.append(distributedTraceId);  
    }  
​  
    public void archive(AbstractTracingSpan finishedSpan) {  
        spans.add(finishedSpan);  
    }  
​  
    public TraceSegment finish(boolean isSizeLimited) {  
        this.isSizeLimited = isSizeLimited;  
        return this;  
    }  
​  
    public ID getTraceSegmentId() {  
        return traceSegmentId;  
    }  
​  
    public int getServiceId() {  
        return RemoteDownstreamConfig.Agent.SERVICE_ID;  
    }  
​  
    public boolean hasRef() {  
        return !(refs == null || refs.size() == 0);  
    }  
​  
    public List<TraceSegmentRef> getRefs() {  
        return refs;  
    }  
​  
    public List<DistributedTraceId> getRelatedGlobalTraces() {  
        return relatedGlobalTraces.getRelatedGlobalTraces();  
    }  
​  
    public boolean isSingleSpanSegment() {  
        return this.spans != null && this.spans.size() == 1;  
    }  
​  
    public boolean isIgnore() {  
        return ignore;  
    }  
​  
    public void setIgnore(boolean ignore) {  
        this.ignore = ignore;  
    }  
​  
    public UpstreamSegment transform() {  
        UpstreamSegment.Builder upstreamBuilder = UpstreamSegment.newBuilder();  
        for (DistributedTraceId distributedTraceId : getRelatedGlobalTraces()) {  
            upstreamBuilder = upstreamBuilder.addGlobalTraceIds(distributedTraceId.toUniqueId());  
        }  
        SegmentObject.Builder traceSegmentBuilder = SegmentObject.newBuilder();  
        /**  
         * Trace Segment  
         */  
        traceSegmentBuilder.setTraceSegmentId(this.traceSegmentId.transform());  
        // Don't serialize TraceSegmentReference  
​  
        // SpanObject  
        for (AbstractTracingSpan span : this.spans) {  
            traceSegmentBuilder.addSpans(span.transform());  
        }  
        traceSegmentBuilder.setServiceId(RemoteDownstreamConfig.Agent.SERVICE_ID);  
        traceSegmentBuilder.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID);  
        traceSegmentBuilder.setIsSizeLimited(this.isSizeLimited);  
​  
        upstreamBuilder.setSegment(traceSegmentBuilder.build().toByteString());  
        return upstreamBuilder.build();  
    }  
​  
    @Override  
    public String toString() {  
        return "TraceSegment{" +  
            "traceSegmentId='" + traceSegmentId + '\'' +  
            ", refs=" + refs +  
            ", spans=" + spans +  
            ", relatedGlobalTraces=" + relatedGlobalTraces +  
            '}';  
    }  
​  
    public int getApplicationInstanceId() {  
        return RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID;  
    }  
​  
    public long createTime() {  
        return this.createTime;  
    }  
}
  • Traceinstance defines traceSegmentId, refs, spans, relatedGlobalTraces and other attributes; it provides methods such as ref, relatedGlobalTraces, archive, finish, transform, etc

IConsumer

skywalking-6.6.0/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/IConsumer.java

public interface IConsumer<T> {  
    void init();  
​  
    void consume(List<T> data);  
​  
    void onError(List<T> data, Throwable t);  
​  
    void onExit();  
}
  • IConsumer defines init, consumer, onError, onExit methods

TraceSegmentServiceClient

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java

@DefaultImplementor  
public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSegment>, TracingContextListener, GRPCChannelListener {  
    private static final ILog logger = LogManager.getLogger(TraceSegmentServiceClient.class);  
    private static final int TIMEOUT = 30 * 1000;  
​  
    private long lastLogTime;  
    private long segmentUplinkedCounter;  
    private long segmentAbandonedCounter;  
    private volatile DataCarrier<TraceSegment> carrier;  
    private volatile TraceSegmentReportServiceGrpc.TraceSegmentReportServiceStub serviceStub;  
    private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;  
​  
    @Override  
    public void prepare() throws Throwable {  
        ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);  
    }  
​  
    @Override  
    public void boot() throws Throwable {  
        lastLogTime = System.currentTimeMillis();  
        segmentUplinkedCounter = 0;  
        segmentAbandonedCounter = 0;  
        carrier = new DataCarrier<TraceSegment>(CHANNEL_SIZE, BUFFER_SIZE);  
        carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);  
        carrier.consume(this, 1);  
    }  
​  
    @Override  
    public void onComplete() throws Throwable {  
        TracingContext.ListenerManager.add(this);  
    }  
​  
    @Override  
    public void shutdown() throws Throwable {  
        TracingContext.ListenerManager.remove(this);  
        carrier.shutdownConsumers();  
    }  
​  
    @Override  
    public void init() {  
​  
    }  
​  
    @Override  
    public void consume(List<TraceSegment> data) {  
        if (CONNECTED.equals(status)) {  
            final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);  
            StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(new StreamObserver<Commands>() {  
                @Override  
                public void onNext(Commands commands) {  
                    ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);  
                }  
​  
                @Override  
                public void onError(Throwable throwable) {  
                    status.finished();  
                    if (logger.isErrorEnable()) {  
                        logger.error(throwable, "Send UpstreamSegment to collector fail with a grpc internal exception.");  
                    }  
                    ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(throwable);  
                }  
​  
                @Override  
                public void onCompleted() {  
                    status.finished();  
                }  
            });  
​  
            try {  
                for (TraceSegment segment : data) {  
                    UpstreamSegment upstreamSegment = segment.transform();  
                    upstreamSegmentStreamObserver.onNext(upstreamSegment);  
                }  
            } catch (Throwable t) {  
                logger.error(t, "Transform and send UpstreamSegment to collector fail.");  
            }  
​  
            upstreamSegmentStreamObserver.onCompleted();  
​  
            status.wait4Finish();  
            segmentUplinkedCounter += data.size();  
        } else {  
            segmentAbandonedCounter += data.size();  
        }  
​  
        printUplinkStatus();  
    }  
​  
    private void printUplinkStatus() {  
        long currentTimeMillis = System.currentTimeMillis();  
        if (currentTimeMillis - lastLogTime > 30 * 1000) {  
            lastLogTime = currentTimeMillis;  
            if (segmentUplinkedCounter > 0) {  
                logger.debug("{} trace segments have been sent to collector.", segmentUplinkedCounter);  
                segmentUplinkedCounter = 0;  
            }  
            if (segmentAbandonedCounter > 0) {  
                logger.debug("{} trace segments have been abandoned, cause by no available channel.", segmentAbandonedCounter);  
                segmentAbandonedCounter = 0;  
            }  
        }  
    }  
​  
    @Override  
    public void onError(List<TraceSegment> data, Throwable t) {  
        logger.error(t, "Try to send {} trace segments to collector, with unexpected exception.", data.size());  
    }  
​  
    @Override  
    public void onExit() {  
​  
    }  
​  
    @Override  
    public void afterFinished(TraceSegment traceSegment) {  
        if (traceSegment.isIgnore()) {  
            return;  
        }  
        if (!carrier.produce(traceSegment)) {  
            if (logger.isDebugEnable()) {  
                logger.debug("One trace segment has been abandoned, cause by buffer is full.");  
            }  
        }  
    }  
​  
    @Override  
    public void statusChanged(GRPCChannelStatus status) {  
        if (CONNECTED.equals(status)) {  
            Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();  
            serviceStub = TraceSegmentReportServiceGrpc.newStub(channel);  
        }  
        this.status = status;  
    }  
}
  • TraceSegmentServiceClient implements BootService, IConsumer, TracingContextListener and GRPCChannelListener interfaces; its prepare method registers its channelListener with GRPCChannelManager; its boot method sets lastLogTime, instantiates DataCarrier and sets its consumer as itself; its onComplete method executes TracingContext.ListenerManager.add(this); its shutdown Method n executes traingcontext. Listenermanager. Remove (this) and carrier.shutdownConsumers(); its consume method executes upstreamSegmentStreamObserver.onNext(upstreamSegment), upstreamSegmentStreamObserver.onCompleted() and status.wait4Finish(); its afterFinished method executes carrier. Process (tracesegment); its statusChanged Set serviceStub and status

ConsumerThread

skywalking-6.6.0/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerThread.java

public class ConsumerThread<T> extends Thread {  
    private volatile boolean running;  
    private IConsumer<T> consumer;  
    private List<DataSource> dataSources;  
    private long consumeCycle;  
​  
    ConsumerThread(String threadName, IConsumer<T> consumer, long consumeCycle) {  
        super(threadName);  
        this.consumer = consumer;  
        running = false;  
        dataSources = new ArrayList<DataSource>(1);  
        this.consumeCycle = consumeCycle;  
    }  
​  
    /**  
     * add whole buffer to consume  
     *  
     * @param sourceBuffer  
     */  
    void addDataSource(QueueBuffer<T> sourceBuffer) {  
        this.dataSources.add(new DataSource(sourceBuffer));  
    }  
​  
    @Override  
    public void run() {  
        running = true;  
​  
        final List<T> consumeList = new ArrayList<T>(1500);  
        while (running) {  
            if (!consume(consumeList)) {  
                try {  
                    Thread.sleep(consumeCycle);  
                } catch (InterruptedException e) {  
                }  
            }  
        }  
​  
        // consumer thread is going to stop  
        // consume the last time  
        consume(consumeList);  
​  
        consumer.onExit();  
    }  
​  
    private boolean consume(List<T> consumeList) {  
        for (DataSource dataSource : dataSources) {  
            dataSource.obtain(consumeList);  
        }  
​  
        if (!consumeList.isEmpty()) {  
            try {  
                consumer.consume(consumeList);  
            } catch (Throwable t) {  
                consumer.onError(consumeList, t);  
            } finally {  
                consumeList.clear();  
            }  
            return true;  
        }  
        return false;  
    }  
​  
    void shutdown() {  
        running = false;  
    }  
​  
    /**  
     * DataSource is a refer to {@link Buffer}.  
     */  
    class DataSource {  
        private QueueBuffer<T> sourceBuffer;  
​  
        DataSource(QueueBuffer<T> sourceBuffer) {  
            this.sourceBuffer = sourceBuffer;  
        }  
​  
        void obtain(List<T> consumeList) {  
            sourceBuffer.obtain(consumeList);  
        }  
    }  
}
  • ConsumerThread inherits Thread, and its run method will execute consumer (consumeList) circularly. When it jumps out of the cycle, it will execute consumer (consumeList) again, and finally consumer.onExit(); the consumer method will traverse dataSources, execute its datasource. Detail (consumeList), and then execute consumer. Consumer (consumeList) method when consumeList is not empty

ConsumeDriver

skywalking-6.6.0/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java

public class ConsumeDriver<T> implements IDriver {  
    private boolean running;  
    private ConsumerThread[] consumerThreads;  
    private Channels<T> channels;  
    private ReentrantLock lock;  
​  
    public ConsumeDriver(String name, Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num,  
        long consumeCycle) {  
        this(channels, num);  
        for (int i = 0; i < num; i++) {  
            consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumser." + i + ".Thread", getNewConsumerInstance(consumerClass), consumeCycle);  
            consumerThreads[i].setDaemon(true);  
        }  
    }  
​  
    public ConsumeDriver(String name, Channels<T> channels, IConsumer<T> prototype, int num, long consumeCycle) {  
        this(channels, num);  
        prototype.init();  
        for (int i = 0; i < num; i++) {  
            consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumser." + i + ".Thread", prototype, consumeCycle);  
            consumerThreads[i].setDaemon(true);  
        }  
​  
    }  
​  
    private ConsumeDriver(Channels<T> channels, int num) {  
        running = false;  
        this.channels = channels;  
        consumerThreads = new ConsumerThread[num];  
        lock = new ReentrantLock();  
    }  
​  
    private IConsumer<T> getNewConsumerInstance(Class<? extends IConsumer<T>> consumerClass) {  
        try {  
            IConsumer<T> inst = consumerClass.newInstance();  
            inst.init();  
            return inst;  
        } catch (InstantiationException e) {  
            throw new ConsumerCannotBeCreatedException(e);  
        } catch (IllegalAccessException e) {  
            throw new ConsumerCannotBeCreatedException(e);  
        }  
    }  
​  
    @Override  
    public void begin(Channels channels) {  
        if (running) {  
            return;  
        }  
        try {  
            lock.lock();  
            this.allocateBuffer2Thread();  
            for (ConsumerThread consumerThread : consumerThreads) {  
                consumerThread.start();  
            }  
            running = true;  
        } finally {  
            lock.unlock();  
        }  
    }  
​  
    @Override  
    public boolean isRunning(Channels channels) {  
        return running;  
    }  
​  
    private void allocateBuffer2Thread() {  
        int channelSize = this.channels.getChannelSize();  
        /**  
         * if consumerThreads.length < channelSize  
         * each consumer will process several channels.  
         *  
         * if consumerThreads.length == channelSize  
         * each consumer will process one channel.  
         *  
         * if consumerThreads.length > channelSize  
         * there will be some threads do nothing.  
         */  
        for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {  
            int consumerIndex = channelIndex % consumerThreads.length;  
            consumerThreads[consumerIndex].addDataSource(channels.getBuffer(channelIndex));  
        }  
​  
    }  
​  
    @Override  
    public void close(Channels channels) {  
        try {  
            lock.lock();  
            this.running = false;  
            for (ConsumerThread consumerThread : consumerThreads) {  
                consumerThread.shutdown();  
            }  
        } finally {  
            lock.unlock();  
        }  
    }  
}
  • The ConsumeDriver implements the IDriver interface, and its ConsumeDriver will create num consumerthreads; its begin method will execute allocateBuffer2Thread, add dataSource to each consumerthread, and then execute consumerThread.start(); its close method will execute consumerThread.shutdown()

Summary

TraceSegmentServiceClient implements BootService, IConsumer, TracingContextListener and GRPCChannelListener interfaces; its prepare method registers its own channelListener with GRPCChannelManager; its boot method sets lastLogTime, instantiates DataCarrier and sets its consumer as itself; its onComplete method executes TracingContext.ListenerManager.add(this); its shutdown Method n executes traingcontext. Listenermanager. Remove (this) and carrier.shutdownConsumers(); its consume method executes upstreamSegmentStreamObserver.onNext(upstreamSegment), upstreamSegmentStreamObserver.onCompleted() and status.wait4Finish(); its afterFinished method executes carrier. Process (tracesegment); its statusChanged Set serviceStub and status

doc

This paper focuses on the trace element service client of skywalking

TracingContextListener

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContextListener.java

public interface TracingContextListener {  
    void afterFinished(TraceSegment traceSegment);  
}
  • Tracecontextlistener defines the afterFinished method, whose parameter is traceinstance

TraceSegment

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/TraceSegment.java

public class TraceSegment {  
​  
    private ID traceSegmentId;  
​  
    private List<TraceSegmentRef> refs;  
​  
    private List<AbstractTracingSpan> spans;  
​  
    private DistributedTraceIds relatedGlobalTraces;  
​  
    private boolean ignore = false;  
​  
    private boolean isSizeLimited = false;  
​  
    private final long createTime;  
​  
    public TraceSegment() {  
        this.traceSegmentId = GlobalIdGenerator.generate();  
        this.spans = new LinkedList<AbstractTracingSpan>();  
        this.relatedGlobalTraces = new DistributedTraceIds();  
        this.relatedGlobalTraces.append(new NewDistributedTraceId());  
        this.createTime = System.currentTimeMillis();  
    }  
​  
    public void ref(TraceSegmentRef refSegment) {  
        if (refs == null) {  
            refs = new LinkedList<TraceSegmentRef>();  
        }  
        if (!refs.contains(refSegment)) {  
            refs.add(refSegment);  
        }  
    }  
​  
    public void relatedGlobalTraces(DistributedTraceId distributedTraceId) {  
        relatedGlobalTraces.append(distributedTraceId);  
    }  
​  
    public void archive(AbstractTracingSpan finishedSpan) {  
        spans.add(finishedSpan);  
    }  
​  
    public TraceSegment finish(boolean isSizeLimited) {  
        this.isSizeLimited = isSizeLimited;  
        return this;  
    }  
​  
    public ID getTraceSegmentId() {  
        return traceSegmentId;  
    }  
​  
    public int getServiceId() {  
        return RemoteDownstreamConfig.Agent.SERVICE_ID;  
    }  
​  
    public boolean hasRef() {  
        return !(refs == null || refs.size() == 0);  
    }  
​  
    public List<TraceSegmentRef> getRefs() {  
        return refs;  
    }  
​  
    public List<DistributedTraceId> getRelatedGlobalTraces() {  
        return relatedGlobalTraces.getRelatedGlobalTraces();  
    }  
​  
    public boolean isSingleSpanSegment() {  
        return this.spans != null && this.spans.size() == 1;  
    }  
​  
    public boolean isIgnore() {  
        return ignore;  
    }  
​  
    public void setIgnore(boolean ignore) {  
        this.ignore = ignore;  
    }  
​  
    public UpstreamSegment transform() {  
        UpstreamSegment.Builder upstreamBuilder = UpstreamSegment.newBuilder();  
        for (DistributedTraceId distributedTraceId : getRelatedGlobalTraces()) {  
            upstreamBuilder = upstreamBuilder.addGlobalTraceIds(distributedTraceId.toUniqueId());  
        }  
        SegmentObject.Builder traceSegmentBuilder = SegmentObject.newBuilder();  
        /**  
         * Trace Segment  
         */  
        traceSegmentBuilder.setTraceSegmentId(this.traceSegmentId.transform());  
        // Don't serialize TraceSegmentReference  
​  
        // SpanObject  
        for (AbstractTracingSpan span : this.spans) {  
            traceSegmentBuilder.addSpans(span.transform());  
        }  
        traceSegmentBuilder.setServiceId(RemoteDownstreamConfig.Agent.SERVICE_ID);  
        traceSegmentBuilder.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID);  
        traceSegmentBuilder.setIsSizeLimited(this.isSizeLimited);  
​  
        upstreamBuilder.setSegment(traceSegmentBuilder.build().toByteString());  
        return upstreamBuilder.build();  
    }  
​  
    @Override  
    public String toString() {  
        return "TraceSegment{" +  
            "traceSegmentId='" + traceSegmentId + '\'' +  
            ", refs=" + refs +  
            ", spans=" + spans +  
            ", relatedGlobalTraces=" + relatedGlobalTraces +  
            '}';  
    }  
​  
    public int getApplicationInstanceId() {  
        return RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID;  
    }  
​  
    public long createTime() {  
        return this.createTime;  
    }  
}
  • Traceinstance defines traceSegmentId, refs, spans, relatedGlobalTraces and other attributes; it provides methods such as ref, relatedGlobalTraces, archive, finish, transform, etc

IConsumer

skywalking-6.6.0/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/IConsumer.java

public interface IConsumer<T> {  
    void init();  
​  
    void consume(List<T> data);  
​  
    void onError(List<T> data, Throwable t);  
​  
    void onExit();  
}
  • IConsumer defines init, consumer, onError, onExit methods

TraceSegmentServiceClient

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java

@DefaultImplementor  
public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSegment>, TracingContextListener, GRPCChannelListener {  
    private static final ILog logger = LogManager.getLogger(TraceSegmentServiceClient.class);  
    private static final int TIMEOUT = 30 * 1000;  
​  
    private long lastLogTime;  
    private long segmentUplinkedCounter;  
    private long segmentAbandonedCounter;  
    private volatile DataCarrier<TraceSegment> carrier;  
    private volatile TraceSegmentReportServiceGrpc.TraceSegmentReportServiceStub serviceStub;  
    private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;  
​  
    @Override  
    public void prepare() throws Throwable {  
        ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);  
    }  
​  
    @Override  
    public void boot() throws Throwable {  
        lastLogTime = System.currentTimeMillis();  
        segmentUplinkedCounter = 0;  
        segmentAbandonedCounter = 0;  
        carrier = new DataCarrier<TraceSegment>(CHANNEL_SIZE, BUFFER_SIZE);  
        carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);  
        carrier.consume(this, 1);  
    }  
​  
    @Override  
    public void onComplete() throws Throwable {  
        TracingContext.ListenerManager.add(this);  
    }  
​  
    @Override  
    public void shutdown() throws Throwable {  
        TracingContext.ListenerManager.remove(this);  
        carrier.shutdownConsumers();  
    }  
​  
    @Override  
    public void init() {  
​  
    }  
​  
    @Override  
    public void consume(List<TraceSegment> data) {  
        if (CONNECTED.equals(status)) {  
            final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);  
            StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(new StreamObserver<Commands>() {  
                @Override  
                public void onNext(Commands commands) {  
                    ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);  
                }  
​  
                @Override  
                public void onError(Throwable throwable) {  
                    status.finished();  
                    if (logger.isErrorEnable()) {  
                        logger.error(throwable, "Send UpstreamSegment to collector fail with a grpc internal exception.");  
                    }  
                    ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(throwable);  
                }  
​  
                @Override  
                public void onCompleted() {  
                    status.finished();  
                }  
            });  
​  
            try {  
                for (TraceSegment segment : data) {  
                    UpstreamSegment upstreamSegment = segment.transform();  
                    upstreamSegmentStreamObserver.onNext(upstreamSegment);  
                }  
            } catch (Throwable t) {  
                logger.error(t, "Transform and send UpstreamSegment to collector fail.");  
            }  
​  
            upstreamSegmentStreamObserver.onCompleted();  
​  
            status.wait4Finish();  
            segmentUplinkedCounter += data.size();  
        } else {  
            segmentAbandonedCounter += data.size();  
        }  
​  
        printUplinkStatus();  
    }  
​  
    private void printUplinkStatus() {  
        long currentTimeMillis = System.currentTimeMillis();  
        if (currentTimeMillis - lastLogTime > 30 * 1000) {  
            lastLogTime = currentTimeMillis;  
            if (segmentUplinkedCounter > 0) {  
                logger.debug("{} trace segments have been sent to collector.", segmentUplinkedCounter);  
                segmentUplinkedCounter = 0;  
            }  
            if (segmentAbandonedCounter > 0) {  
                logger.debug("{} trace segments have been abandoned, cause by no available channel.", segmentAbandonedCounter);  
                segmentAbandonedCounter = 0;  
            }  
        }  
    }  
​  
    @Override  
    public void onError(List<TraceSegment> data, Throwable t) {  
        logger.error(t, "Try to send {} trace segments to collector, with unexpected exception.", data.size());  
    }  
​  
    @Override  
    public void onExit() {  
​  
    }  
​  
    @Override  
    public void afterFinished(TraceSegment traceSegment) {  
        if (traceSegment.isIgnore()) {  
            return;  
        }  
        if (!carrier.produce(traceSegment)) {  
            if (logger.isDebugEnable()) {  
                logger.debug("One trace segment has been abandoned, cause by buffer is full.");  
            }  
        }  
    }  
​  
    @Override  
    public void statusChanged(GRPCChannelStatus status) {  
        if (CONNECTED.equals(status)) {  
            Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();  
            serviceStub = TraceSegmentReportServiceGrpc.newStub(channel);  
        }  
        this.status = status;  
    }  
}
  • TraceSegmentServiceClient implements BootService, IConsumer, TracingContextListener and GRPCChannelListener interfaces; its prepare method registers its own channelListener with GRPCChannelManager; its boot method sets lastLogTime, instantiates DataCarrier and sets its consumer as itself; its onComplete method executes TracingContext.ListenerManager.add(this); its shutdown Method n executes traingcontext. Listenermanager. Remove (this) and carrier.shutdownConsumers(); its consume method executes upstreamSegmentStreamObserver.onNext(upstreamSegment), upstreamSegmentStreamObserver.onCompleted() and status.wait4Finish(); its afterFinished method executes carrier. Process (tracesegment); its statusChanged Set serviceStub and status

ConsumerThread

skywalking-6.6.0/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerThread.java

public class ConsumerThread<T> extends Thread {  
    private volatile boolean running;  
    private IConsumer<T> consumer;  
    private List<DataSource> dataSources;  
    private long consumeCycle;  
​  
    ConsumerThread(String threadName, IConsumer<T> consumer, long consumeCycle) {  
        super(threadName);  
        this.consumer = consumer;  
        running = false;  
        dataSources = new ArrayList<DataSource>(1);  
        this.consumeCycle = consumeCycle;  
    }  
​  
    /**  
     * add whole buffer to consume  
     *  
     * @param sourceBuffer  
     */  
    void addDataSource(QueueBuffer<T> sourceBuffer) {  
        this.dataSources.add(new DataSource(sourceBuffer));  
    }  
​  
    @Override  
    public void run() {  
        running = true;  
​  
        final List<T> consumeList = new ArrayList<T>(1500);  
        while (running) {  
            if (!consume(consumeList)) {  
                try {  
                    Thread.sleep(consumeCycle);  
                } catch (InterruptedException e) {  
                }  
            }  
        }  
​  
        // consumer thread is going to stop  
        // consume the last time  
        consume(consumeList);  
​  
        consumer.onExit();  
    }  
​  
    private boolean consume(List<T> consumeList) {  
        for (DataSource dataSource : dataSources) {  
            dataSource.obtain(consumeList);  
        }  
​  
        if (!consumeList.isEmpty()) {  
            try {  
                consumer.consume(consumeList);  
            } catch (Throwable t) {  
                consumer.onError(consumeList, t);  
            } finally {  
                consumeList.clear();  
            }  
            return true;  
        }  
        return false;  
    }  
​  
    void shutdown() {  
        running = false;  
    }  
​  
    /**  
     * DataSource is a refer to {@link Buffer}.  
     */  
    class DataSource {  
        private QueueBuffer<T> sourceBuffer;  
​  
        DataSource(QueueBuffer<T> sourceBuffer) {  
            this.sourceBuffer = sourceBuffer;  
        }  
​  
        void obtain(List<T> consumeList) {  
            sourceBuffer.obtain(consumeList);  
        }  
    }  
}
  • ConsumerThread inherits Thread, and its run method will execute consumer (consumeList) circularly. When it jumps out of the cycle, it will execute consumer (consumeList) again, and finally consumer.onExit(); the consumer method will traverse dataSources, execute its datasource. Detail (consumeList), and then execute consumer. Consumer (consumeList) method when consumeList is not empty

ConsumeDriver

skywalking-6.6.0/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java

public class ConsumeDriver<T> implements IDriver {  
    private boolean running;  
    private ConsumerThread[] consumerThreads;  
    private Channels<T> channels;  
    private ReentrantLock lock;  
​  
    public ConsumeDriver(String name, Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num,  
        long consumeCycle) {  
        this(channels, num);  
        for (int i = 0; i < num; i++) {  
            consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumser." + i + ".Thread", getNewConsumerInstance(consumerClass), consumeCycle);  
            consumerThreads[i].setDaemon(true);  
        }  
    }  
​  
    public ConsumeDriver(String name, Channels<T> channels, IConsumer<T> prototype, int num, long consumeCycle) {  
        this(channels, num);  
        prototype.init();  
        for (int i = 0; i < num; i++) {  
            consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumser." + i + ".Thread", prototype, consumeCycle);  
            consumerThreads[i].setDaemon(true);  
        }  
​  
    }  
​  
    private ConsumeDriver(Channels<T> channels, int num) {  
        running = false;  
        this.channels = channels;  
        consumerThreads = new ConsumerThread[num];  
        lock = new ReentrantLock();  
    }  
​  
    private IConsumer<T> getNewConsumerInstance(Class<? extends IConsumer<T>> consumerClass) {  
        try {  
            IConsumer<T> inst = consumerClass.newInstance();  
            inst.init();  
            return inst;  
        } catch (InstantiationException e) {  
            throw new ConsumerCannotBeCreatedException(e);  
        } catch (IllegalAccessException e) {  
            throw new ConsumerCannotBeCreatedException(e);  
        }  
    }  
​  
    @Override  
    public void begin(Channels channels) {  
        if (running) {  
            return;  
        }  
        try {  
            lock.lock();  
            this.allocateBuffer2Thread();  
            for (ConsumerThread consumerThread : consumerThreads) {  
                consumerThread.start();  
            }  
            running = true;  
        } finally {  
            lock.unlock();  
        }  
    }  
​  
    @Override  
    public boolean isRunning(Channels channels) {  
        return running;  
    }  
​  
    private void allocateBuffer2Thread() {  
        int channelSize = this.channels.getChannelSize();  
        /**  
         * if consumerThreads.length < channelSize  
         * each consumer will process several channels.  
         *  
         * if consumerThreads.length == channelSize  
         * each consumer will process one channel.  
         *  
         * if consumerThreads.length > channelSize  
         * there will be some threads do nothing.  
         */  
        for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {  
            int consumerIndex = channelIndex % consumerThreads.length;  
            consumerThreads[consumerIndex].addDataSource(channels.getBuffer(channelIndex));  
        }  
​  
    }  
​  
    @Override  
    public void close(Channels channels) {  
        try {  
            lock.lock();  
            this.running = false;  
            for (ConsumerThread consumerThread : consumerThreads) {  
                consumerThread.shutdown();  
            }  
        } finally {  
            lock.unlock();  
        }  
    }  
}
  • The ConsumeDriver implements the IDriver interface, and its ConsumeDriver will create num consumerthreads; its begin method will execute allocateBuffer2Thread, add dataSource to each consumerthread, and then execute consumerThread.start(); its close method will execute consumerThread.shutdown()

Summary

TraceSegmentServiceClient implements BootService, IConsumer, TracingContextListener and GRPCChannelListener interfaces; its prepare method registers its own channelListener with GRPCChannelManager; its boot method sets lastLogTime, instantiates DataCarrier and sets its consumer as itself; its onComplete method executes TracingContext.ListenerManager.add(this); its shutdown Method n executes traingcontext. Listenermanager. Remove (this) and carrier.shutdownConsumers(); its consume method executes upstreamSegmentStreamObserver.onNext(upstreamSegment), upstreamSegmentStreamObserver.onCompleted() and status.wait4Finish(); its afterFinished method executes carrier. Process (tracesegment); its statusChanged Set serviceStub and status

doc

This paper focuses on the trace element service client of skywalking

TracingContextListener

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContextListener.java

public interface TracingContextListener {  
    void afterFinished(TraceSegment traceSegment);  
}
  • Tracecontextlistener defines the afterFinished method, whose parameter is traceinstance

TraceSegment

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/TraceSegment.java

public class TraceSegment {  
​  
    private ID traceSegmentId;  
​  
    private List<TraceSegmentRef> refs;  
​  
    private List<AbstractTracingSpan> spans;  
​  
    private DistributedTraceIds relatedGlobalTraces;  
​  
    private boolean ignore = false;  
​  
    private boolean isSizeLimited = false;  
​  
    private final long createTime;  
​  
    public TraceSegment() {  
        this.traceSegmentId = GlobalIdGenerator.generate();  
        this.spans = new LinkedList<AbstractTracingSpan>();  
        this.relatedGlobalTraces = new DistributedTraceIds();  
        this.relatedGlobalTraces.append(new NewDistributedTraceId());  
        this.createTime = System.currentTimeMillis();  
    }  
​  
    public void ref(TraceSegmentRef refSegment) {  
        if (refs == null) {  
            refs = new LinkedList<TraceSegmentRef>();  
        }  
        if (!refs.contains(refSegment)) {  
            refs.add(refSegment);  
        }  
    }  
​  
    public void relatedGlobalTraces(DistributedTraceId distributedTraceId) {  
        relatedGlobalTraces.append(distributedTraceId);  
    }  
​  
    public void archive(AbstractTracingSpan finishedSpan) {  
        spans.add(finishedSpan);  
    }  
​  
    public TraceSegment finish(boolean isSizeLimited) {  
        this.isSizeLimited = isSizeLimited;  
        return this;  
    }  
​  
    public ID getTraceSegmentId() {  
        return traceSegmentId;  
    }  
​  
    public int getServiceId() {  
        return RemoteDownstreamConfig.Agent.SERVICE_ID;  
    }  
​  
    public boolean hasRef() {  
        return !(refs == null || refs.size() == 0);  
    }  
​  
    public List<TraceSegmentRef> getRefs() {  
        return refs;  
    }  
​  
    public List<DistributedTraceId> getRelatedGlobalTraces() {  
        return relatedGlobalTraces.getRelatedGlobalTraces();  
    }  
​  
    public boolean isSingleSpanSegment() {  
        return this.spans != null && this.spans.size() == 1;  
    }  
​  
    public boolean isIgnore() {  
        return ignore;  
    }  
​  
    public void setIgnore(boolean ignore) {  
        this.ignore = ignore;  
    }  
​  
    public UpstreamSegment transform() {  
        UpstreamSegment.Builder upstreamBuilder = UpstreamSegment.newBuilder();  
        for (DistributedTraceId distributedTraceId : getRelatedGlobalTraces()) {  
            upstreamBuilder = upstreamBuilder.addGlobalTraceIds(distributedTraceId.toUniqueId());  
        }  
        SegmentObject.Builder traceSegmentBuilder = SegmentObject.newBuilder();  
        /**  
         * Trace Segment  
         */  
        traceSegmentBuilder.setTraceSegmentId(this.traceSegmentId.transform());  
        // Don't serialize TraceSegmentReference  
​  
        // SpanObject  
        for (AbstractTracingSpan span : this.spans) {  
            traceSegmentBuilder.addSpans(span.transform());  
        }  
        traceSegmentBuilder.setServiceId(RemoteDownstreamConfig.Agent.SERVICE_ID);  
        traceSegmentBuilder.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID);  
        traceSegmentBuilder.setIsSizeLimited(this.isSizeLimited);  
​  
        upstreamBuilder.setSegment(traceSegmentBuilder.build().toByteString());  
        return upstreamBuilder.build();  
    }  
​  
    @Override  
    public String toString() {  
        return "TraceSegment{" +  
            "traceSegmentId='" + traceSegmentId + '\'' +  
            ", refs=" + refs +  
            ", spans=" + spans +  
            ", relatedGlobalTraces=" + relatedGlobalTraces +  
            '}';  
    }  
​  
    public int getApplicationInstanceId() {  
        return RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID;  
    }  
​  
    public long createTime() {  
        return this.createTime;  
    }  
}
  • Traceinstance defines traceSegmentId, refs, spans, relatedGlobalTraces and other attributes; it provides methods such as ref, relatedGlobalTraces, archive, finish, transform, etc

IConsumer

skywalking-6.6.0/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/IConsumer.java

public interface IConsumer<T> {  
    void init();  
​  
    void consume(List<T> data);  
​  
    void onError(List<T> data, Throwable t);  
​  
    void onExit();  
}
  • IConsumer defines init, consumer, onError, onExit methods

TraceSegmentServiceClient

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java

@DefaultImplementor  
public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSegment>, TracingContextListener, GRPCChannelListener {  
    private static final ILog logger = LogManager.getLogger(TraceSegmentServiceClient.class);  
    private static final int TIMEOUT = 30 * 1000;  
​  
    private long lastLogTime;  
    private long segmentUplinkedCounter;  
    private long segmentAbandonedCounter;  
    private volatile DataCarrier<TraceSegment> carrier;  
    private volatile TraceSegmentReportServiceGrpc.TraceSegmentReportServiceStub serviceStub;  
    private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;  
​  
    @Override  
    public void prepare() throws Throwable {  
        ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);  
    }  
​  
    @Override  
    public void boot() throws Throwable {  
        lastLogTime = System.currentTimeMillis();  
        segmentUplinkedCounter = 0;  
        segmentAbandonedCounter = 0;  
        carrier = new DataCarrier<TraceSegment>(CHANNEL_SIZE, BUFFER_SIZE);  
        carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);  
        carrier.consume(this, 1);  
    }  
​  
    @Override  
    public void onComplete() throws Throwable {  
        TracingContext.ListenerManager.add(this);  
    }  
​  
    @Override  
    public void shutdown() throws Throwable {  
        TracingContext.ListenerManager.remove(this);  
        carrier.shutdownConsumers();  
    }  
​  
    @Override  
    public void init() {  
​  
    }  
​  
    @Override  
    public void consume(List<TraceSegment> data) {  
        if (CONNECTED.equals(status)) {  
            final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);  
            StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(new StreamObserver<Commands>() {  
                @Override  
                public void onNext(Commands commands) {  
                    ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);  
                }  
​  
                @Override  
                public void onError(Throwable throwable) {  
                    status.finished();  
                    if (logger.isErrorEnable()) {  
                        logger.error(throwable, "Send UpstreamSegment to collector fail with a grpc internal exception.");  
                    }  
                    ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(throwable);  
                }  
​  
                @Override  
                public void onCompleted() {  
                    status.finished();  
                }  
            });  
​  
            try {  
                for (TraceSegment segment : data) {  
                    UpstreamSegment upstreamSegment = segment.transform();  
                    upstreamSegmentStreamObserver.onNext(upstreamSegment);  
                }  
            } catch (Throwable t) {  
                logger.error(t, "Transform and send UpstreamSegment to collector fail.");  
            }  
​  
            upstreamSegmentStreamObserver.onCompleted();  
​  
            status.wait4Finish();  
            segmentUplinkedCounter += data.size();  
        } else {  
            segmentAbandonedCounter += data.size();  
        }  
​  
        printUplinkStatus();  
    }  
​  
    private void printUplinkStatus() {  
        long currentTimeMillis = System.currentTimeMillis();  
        if (currentTimeMillis - lastLogTime > 30 * 1000) {  
            lastLogTime = currentTimeMillis;  
            if (segmentUplinkedCounter > 0) {  
                logger.debug("{} trace segments have been sent to collector.", segmentUplinkedCounter);  
                segmentUplinkedCounter = 0;  
            }  
            if (segmentAbandonedCounter > 0) {  
                logger.debug("{} trace segments have been abandoned, cause by no available channel.", segmentAbandonedCounter);  
                segmentAbandonedCounter = 0;  
            }  
        }  
    }  
​  
    @Override  
    public void onError(List<TraceSegment> data, Throwable t) {  
        logger.error(t, "Try to send {} trace segments to collector, with unexpected exception.", data.size());  
    }  
​  
    @Override  
    public void onExit() {  
​  
    }  
​  
    @Override  
    public void afterFinished(TraceSegment traceSegment) {  
        if (traceSegment.isIgnore()) {  
            return;  
        }  
        if (!carrier.produce(traceSegment)) {  
            if (logger.isDebugEnable()) {  
                logger.debug("One trace segment has been abandoned, cause by buffer is full.");  
            }  
        }  
    }  
​  
    @Override  
    public void statusChanged(GRPCChannelStatus status) {  
        if (CONNECTED.equals(status)) {  
            Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();  
            serviceStub = TraceSegmentReportServiceGrpc.newStub(channel);  
        }  
        this.status = status;  
    }  
}
  • TraceSegmentServiceClient implements BootService, IConsumer, TracingContextListener and GRPCChannelListener interfaces; its prepare method registers its own channelListener with GRPCChannelManager; its boot method sets lastLogTime, instantiates DataCarrier and sets its consumer as itself; its onComplete method executes TracingContext.ListenerManager.add(this); its shutdown Method n executes traingcontext. Listenermanager. Remove (this) and carrier.shutdownConsumers(); its consume method executes upstreamSegmentStreamObserver.onNext(upstreamSegment), upstreamSegmentStreamObserver.onCompleted() and status.wait4Finish(); its afterFinished method executes carrier. Process (tracesegment); its statusChanged Set serviceStub and status

ConsumerThread

skywalking-6.6.0/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerThread.java

public class ConsumerThread<T> extends Thread {  
    private volatile boolean running;  
    private IConsumer<T> consumer;  
    private List<DataSource> dataSources;  
    private long consumeCycle;  
​  
    ConsumerThread(String threadName, IConsumer<T> consumer, long consumeCycle) {  
        super(threadName);  
        this.consumer = consumer;  
        running = false;  
        dataSources = new ArrayList<DataSource>(1);  
        this.consumeCycle = consumeCycle;  
    }  
​  
    /**  
     * add whole buffer to consume  
     *  
     * @param sourceBuffer  
     */  
    void addDataSource(QueueBuffer<T> sourceBuffer) {  
        this.dataSources.add(new DataSource(sourceBuffer));  
    }  
​  
    @Override  
    public void run() {  
        running = true;  
​  
        final List<T> consumeList = new ArrayList<T>(1500);  
        while (running) {  
            if (!consume(consumeList)) {  
                try {  
                    Thread.sleep(consumeCycle);  
                } catch (InterruptedException e) {  
                }  
            }  
        }  
​  
        // consumer thread is going to stop  
        // consume the last time  
        consume(consumeList);  
​  
        consumer.onExit();  
    }  
​  
    private boolean consume(List<T> consumeList) {  
        for (DataSource dataSource : dataSources) {  
            dataSource.obtain(consumeList);  
        }  
​  
        if (!consumeList.isEmpty()) {  
            try {  
                consumer.consume(consumeList);  
            } catch (Throwable t) {  
                consumer.onError(consumeList, t);  
            } finally {  
                consumeList.clear();  
            }  
            return true;  
        }  
        return false;  
    }  
​  
    void shutdown() {  
        running = false;  
    }  
​  
    /**  
     * DataSource is a refer to {@link Buffer}.  
     */  
    class DataSource {  
        private QueueBuffer<T> sourceBuffer;  
​  
        DataSource(QueueBuffer<T> sourceBuffer) {  
            this.sourceBuffer = sourceBuffer;  
        }  
​  
        void obtain(List<T> consumeList) {  
            sourceBuffer.obtain(consumeList);  
        }  
    }  
}
  • ConsumerThread inherits Thread, and its run method will execute consumer (consumeList) circularly. When it jumps out of the cycle, it will execute consumer (consumeList) again, and finally consumer.onExit(); the consumer method will traverse dataSources, execute its datasource. Detail (consumeList), and then execute consumer. Consumer (consumeList) method when consumeList is not empty

ConsumeDriver

skywalking-6.6.0/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java

public class ConsumeDriver<T> implements IDriver {  
    private boolean running;  
    private ConsumerThread[] consumerThreads;  
    private Channels<T> channels;  
    private ReentrantLock lock;  
​  
    public ConsumeDriver(String name, Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num,  
        long consumeCycle) {  
        this(channels, num);  
        for (int i = 0; i < num; i++) {  
            consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumser." + i + ".Thread", getNewConsumerInstance(consumerClass), consumeCycle);  
            consumerThreads[i].setDaemon(true);  
        }  
    }  
​  
    public ConsumeDriver(String name, Channels<T> channels, IConsumer<T> prototype, int num, long consumeCycle) {  
        this(channels, num);  
        prototype.init();  
        for (int i = 0; i < num; i++) {  
            consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumser." + i + ".Thread", prototype, consumeCycle);  
            consumerThreads[i].setDaemon(true);  
        }  
​  
    }  
​  
    private ConsumeDriver(Channels<T> channels, int num) {  
        running = false;  
        this.channels = channels;  
        consumerThreads = new ConsumerThread[num];  
        lock = new ReentrantLock();  
    }  
​  
    private IConsumer<T> getNewConsumerInstance(Class<? extends IConsumer<T>> consumerClass) {  
        try {  
            IConsumer<T> inst = consumerClass.newInstance();  
            inst.init();  
            return inst;  
        } catch (InstantiationException e) {  
            throw new ConsumerCannotBeCreatedException(e);  
        } catch (IllegalAccessException e) {  
            throw new ConsumerCannotBeCreatedException(e);  
        }  
    }  
​  
    @Override  
    public void begin(Channels channels) {  
        if (running) {  
            return;  
        }  
        try {  
            lock.lock();  
            this.allocateBuffer2Thread();  
            for (ConsumerThread consumerThread : consumerThreads) {  
                consumerThread.start();  
            }  
            running = true;  
        } finally {  
            lock.unlock();  
        }  
    }  
​  
    @Override  
    public boolean isRunning(Channels channels) {  
        return running;  
    }  
​  
    private void allocateBuffer2Thread() {  
        int channelSize = this.channels.getChannelSize();  
        /**  
         * if consumerThreads.length < channelSize  
         * each consumer will process several channels.  
         *  
         * if consumerThreads.length == channelSize  
         * each consumer will process one channel.  
         *  
         * if consumerThreads.length > channelSize  
         * there will be some threads do nothing.  
         */  
        for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {  
            int consumerIndex = channelIndex % consumerThreads.length;  
            consumerThreads[consumerIndex].addDataSource(channels.getBuffer(channelIndex));  
        }  
​  
    }  
​  
    @Override  
    public void close(Channels channels) {  
        try {  
            lock.lock();  
            this.running = false;  
            for (ConsumerThread consumerThread : consumerThreads) {  
                consumerThread.shutdown();  
            }  
        } finally {  
            lock.unlock();  
        }  
    }  
}
  • The ConsumeDriver implements the IDriver interface, and its ConsumeDriver will create num consumerthreads; its begin method will execute allocateBuffer2Thread, add dataSource to each consumerthread, and then execute consumerThread.start(); its close method will execute consumerThread.shutdown()

Summary

TraceSegmentServiceClient implements BootService, IConsumer, TracingContextListener and GRPCChannelListener interfaces; its prepare method registers its own channelListener with GRPCChannelManager; its boot method sets lastLogTime, instantiates DataCarrier and sets its consumer as itself; its onComplete method executes TracingContext.ListenerManager.add(this); its shutdown Method n executes traingcontext. Listenermanager. Remove (this) and carrier.shutdownConsumers(); its consume method executes upstreamSegmentStreamObserver.onNext(upstreamSegment), upstreamSegmentStreamObserver.onCompleted() and status.wait4Finish(); its afterFinished method executes carrier. Process (tracesegment); its statusChanged Set serviceStub and status

doc

This paper focuses on the trace element service client of skywalking

TracingContextListener

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContextListener.java

public interface TracingContextListener {  
    void afterFinished(TraceSegment traceSegment);  
}
  • Tracecontextlistener defines the afterFinished method, whose parameter is traceinstance

TraceSegment

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/TraceSegment.java

public class TraceSegment {  
​  
    private ID traceSegmentId;  
​  
    private List<TraceSegmentRef> refs;  
​  
    private List<AbstractTracingSpan> spans;  
​  
    private DistributedTraceIds relatedGlobalTraces;  
​  
    private boolean ignore = false;  
​  
    private boolean isSizeLimited = false;  
​  
    private final long createTime;  
​  
    public TraceSegment() {  
        this.traceSegmentId = GlobalIdGenerator.generate();  
        this.spans = new LinkedList<AbstractTracingSpan>();  
        this.relatedGlobalTraces = new DistributedTraceIds();  
        this.relatedGlobalTraces.append(new NewDistributedTraceId());  
        this.createTime = System.currentTimeMillis();  
    }  
​  
    public void ref(TraceSegmentRef refSegment) {  
        if (refs == null) {  
            refs = new LinkedList<TraceSegmentRef>();  
        }  
        if (!refs.contains(refSegment)) {  
            refs.add(refSegment);  
        }  
    }  
​  
    public void relatedGlobalTraces(DistributedTraceId distributedTraceId) {  
        relatedGlobalTraces.append(distributedTraceId);  
    }  
​  
    public void archive(AbstractTracingSpan finishedSpan) {  
        spans.add(finishedSpan);  
    }  
​  
    public TraceSegment finish(boolean isSizeLimited) {  
        this.isSizeLimited = isSizeLimited;  
        return this;  
    }  
​  
    public ID getTraceSegmentId() {  
        return traceSegmentId;  
    }  
​  
    public int getServiceId() {  
        return RemoteDownstreamConfig.Agent.SERVICE_ID;  
    }  
​  
    public boolean hasRef() {  
        return !(refs == null || refs.size() == 0);  
    }  
​  
    public List<TraceSegmentRef> getRefs() {  
        return refs;  
    }  
​  
    public List<DistributedTraceId> getRelatedGlobalTraces() {  
        return relatedGlobalTraces.getRelatedGlobalTraces();  
    }  
​  
    public boolean isSingleSpanSegment() {  
        return this.spans != null && this.spans.size() == 1;  
    }  
​  
    public boolean isIgnore() {  
        return ignore;  
    }  
​  
    public void setIgnore(boolean ignore) {  
        this.ignore = ignore;  
    }  
​  
    public UpstreamSegment transform() {  
        UpstreamSegment.Builder upstreamBuilder = UpstreamSegment.newBuilder();  
        for (DistributedTraceId distributedTraceId : getRelatedGlobalTraces()) {  
            upstreamBuilder = upstreamBuilder.addGlobalTraceIds(distributedTraceId.toUniqueId());  
        }  
        SegmentObject.Builder traceSegmentBuilder = SegmentObject.newBuilder();  
        /**  
         * Trace Segment  
         */  
        traceSegmentBuilder.setTraceSegmentId(this.traceSegmentId.transform());  
        // Don't serialize TraceSegmentReference  
​  
        // SpanObject  
        for (AbstractTracingSpan span : this.spans) {  
            traceSegmentBuilder.addSpans(span.transform());  
        }  
        traceSegmentBuilder.setServiceId(RemoteDownstreamConfig.Agent.SERVICE_ID);  
        traceSegmentBuilder.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID);  
        traceSegmentBuilder.setIsSizeLimited(this.isSizeLimited);  
​  
        upstreamBuilder.setSegment(traceSegmentBuilder.build().toByteString());  
        return upstreamBuilder.build();  
    }  
​  
    @Override  
    public String toString() {  
        return "TraceSegment{" +  
            "traceSegmentId='" + traceSegmentId + '\'' +  
            ", refs=" + refs +  
            ", spans=" + spans +  
            ", relatedGlobalTraces=" + relatedGlobalTraces +  
            '}';  
    }  
​  
    public int getApplicationInstanceId() {  
        return RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID;  
    }  
​  
    public long createTime() {  
        return this.createTime;  
    }  
}
  • Traceinstance defines traceSegmentId, refs, spans, relatedGlobalTraces and other attributes; it provides methods such as ref, relatedGlobalTraces, archive, finish, transform, etc

IConsumer

skywalking-6.6.0/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/IConsumer.java

public interface IConsumer<T> {  
    void init();  
​  
    void consume(List<T> data);  
​  
    void onError(List<T> data, Throwable t);  
​  
    void onExit();  
}
  • IConsumer defines init, consumer, onError, onExit methods

TraceSegmentServiceClient

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java

@DefaultImplementor  
public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSegment>, TracingContextListener, GRPCChannelListener {  
    private static final ILog logger = LogManager.getLogger(TraceSegmentServiceClient.class);  
    private static final int TIMEOUT = 30 * 1000;  
​  
    private long lastLogTime;  
    private long segmentUplinkedCounter;  
    private long segmentAbandonedCounter;  
    private volatile DataCarrier<TraceSegment> carrier;  
    private volatile TraceSegmentReportServiceGrpc.TraceSegmentReportServiceStub serviceStub;  
    private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;  
​  
    @Override  
    public void prepare() throws Throwable {  
        ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);  
    }  
​  
    @Override  
    public void boot() throws Throwable {  
        lastLogTime = System.currentTimeMillis();  
        segmentUplinkedCounter = 0;  
        segmentAbandonedCounter = 0;  
        carrier = new DataCarrier<TraceSegment>(CHANNEL_SIZE, BUFFER_SIZE);  
        carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);  
        carrier.consume(this, 1);  
    }  
​  
    @Override  
    public void onComplete() throws Throwable {  
        TracingContext.ListenerManager.add(this);  
    }  
​  
    @Override  
    public void shutdown() throws Throwable {  
        TracingContext.ListenerManager.remove(this);  
        carrier.shutdownConsumers();  
    }  
​  
    @Override  
    public void init() {  
​  
    }  
​  
    @Override  
    public void consume(List<TraceSegment> data) {  
        if (CONNECTED.equals(status)) {  
            final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);  
            StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(new StreamObserver<Commands>() {  
                @Override  
                public void onNext(Commands commands) {  
                    ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);  
                }  
​  
                @Override  
                public void onError(Throwable throwable) {  
                    status.finished();  
                    if (logger.isErrorEnable()) {  
                        logger.error(throwable, "Send UpstreamSegment to collector fail with a grpc internal exception.");  
                    }  
                    ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(throwable);  
                }  
​  
                @Override  
                public void onCompleted() {  
                    status.finished();  
                }  
            });  
​  
            try {  
                for (TraceSegment segment : data) {  
                    UpstreamSegment upstreamSegment = segment.transform();  
                    upstreamSegmentStreamObserver.onNext(upstreamSegment);  
                }  
            } catch (Throwable t) {  
                logger.error(t, "Transform and send UpstreamSegment to collector fail.");  
            }  
​  
            upstreamSegmentStreamObserver.onCompleted();  
​  
            status.wait4Finish();  
            segmentUplinkedCounter += data.size();  
        } else {  
            segmentAbandonedCounter += data.size();  
        }  
​  
        printUplinkStatus();  
    }  
​  
    private void printUplinkStatus() {  
        long currentTimeMillis = System.currentTimeMillis();  
        if (currentTimeMillis - lastLogTime > 30 * 1000) {  
            lastLogTime = currentTimeMillis;  
            if (segmentUplinkedCounter > 0) {  
                logger.debug("{} trace segments have been sent to collector.", segmentUplinkedCounter);  
                segmentUplinkedCounter = 0;  
            }  
            if (segmentAbandonedCounter > 0) {  
                logger.debug("{} trace segments have been abandoned, cause by no available channel.", segmentAbandonedCounter);  
                segmentAbandonedCounter = 0;  
            }  
        }  
    }  
​  
    @Override  
    public void onError(List<TraceSegment> data, Throwable t) {  
        logger.error(t, "Try to send {} trace segments to collector, with unexpected exception.", data.size());  
    }  
​  
    @Override  
    public void onExit() {  
​  
    }  
​  
    @Override  
    public void afterFinished(TraceSegment traceSegment) {  
        if (traceSegment.isIgnore()) {  
            return;  
        }  
        if (!carrier.produce(traceSegment)) {  
            if (logger.isDebugEnable()) {  
                logger.debug("One trace segment has been abandoned, cause by buffer is full.");  
            }  
        }  
    }  
​  
    @Override  
    public void statusChanged(GRPCChannelStatus status) {  
        if (CONNECTED.equals(status)) {  
            Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();  
            serviceStub = TraceSegmentReportServiceGrpc.newStub(channel);  
        }  
        this.status = status;  
    }  
}
  • TraceSegmentServiceClient implements BootService, IConsumer, TracingContextListener and GRPCChannelListener interfaces; its prepare method registers its own channelListener with GRPCChannelManager; its boot method sets lastLogTime, instantiates DataCarrier and sets its consumer as itself; its onComplete method executes TracingContext.ListenerManager.add(this); its shutdown Method n executes traingcontext. Listenermanager. Remove (this) and carrier.shutdownConsumers(); its consume method executes upstreamSegmentStreamObserver.onNext(upstreamSegment), upstreamSegmentStreamObserver.onCompleted() and status.wait4Finish(); its afterFinished method executes carrier. Process (tracesegment); its statusChanged Set serviceStub and status

ConsumerThread

skywalking-6.6.0/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerThread.java

public class ConsumerThread<T> extends Thread {  
    private volatile boolean running;  
    private IConsumer<T> consumer;  
    private List<DataSource> dataSources;  
    private long consumeCycle;  
​  
    ConsumerThread(String threadName, IConsumer<T> consumer, long consumeCycle) {  
        super(threadName);  
        this.consumer = consumer;  
        running = false;  
        dataSources = new ArrayList<DataSource>(1);  
        this.consumeCycle = consumeCycle;  
    }  
​  
    /**  
     * add whole buffer to consume  
     *  
     * @param sourceBuffer  
     */  
    void addDataSource(QueueBuffer<T> sourceBuffer) {  
        this.dataSources.add(new DataSource(sourceBuffer));  
    }  
​  
    @Override  
    public void run() {  
        running = true;  
​  
        final List<T> consumeList = new ArrayList<T>(1500);  
        while (running) {  
            if (!consume(consumeList)) {  
                try {  
                    Thread.sleep(consumeCycle);  
                } catch (InterruptedException e) {  
                }  
            }  
        }  
​  
        // consumer thread is going to stop  
        // consume the last time  
        consume(consumeList);  
​  
        consumer.onExit();  
    }  
​  
    private boolean consume(List<T> consumeList) {  
        for (DataSource dataSource : dataSources) {  
            dataSource.obtain(consumeList);  
        }  
​  
        if (!consumeList.isEmpty()) {  
            try {  
                consumer.consume(consumeList);  
            } catch (Throwable t) {  
                consumer.onError(consumeList, t);  
            } finally {  
                consumeList.clear();  
            }  
            return true;  
        }  
        return false;  
    }  
​  
    void shutdown() {  
        running = false;  
    }  
​  
    /**  
     * DataSource is a refer to {@link Buffer}.  
     */  
    class DataSource {  
        private QueueBuffer<T> sourceBuffer;  
​  
        DataSource(QueueBuffer<T> sourceBuffer) {  
            this.sourceBuffer = sourceBuffer;  
        }  
​  
        void obtain(List<T> consumeList) {  
            sourceBuffer.obtain(consumeList);  
        }  
    }  
}
  • ConsumerThread inherits Thread, and its run method will execute consumer (consumeList) circularly. When it jumps out of the cycle, it will execute consumer (consumeList) again, and finally consumer.onExit(); the consumer method will traverse dataSources, execute its datasource. Detail (consumeList), and then execute consumer. Consumer (consumeList) method when consumeList is not empty

ConsumeDriver

skywalking-6.6.0/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java

public class ConsumeDriver<T> implements IDriver {  
    private boolean running;  
    private ConsumerThread[] consumerThreads;  
    private Channels<T> channels;  
    private ReentrantLock lock;  
​  
    public ConsumeDriver(String name, Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num,  
        long consumeCycle) {  
        this(channels, num);  
        for (int i = 0; i < num; i++) {  
            consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumser." + i + ".Thread", getNewConsumerInstance(consumerClass), consumeCycle);  
            consumerThreads[i].setDaemon(true);  
        }  
    }  
​  
    public ConsumeDriver(String name, Channels<T> channels, IConsumer<T> prototype, int num, long consumeCycle) {  
        this(channels, num);  
        prototype.init();  
        for (int i = 0; i < num; i++) {  
            consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumser." + i + ".Thread", prototype, consumeCycle);  
            consumerThreads[i].setDaemon(true);  
        }  
​  
    }  
​  
    private ConsumeDriver(Channels<T> channels, int num) {  
        running = false;  
        this.channels = channels;  
        consumerThreads = new ConsumerThread[num];  
        lock = new ReentrantLock();  
    }  
​  
    private IConsumer<T> getNewConsumerInstance(Class<? extends IConsumer<T>> consumerClass) {  
        try {  
            IConsumer<T> inst = consumerClass.newInstance();  
            inst.init();  
            return inst;  
        } catch (InstantiationException e) {  
            throw new ConsumerCannotBeCreatedException(e);  
        } catch (IllegalAccessException e) {  
            throw new ConsumerCannotBeCreatedException(e);  
        }  
    }  
​  
    @Override  
    public void begin(Channels channels) {  
        if (running) {  
            return;  
        }  
        try {  
            lock.lock();  
            this.allocateBuffer2Thread();  
            for (ConsumerThread consumerThread : consumerThreads) {  
                consumerThread.start();  
            }  
            running = true;  
        } finally {  
            lock.unlock();  
        }  
    }  
​  
    @Override  
    public boolean isRunning(Channels channels) {  
        return running;  
    }  
​  
    private void allocateBuffer2Thread() {  
        int channelSize = this.channels.getChannelSize();  
        /**  
         * if consumerThreads.length < channelSize  
         * each consumer will process several channels.  
         *  
         * if consumerThreads.length == channelSize  
         * each consumer will process one channel.  
         *  
         * if consumerThreads.length > channelSize  
         * there will be some threads do nothing.  
         */  
        for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {  
            int consumerIndex = channelIndex % consumerThreads.length;  
            consumerThreads[consumerIndex].addDataSource(channels.getBuffer(channelIndex));  
        }  
​  
    }  
​  
    @Override  
    public void close(Channels channels) {  
        try {  
            lock.lock();  
            this.running = false;  
            for (ConsumerThread consumerThread : consumerThreads) {  
                consumerThread.shutdown();  
            }  
        } finally {  
            lock.unlock();  
        }  
    }  
}
  • The ConsumeDriver implements the IDriver interface, and its ConsumeDriver will create num consumerthreads; its begin method will execute allocateBuffer2Thread, add dataSource to each consumerthread, and then execute consumerThread.start(); its close method will execute consumerThread.shutdown()

Summary

TraceSegmentServiceClient implements BootService, IConsumer, TracingContextListener and GRPCChannelListener interfaces; its prepare method registers its own channelListener with GRPCChannelManager; its boot method sets lastLogTime, instantiates DataCarrier and sets its consumer as itself; its onComplete method executes TracingContext.ListenerManager.add(this); its shutdown Method n executes traingcontext. Listenermanager. Remove (this) and carrier.shutdownConsumers(); its consume method executes upstreamSegmentStreamObserver.onNext(upstreamSegment), upstreamSegmentStreamObserver.onCompleted() and status.wait4Finish(); its afterFinished method executes carrier. Process (tracesegment); its statusChanged Set serviceStub and status

doc

This paper focuses on the trace element service client of skywalking

TracingContextListener

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContextListener.java

public interface TracingContextListener {  
    void afterFinished(TraceSegment traceSegment);  
}
  • Tracecontextlistener defines the afterFinished method, whose parameter is traceinstance

TraceSegment

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/TraceSegment.java

public class TraceSegment {  
​  
    private ID traceSegmentId;  
​  
    private List<TraceSegmentRef> refs;  
​  
    private List<AbstractTracingSpan> spans;  
​  
    private DistributedTraceIds relatedGlobalTraces;  
​  
    private boolean ignore = false;  
​  
    private boolean isSizeLimited = false;  
​  
    private final long createTime;  
​  
    public TraceSegment() {  
        this.traceSegmentId = GlobalIdGenerator.generate();  
        this.spans = new LinkedList<AbstractTracingSpan>();  
        this.relatedGlobalTraces = new DistributedTraceIds();  
        this.relatedGlobalTraces.append(new NewDistributedTraceId());  
        this.createTime = System.currentTimeMillis();  
    }  
​  
    public void ref(TraceSegmentRef refSegment) {  
        if (refs == null) {  
            refs = new LinkedList<TraceSegmentRef>();  
        }  
        if (!refs.contains(refSegment)) {  
            refs.add(refSegment);  
        }  
    }  
​  
    public void relatedGlobalTraces(DistributedTraceId distributedTraceId) {  
        relatedGlobalTraces.append(distributedTraceId);  
    }  
​  
    public void archive(AbstractTracingSpan finishedSpan) {  
        spans.add(finishedSpan);  
    }  
​  
    public TraceSegment finish(boolean isSizeLimited) {  
        this.isSizeLimited = isSizeLimited;  
        return this;  
    }  
​  
    public ID getTraceSegmentId() {  
        return traceSegmentId;  
    }  
​  
    public int getServiceId() {  
        return RemoteDownstreamConfig.Agent.SERVICE_ID;  
    }  
​  
    public boolean hasRef() {  
        return !(refs == null || refs.size() == 0);  
    }  
​  
    public List<TraceSegmentRef> getRefs() {  
        return refs;  
    }  
​  
    public List<DistributedTraceId> getRelatedGlobalTraces() {  
        return relatedGlobalTraces.getRelatedGlobalTraces();  
    }  
​  
    public boolean isSingleSpanSegment() {  
        return this.spans != null && this.spans.size() == 1;  
    }  
​  
    public boolean isIgnore() {  
        return ignore;  
    }  
​  
    public void setIgnore(boolean ignore) {  
        this.ignore = ignore;  
    }  
​  
    public UpstreamSegment transform() {  
        UpstreamSegment.Builder upstreamBuilder = UpstreamSegment.newBuilder();  
        for (DistributedTraceId distributedTraceId : getRelatedGlobalTraces()) {  
            upstreamBuilder = upstreamBuilder.addGlobalTraceIds(distributedTraceId.toUniqueId());  
        }  
        SegmentObject.Builder traceSegmentBuilder = SegmentObject.newBuilder();  
        /**  
         * Trace Segment  
         */  
        traceSegmentBuilder.setTraceSegmentId(this.traceSegmentId.transform());  
        // Don't serialize TraceSegmentReference  
​  
        // SpanObject  
        for (AbstractTracingSpan span : this.spans) {  
            traceSegmentBuilder.addSpans(span.transform());  
        }  
        traceSegmentBuilder.setServiceId(RemoteDownstreamConfig.Agent.SERVICE_ID);  
        traceSegmentBuilder.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID);  
        traceSegmentBuilder.setIsSizeLimited(this.isSizeLimited);  
​  
        upstreamBuilder.setSegment(traceSegmentBuilder.build().toByteString());  
        return upstreamBuilder.build();  
    }  
​  
    @Override  
    public String toString() {  
        return "TraceSegment{" +  
            "traceSegmentId='" + traceSegmentId + '\'' +  
            ", refs=" + refs +  
            ", spans=" + spans +  
            ", relatedGlobalTraces=" + relatedGlobalTraces +  
            '}';  
    }  
​  
    public int getApplicationInstanceId() {  
        return RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID;  
    }  
​  
    public long createTime() {  
        return this.createTime;  
    }  
}
  • Traceinstance defines traceSegmentId, refs, spans, relatedGlobalTraces and other attributes; it provides methods such as ref, relatedGlobalTraces, archive, finish, transform, etc

IConsumer

skywalking-6.6.0/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/IConsumer.java

public interface IConsumer<T> {  
    void init();  
​  
    void consume(List<T> data);  
​  
    void onError(List<T> data, Throwable t);  
​  
    void onExit();  
}
  • IConsumer defines init, consumer, onError, onExit methods

TraceSegmentServiceClient

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java

@DefaultImplementor  
public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSegment>, TracingContextListener, GRPCChannelListener {  
    private static final ILog logger = LogManager.getLogger(TraceSegmentServiceClient.class);  
    private static final int TIMEOUT = 30 * 1000;  
​  
    private long lastLogTime;  
    private long segmentUplinkedCounter;  
    private long segmentAbandonedCounter;  
    private volatile DataCarrier<TraceSegment> carrier;  
    private volatile TraceSegmentReportServiceGrpc.TraceSegmentReportServiceStub serviceStub;  
    private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;  
​  
    @Override  
    public void prepare() throws Throwable {  
        ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);  
    }  
​  
    @Override  
    public void boot() throws Throwable {  
        lastLogTime = System.currentTimeMillis();  
        segmentUplinkedCounter = 0;  
        segmentAbandonedCounter = 0;  
        carrier = new DataCarrier<TraceSegment>(CHANNEL_SIZE, BUFFER_SIZE);  
        carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);  
        carrier.consume(this, 1);  
    }  
​  
    @Override  
    public void onComplete() throws Throwable {  
        TracingContext.ListenerManager.add(this);  
    }  
​  
    @Override  
    public void shutdown() throws Throwable {  
        TracingContext.ListenerManager.remove(this);  
        carrier.shutdownConsumers();  
    }  
​  
    @Override  
    public void init() {  
​  
    }  
​  
    @Override  
    public void consume(List<TraceSegment> data) {  
        if (CONNECTED.equals(status)) {  
            final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);  
            StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(new StreamObserver<Commands>() {  
                @Override  
                public void onNext(Commands commands) {  
                    ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);  
                }  
​  
                @Override  
                public void onError(Throwable throwable) {  
                    status.finished();  
                    if (logger.isErrorEnable()) {  
                        logger.error(throwable, "Send UpstreamSegment to collector fail with a grpc internal exception.");  
                    }  
                    ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(throwable);  
                }  
​  
                @Override  
                public void onCompleted() {  
                    status.finished();  
                }  
            });  
​  
            try {  
                for (TraceSegment segment : data) {  
                    UpstreamSegment upstreamSegment = segment.transform();  
                    upstreamSegmentStreamObserver.onNext(upstreamSegment);  
                }  
            } catch (Throwable t) {  
                logger.error(t, "Transform and send UpstreamSegment to collector fail.");  
            }  
​  
            upstreamSegmentStreamObserver.onCompleted();  
​  
            status.wait4Finish();  
            segmentUplinkedCounter += data.size();  
        } else {  
            segmentAbandonedCounter += data.size();  
        }  
​  
        printUplinkStatus();  
    }  
​  
    private void printUplinkStatus() {  
        long currentTimeMillis = System.currentTimeMillis();  
        if (currentTimeMillis - lastLogTime > 30 * 1000) {  
            lastLogTime = currentTimeMillis;  
            if (segmentUplinkedCounter > 0) {  
                logger.debug("{} trace segments have been sent to collector.", segmentUplinkedCounter);  
                segmentUplinkedCounter = 0;  
            }  
            if (segmentAbandonedCounter > 0) {  
                logger.debug("{} trace segments have been abandoned, cause by no available channel.", segmentAbandonedCounter);  
                segmentAbandonedCounter = 0;  
            }  
        }  
    }  
​  
    @Override  
    public void onError(List<TraceSegment> data, Throwable t) {  
        logger.error(t, "Try to send {} trace segments to collector, with unexpected exception.", data.size());  
    }  
​  
    @Override  
    public void onExit() {  
​  
    }  
​  
    @Override  
    public void afterFinished(TraceSegment traceSegment) {  
        if (traceSegment.isIgnore()) {  
            return;  
        }  
        if (!carrier.produce(traceSegment)) {  
            if (logger.isDebugEnable()) {  
                logger.debug("One trace segment has been abandoned, cause by buffer is full.");  
            }  
        }  
    }  
​  
    @Override  
    public void statusChanged(GRPCChannelStatus status) {  
        if (CONNECTED.equals(status)) {  
            Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();  
            serviceStub = TraceSegmentReportServiceGrpc.newStub(channel);  
        }  
        this.status = status;  
    }  
}
  • TraceSegmentServiceClient implements BootService, IConsumer, TracingContextListener and GRPCChannelListener interfaces; its prepare method registers its own channelListener with GRPCChannelManager; its boot method sets lastLogTime, instantiates DataCarrier and sets its consumer as itself; its onComplete method executes TracingContext.ListenerManager.add(this); its shutdown Method n executes traingcontext. Listenermanager. Remove (this) and carrier.shutdownConsumers(); its consume method executes upstreamSegmentStreamObserver.onNext(upstreamSegment), upstreamSegmentStreamObserver.onCompleted() and status.wait4Finish(); its afterFinished method executes carrier. Process (tracesegment); its statusChanged Set serviceStub and status

ConsumerThread

skywalking-6.6.0/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerThread.java

public class ConsumerThread<T> extends Thread {  
    private volatile boolean running;  
    private IConsumer<T> consumer;  
    private List<DataSource> dataSources;  
    private long consumeCycle;  
​  
    ConsumerThread(String threadName, IConsumer<T> consumer, long consumeCycle) {  
        super(threadName);  
        this.consumer = consumer;  
        running = false;  
        dataSources = new ArrayList<DataSource>(1);  
        this.consumeCycle = consumeCycle;  
    }  
​  
    /**  
     * add whole buffer to consume  
     *  
     * @param sourceBuffer  
     */  
    void addDataSource(QueueBuffer<T> sourceBuffer) {  
        this.dataSources.add(new DataSource(sourceBuffer));  
    }  
​  
    @Override  
    public void run() {  
        running = true;  
​  
        final List<T> consumeList = new ArrayList<T>(1500);  
        while (running) {  
            if (!consume(consumeList)) {  
                try {  
                    Thread.sleep(consumeCycle);  
                } catch (InterruptedException e) {  
                }  
            }  
        }  
​  
        // consumer thread is going to stop  
        // consume the last time  
        consume(consumeList);  
​  
        consumer.onExit();  
    }  
​  
    private boolean consume(List<T> consumeList) {  
        for (DataSource dataSource : dataSources) {  
            dataSource.obtain(consumeList);  
        }  
​  
        if (!consumeList.isEmpty()) {  
            try {  
                consumer.consume(consumeList);  
            } catch (Throwable t) {  
                consumer.onError(consumeList, t);  
            } finally {  
                consumeList.clear();  
            }  
            return true;  
        }  
        return false;  
    }  
​  
    void shutdown() {  
        running = false;  
    }  
​  
    /**  
     * DataSource is a refer to {@link Buffer}.  
     */  
    class DataSource {  
        private QueueBuffer<T> sourceBuffer;  
​  
        DataSource(QueueBuffer<T> sourceBuffer) {  
            this.sourceBuffer = sourceBuffer;  
        }  
​  
        void obtain(List<T> consumeList) {  
            sourceBuffer.obtain(consumeList);  
        }  
    }  
}
  • ConsumerThread inherits Thread, and its run method will execute consumer (consumeList) circularly. When it jumps out of the cycle, it will execute consumer (consumeList) again, and finally consumer.onExit(); the consumer method will traverse dataSources, execute its datasource. Detail (consumeList), and then execute consumer. Consumer (consumeList) method when consumeList is not empty

ConsumeDriver

skywalking-6.6.0/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java

public class ConsumeDriver<T> implements IDriver {  
    private boolean running;  
    private ConsumerThread[] consumerThreads;  
    private Channels<T> channels;  
    private ReentrantLock lock;  
​  
    public ConsumeDriver(String name, Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num,  
        long consumeCycle) {  
        this(channels, num);  
        for (int i = 0; i < num; i++) {  
            consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumser." + i + ".Thread", getNewConsumerInstance(consumerClass), consumeCycle);  
            consumerThreads[i].setDaemon(true);  
        }  
    }  
​  
    public ConsumeDriver(String name, Channels<T> channels, IConsumer<T> prototype, int num, long consumeCycle) {  
        this(channels, num);  
        prototype.init();  
        for (int i = 0; i < num; i++) {  
            consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumser." + i + ".Thread", prototype, consumeCycle);  
            consumerThreads[i].setDaemon(true);  
        }  
​  
    }  
​  
    private ConsumeDriver(Channels<T> channels, int num) {  
        running = false;  
        this.channels = channels;  
        consumerThreads = new ConsumerThread[num];  
        lock = new ReentrantLock();  
    }  
​  
    private IConsumer<T> getNewConsumerInstance(Class<? extends IConsumer<T>> consumerClass) {  
        try {  
            IConsumer<T> inst = consumerClass.newInstance();  
            inst.init();  
            return inst;  
        } catch (InstantiationException e) {  
            throw new ConsumerCannotBeCreatedException(e);  
        } catch (IllegalAccessException e) {  
            throw new ConsumerCannotBeCreatedException(e);  
        }  
    }  
​  
    @Override  
    public void begin(Channels channels) {  
        if (running) {  
            return;  
        }  
        try {  
            lock.lock();  
            this.allocateBuffer2Thread();  
            for (ConsumerThread consumerThread : consumerThreads) {  
                consumerThread.start();  
            }  
            running = true;  
        } finally {  
            lock.unlock();  
        }  
    }  
​  
    @Override  
    public boolean isRunning(Channels channels) {  
        return running;  
    }  
​  
    private void allocateBuffer2Thread() {  
        int channelSize = this.channels.getChannelSize();  
        /**  
         * if consumerThreads.length < channelSize  
         * each consumer will process several channels.  
         *  
         * if consumerThreads.length == channelSize  
         * each consumer will process one channel.  
         *  
         * if consumerThreads.length > channelSize  
         * there will be some threads do nothing.  
         */  
        for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {  
            int consumerIndex = channelIndex % consumerThreads.length;  
            consumerThreads[consumerIndex].addDataSource(channels.getBuffer(channelIndex));  
        }  
​  
    }  
​  
    @Override  
    public void close(Channels channels) {  
        try {  
            lock.lock();  
            this.running = false;  
            for (ConsumerThread consumerThread : consumerThreads) {  
                consumerThread.shutdown();  
            }  
        } finally {  
            lock.unlock();  
        }  
    }  
}
  • The ConsumeDriver implements the IDriver interface, and its ConsumeDriver will create num consumerthreads; its begin method will execute allocateBuffer2Thread, add dataSource to each consumerthread, and then execute consumerThread.start(); its close method will execute consumerThread.shutdown()

Summary

TraceSegmentServiceClient implements BootService, IConsumer, TracingContextListener and GRPCChannelListener interfaces; its prepare method registers its own channelListener with GRPCChannelManager; its boot method sets lastLogTime, instantiates DataCarrier and sets its consumer as itself; its onComplete method executes TracingContext.ListenerManager.add(this); its shutdown Method n executes traingcontext. Listenermanager. Remove (this) and carrier.shutdownConsumers(); its consume method executes upstreamSegmentStreamObserver.onNext(upstreamSegment), upstreamSegmentStreamObserver.onCompleted() and status.wait4Finish(); its afterFinished method executes carrier. Process (tracesegment); its statusChanged Set serviceStub and status

doc

This paper focuses on the trace element service client of skywalking

TracingContextListener

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContextListener.java

public interface TracingContextListener {  
    void afterFinished(TraceSegment traceSegment);  
}
  • Tracecontextlistener defines the afterFinished method, whose parameter is traceinstance

TraceSegment

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/TraceSegment.java

public class TraceSegment {  
​  
    private ID traceSegmentId;  
​  
    private List<TraceSegmentRef> refs;  
​  
    private List<AbstractTracingSpan> spans;  
​  
    private DistributedTraceIds relatedGlobalTraces;  
​  
    private boolean ignore = false;  
​  
    private boolean isSizeLimited = false;  
​  
    private final long createTime;  
​  
    public TraceSegment() {  
        this.traceSegmentId = GlobalIdGenerator.generate();  
        this.spans = new LinkedList<AbstractTracingSpan>();  
        this.relatedGlobalTraces = new DistributedTraceIds();  
        this.relatedGlobalTraces.append(new NewDistributedTraceId());  
        this.createTime = System.currentTimeMillis();  
    }  
​  
    public void ref(TraceSegmentRef refSegment) {  
        if (refs == null) {  
            refs = new LinkedList<TraceSegmentRef>();  
        }  
        if (!refs.contains(refSegment)) {  
            refs.add(refSegment);  
        }  
    }  
​  
    public void relatedGlobalTraces(DistributedTraceId distributedTraceId) {  
        relatedGlobalTraces.append(distributedTraceId);  
    }  
​  
    public void archive(AbstractTracingSpan finishedSpan) {  
        spans.add(finishedSpan);  
    }  
​  
    public TraceSegment finish(boolean isSizeLimited) {  
        this.isSizeLimited = isSizeLimited;  
        return this;  
    }  
​  
    public ID getTraceSegmentId() {  
        return traceSegmentId;  
    }  
​  
    public int getServiceId() {  
        return RemoteDownstreamConfig.Agent.SERVICE_ID;  
    }  
​  
    public boolean hasRef() {  
        return !(refs == null || refs.size() == 0);  
    }  
​  
    public List<TraceSegmentRef> getRefs() {  
        return refs;  
    }  
​  
    public List<DistributedTraceId> getRelatedGlobalTraces() {  
        return relatedGlobalTraces.getRelatedGlobalTraces();  
    }  
​  
    public boolean isSingleSpanSegment() {  
        return this.spans != null && this.spans.size() == 1;  
    }  
​  
    public boolean isIgnore() {  
        return ignore;  
    }  
​  
    public void setIgnore(boolean ignore) {  
        this.ignore = ignore;  
    }  
​  
    public UpstreamSegment transform() {  
        UpstreamSegment.Builder upstreamBuilder = UpstreamSegment.newBuilder();  
        for (DistributedTraceId distributedTraceId : getRelatedGlobalTraces()) {  
            upstreamBuilder = upstreamBuilder.addGlobalTraceIds(distributedTraceId.toUniqueId());  
        }  
        SegmentObject.Builder traceSegmentBuilder = SegmentObject.newBuilder();  
        /**  
         * Trace Segment  
         */  
        traceSegmentBuilder.setTraceSegmentId(this.traceSegmentId.transform());  
        // Don't serialize TraceSegmentReference  
​  
        // SpanObject  
        for (AbstractTracingSpan span : this.spans) {  
            traceSegmentBuilder.addSpans(span.transform());  
        }  
        traceSegmentBuilder.setServiceId(RemoteDownstreamConfig.Agent.SERVICE_ID);  
        traceSegmentBuilder.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID);  
        traceSegmentBuilder.setIsSizeLimited(this.isSizeLimited);  
​  
        upstreamBuilder.setSegment(traceSegmentBuilder.build().toByteString());  
        return upstreamBuilder.build();  
    }  
​  
    @Override  
    public String toString() {  
        return "TraceSegment{" +  
            "traceSegmentId='" + traceSegmentId + '\'' +  
            ", refs=" + refs +  
            ", spans=" + spans +  
            ", relatedGlobalTraces=" + relatedGlobalTraces +  
            '}';  
    }  
​  
    public int getApplicationInstanceId() {  
        return RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID;  
    }  
​  
    public long createTime() {  
        return this.createTime;  
    }  
}
  • Traceinstance defines traceSegmentId, refs, spans, relatedGlobalTraces and other attributes; it provides methods such as ref, relatedGlobalTraces, archive, finish, transform, etc

IConsumer

skywalking-6.6.0/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/IConsumer.java

public interface IConsumer<T> {  
    void init();  
​  
    void consume(List<T> data);  
​  
    void onError(List<T> data, Throwable t);  
​  
    void onExit();  
}
  • IConsumer defines init, consumer, onError, onExit methods

TraceSegmentServiceClient

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java

@DefaultImplementor  
public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSegment>, TracingContextListener, GRPCChannelListener {  
    private static final ILog logger = LogManager.getLogger(TraceSegmentServiceClient.class);  
    private static final int TIMEOUT = 30 * 1000;  
​  
    private long lastLogTime;  
    private long segmentUplinkedCounter;  
    private long segmentAbandonedCounter;  
    private volatile DataCarrier<TraceSegment> carrier;  
    private volatile TraceSegmentReportServiceGrpc.TraceSegmentReportServiceStub serviceStub;  
    private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;  
​  
    @Override  
    public void prepare() throws Throwable {  
        ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);  
    }  
​  
    @Override  
    public void boot() throws Throwable {  
        lastLogTime = System.currentTimeMillis();  
        segmentUplinkedCounter = 0;  
        segmentAbandonedCounter = 0;  
        carrier = new DataCarrier<TraceSegment>(CHANNEL_SIZE, BUFFER_SIZE);  
        carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);  
        carrier.consume(this, 1);  
    }  
​  
    @Override  
    public void onComplete() throws Throwable {  
        TracingContext.ListenerManager.add(this);  
    }  
​  
    @Override  
    public void shutdown() throws Throwable {  
        TracingContext.ListenerManager.remove(this);  
        carrier.shutdownConsumers();  
    }  
​  
    @Override  
    public void init() {  
​  
    }  
​  
    @Override  
    public void consume(List<TraceSegment> data) {  
        if (CONNECTED.equals(status)) {  
            final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);  
            StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(new StreamObserver<Commands>() {  
                @Override  
                public void onNext(Commands commands) {  
                    ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);  
                }  
​  
                @Override  
                public void onError(Throwable throwable) {  
                    status.finished();  
                    if (logger.isErrorEnable()) {  
                        logger.error(throwable, "Send UpstreamSegment to collector fail with a grpc internal exception.");  
                    }  
                    ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(throwable);  
                }  
​  
                @Override  
                public void onCompleted() {  
                    status.finished();  
                }  
            });  
​  
            try {  
                for (TraceSegment segment : data) {  
                    UpstreamSegment upstreamSegment = segment.transform();  
                    upstreamSegmentStreamObserver.onNext(upstreamSegment);  
                }  
            } catch (Throwable t) {  
                logger.error(t, "Transform and send UpstreamSegment to collector fail.");  
            }  
​  
            upstreamSegmentStreamObserver.onCompleted();  
​  
            status.wait4Finish();  
            segmentUplinkedCounter += data.size();  
        } else {  
            segmentAbandonedCounter += data.size();  
        }  
​  
        printUplinkStatus();  
    }  
​  
    private void printUplinkStatus() {  
        long currentTimeMillis = System.currentTimeMillis();  
        if (currentTimeMillis - lastLogTime > 30 * 1000) {  
            lastLogTime = currentTimeMillis;  
            if (segmentUplinkedCounter > 0) {  
                logger.debug("{} trace segments have been sent to collector.", segmentUplinkedCounter);  
                segmentUplinkedCounter = 0;  
            }  
            if (segmentAbandonedCounter > 0) {  
                logger.debug("{} trace segments have been abandoned, cause by no available channel.", segmentAbandonedCounter);  
                segmentAbandonedCounter = 0;  
            }  
        }  
    }  
​  
    @Override  
    public void onError(List<TraceSegment> data, Throwable t) {  
        logger.error(t, "Try to send {} trace segments to collector, with unexpected exception.", data.size());  
    }  
​  
    @Override  
    public void onExit() {  
​  
    }  
​  
    @Override  
    public void afterFinished(TraceSegment traceSegment) {  
        if (traceSegment.isIgnore()) {  
            return;  
        }  
        if (!carrier.produce(traceSegment)) {  
            if (logger.isDebugEnable()) {  
                logger.debug("One trace segment has been abandoned, cause by buffer is full.");  
            }  
        }  
    }  
​  
    @Override  
    public void statusChanged(GRPCChannelStatus status) {  
        if (CONNECTED.equals(status)) {  
            Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();  
            serviceStub = TraceSegmentReportServiceGrpc.newStub(channel);  
        }  
        this.status = status;  
    }  
}
  • TraceSegmentServiceClient implements BootService, IConsumer, TracingContextListener and GRPCChannelListener interfaces; its prepare method registers its own channelListener with GRPCChannelManager; its boot method sets lastLogTime, instantiates DataCarrier and sets its consumer as itself; its onComplete method executes TracingContext.ListenerManager.add(this); its shutdown Method n executes traingcontext. Listenermanager. Remove (this) and carrier.shutdownConsumers(); its consume method executes upstreamSegmentStreamObserver.onNext(upstreamSegment), upstreamSegmentStreamObserver.onCompleted() and status.wait4Finish(); its afterFinished method executes carrier. Process (tracesegment); its statusChanged Set serviceStub and status

ConsumerThread

skywalking-6.6.0/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerThread.java

public class ConsumerThread<T> extends Thread {  
    private volatile boolean running;  
    private IConsumer<T> consumer;  
    private List<DataSource> dataSources;  
    private long consumeCycle;  
​  
    ConsumerThread(String threadName, IConsumer<T> consumer, long consumeCycle) {  
        super(threadName);  
        this.consumer = consumer;  
        running = false;  
        dataSources = new ArrayList<DataSource>(1);  
        this.consumeCycle = consumeCycle;  
    }  
​  
    /**  
     * add whole buffer to consume  
     *  
     * @param sourceBuffer  
     */  
    void addDataSource(QueueBuffer<T> sourceBuffer) {  
        this.dataSources.add(new DataSource(sourceBuffer));  
    }  
​  
    @Override  
    public void run() {  
        running = true;  
​  
        final List<T> consumeList = new ArrayList<T>(1500);  
        while (running) {  
            if (!consume(consumeList)) {  
                try {  
                    Thread.sleep(consumeCycle);  
                } catch (InterruptedException e) {  
                }  
            }  
        }  
​  
        // consumer thread is going to stop  
        // consume the last time  
        consume(consumeList);  
​  
        consumer.onExit();  
    }  
​  
    private boolean consume(List<T> consumeList) {  
        for (DataSource dataSource : dataSources) {  
            dataSource.obtain(consumeList);  
        }  
​  
        if (!consumeList.isEmpty()) {  
            try {  
                consumer.consume(consumeList);  
            } catch (Throwable t) {  
                consumer.onError(consumeList, t);  
            } finally {  
                consumeList.clear();  
            }  
            return true;  
        }  
        return false;  
    }  
​  
    void shutdown() {  
        running = false;  
    }  
​  
    /**  
     * DataSource is a refer to {@link Buffer}.  
     */  
    class DataSource {  
        private QueueBuffer<T> sourceBuffer;  
​  
        DataSource(QueueBuffer<T> sourceBuffer) {  
            this.sourceBuffer = sourceBuffer;  
        }  
​  
        void obtain(List<T> consumeList) {  
            sourceBuffer.obtain(consumeList);  
        }  
    }  
}
  • ConsumerThread inherits Thread, and its run method will execute consumer (consumeList) circularly. When it jumps out of the cycle, it will execute consumer (consumeList) again, and finally consumer.onExit(); the consumer method will traverse dataSources, execute its datasource. Detail (consumeList), and then execute consumer. Consumer (consumeList) method when consumeList is not empty

ConsumeDriver

skywalking-6.6.0/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java

public class ConsumeDriver<T> implements IDriver {  
    private boolean running;  
    private ConsumerThread[] consumerThreads;  
    private Channels<T> channels;  
    private ReentrantLock lock;  
​  
    public ConsumeDriver(String name, Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num,  
        long consumeCycle) {  
        this(channels, num);  
        for (int i = 0; i < num; i++) {  
            consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumser." + i + ".Thread", getNewConsumerInstance(consumerClass), consumeCycle);  
            consumerThreads[i].setDaemon(true);  
        }  
    }  
​  
    public ConsumeDriver(String name, Channels<T> channels, IConsumer<T> prototype, int num, long consumeCycle) {  
        this(channels, num);  
        prototype.init();  
        for (int i = 0; i < num; i++) {  
            consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumser." + i + ".Thread", prototype, consumeCycle);  
            consumerThreads[i].setDaemon(true);  
        }  
​  
    }  
​  
    private ConsumeDriver(Channels<T> channels, int num) {  
        running = false;  
        this.channels = channels;  
        consumerThreads = new ConsumerThread[num];  
        lock = new ReentrantLock();  
    }  
​  
    private IConsumer<T> getNewConsumerInstance(Class<? extends IConsumer<T>> consumerClass) {  
        try {  
            IConsumer<T> inst = consumerClass.newInstance();  
            inst.init();  
            return inst;  
        } catch (InstantiationException e) {  
            throw new ConsumerCannotBeCreatedException(e);  
        } catch (IllegalAccessException e) {  
            throw new ConsumerCannotBeCreatedException(e);  
        }  
    }  
​  
    @Override  
    public void begin(Channels channels) {  
        if (running) {  
            return;  
        }  
        try {  
            lock.lock();  
            this.allocateBuffer2Thread();  
            for (ConsumerThread consumerThread : consumerThreads) {  
                consumerThread.start();  
            }  
            running = true;  
        } finally {  
            lock.unlock();  
        }  
    }  
​  
    @Override  
    public boolean isRunning(Channels channels) {  
        return running;  
    }  
​  
    private void allocateBuffer2Thread() {  
        int channelSize = this.channels.getChannelSize();  
        /**  
         * if consumerThreads.length < channelSize  
         * each consumer will process several channels.  
         *  
         * if consumerThreads.length == channelSize  
         * each consumer will process one channel.  
         *  
         * if consumerThreads.length > channelSize  
         * there will be some threads do nothing.  
         */  
        for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {  
            int consumerIndex = channelIndex % consumerThreads.length;  
            consumerThreads[consumerIndex].addDataSource(channels.getBuffer(channelIndex));  
        }  
​  
    }  
​  
    @Override  
    public void close(Channels channels) {  
        try {  
            lock.lock();  
            this.running = false;  
            for (ConsumerThread consumerThread : consumerThreads) {  
                consumerThread.shutdown();  
            }  
        } finally {  
            lock.unlock();  
        }  
    }  
}
  • The ConsumeDriver implements the IDriver interface, and its ConsumeDriver will create num consumerthreads; its begin method will execute allocateBuffer2Thread, add dataSource to each consumerthread, and then execute consumerThread.start(); its close method will execute consumerThread.shutdown()

Summary

TraceSegmentServiceClient implements BootService, IConsumer, TracingContextListener and GRPCChannelListener interfaces; its prepare method registers its own channelListener with GRPCChannelManager; its boot method sets lastLogTime, instantiates DataCarrier and sets its consumer as itself; its onComplete method executes TracingContext.ListenerManager.add(this); its shutdown Method n executes traingcontext. Listenermanager. Remove (this) and carrier.shutdownConsumers(); its consume method executes upstreamSegmentStreamObserver.onNext(upstreamSegment), upstreamSegmentStreamObserver.onCompleted() and status.wait4Finish(); its afterFinished method executes carrier. Process (tracesegment); its statusChanged Set serviceStub and status

doc

  • TraceSegmentServiceClient

Tags: Programming Java Apache

Posted on Tue, 24 Mar 2020 01:02:01 -0700 by spider_man