Source code analysis of Zipkin client link tracing

As we know, Zipkin can help us to collect the call connections among various systems in the distributed system, and in addition to the Servlet, it can also collect the call relations among MQ, thread pool, WebSocket, Feign, Hystrix, RxJava, WebFlux and other components. This article will analyze how Zipkin accomplishes these functions

Let's take the most commonly used Servlet accept request as an example

Many classes injected in spring-cloud-sleuth's spring.factories file contain a class: tracewebservletoautoconfiguration. As you can see, this is an auto assembly class customized for the Servlet environment

In this class, a Filter is created, which is a powerful tool to intercept web requests and complete the collection of Servlet request links

	@Bean
	@ConditionalOnMissingBean
	public TracingFilter tracingFilter(HttpTracing tracing) {
		return (TracingFilter) TracingFilter.create(tracing);
	}

Let's take a look at what this interceptor has done

public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
        HttpServletRequest httpRequest = (HttpServletRequest)request;
        HttpServletResponse httpResponse = this.servlet.httpResponse(response);
        TraceContext context = (TraceContext)request.getAttribute(TraceContext.class.getName());
        if (context != null) {
            Scope scope = this.currentTraceContext.maybeScope(context);

            try {
                chain.doFilter(request, response);
            } finally {
                scope.close();
            }

        } else {
            Span span = this.handler.handleReceive(this.extractor, httpRequest);
            request.setAttribute(SpanCustomizer.class.getName(), span.customizer());
            request.setAttribute(TraceContext.class.getName(), span.context());
            Throwable error = null;
            Scope scope = this.currentTraceContext.newScope(span.context());

            try {
                chain.doFilter(httpRequest, httpResponse);
            } catch (ServletException | RuntimeException | Error | IOException var19) {
                error = var19;
                throw var19;
            } finally {
                scope.close();
                if (this.servlet.isAsync(httpRequest)) {
                    this.servlet.handleAsync(this.handler, httpRequest, span);
                } else {
                    this.handler.handleSend(ADAPTER.adaptResponse(httpRequest, httpResponse), error, span);
                }

            }

        }
    }
Creation of Span

Step 1: try to get TraceContext from the request. TraceContext contains the link information of this request. If the request comes from the upstream system, this information will exist here.

Let's focus on the branches when there is no upstream system. At this time, the first step is to create a span. The concept of span and trace has been mentioned in the previous article, and will not be expanded here.

  public <C> Span handleReceive(TraceContext.Extractor<C> extractor, C carrier, Req request) {
    Span span = nextSpan(extractor.extract(carrier), request);
    span.kind(Span.Kind.SERVER);
    return handleStart(request, span);
  }
  Span nextSpan(TraceContextOrSamplingFlags extracted, Req request) {
    if (extracted.sampled() == null) { // Otherwise, try to make a new decision
      extracted = extracted.sampled(sampler.trySample(adapter, request));
    }
    return extracted.context() != null
        ? tracer.joinSpan(extracted.context())
        : tracer.nextSpan(extracted);
  }

The meaning of this three item expression is to see if span exists in the current environment. If it exists, add span to the current environment. Otherwise, continue to enter the logic of creating span

  public Span nextSpan(TraceContextOrSamplingFlags extracted) {
    TraceContext parent = extracted.context();
    if (extracted.samplingFlags() != null) {
      TraceContext implicitParent = currentTraceContext.get();
      if (implicitParent == null) {
        return toSpan(newContextBuilder(null, extracted.samplingFlags())
            .extra(extracted.extra()).build());
      }
      // fall through, with an implicit parent, not an extracted one
      parent = appendExtra(implicitParent, extracted.extra());
    }
    if (parent != null) {
      TraceContext.Builder builder;
      if (extracted.samplingFlags() != null) {
        builder = newContextBuilder(parent, extracted.samplingFlags());
      } else {
        builder = newContextBuilder(parent, sampler);
      }
      return toSpan(builder.build());
    }
    TraceIdContext traceIdContext = extracted.traceIdContext();
    if (extracted.traceIdContext() != null) {
      Boolean sampled = traceIdContext.sampled();
      if (sampled == null) sampled = sampler.isSampled(traceIdContext.traceId());
      return toSpan(TraceContext.newBuilder()
          .sampled(sampled)
          .debug(traceIdContext.debug())
          .traceIdHigh(traceIdContext.traceIdHigh()).traceId(traceIdContext.traceId())
          .spanId(nextId())
          .extra(extracted.extra()).build());
    }
    // TraceContextOrSamplingFlags is a union of 3 types, we've checked all three
    throw new AssertionError("should not reach here");
  }

First, I will try to get trace. Because it is the first request, trace does not exist at this time, so I will enter the toSpan method

 public Span toSpan(TraceContext context) {
    if (context == null) throw new NullPointerException("context == null");
    TraceContext decorated = propagationFactory.decorate(context);
    if (!noop.get() && Boolean.TRUE.equals(decorated.sampled())) {
      return RealSpan.create(decorated, recorder, errorParser);
    }
    return NoopSpan.create(decorated);
  }

In this case, if we don't specify it specifically, we will use RealSpan to create a span. The final implementation class of this span is autovalue [RealSpan

Then return the initial handleReceive method

  public <C> Span handleReceive(TraceContext.Extractor<C> extractor, C carrier, Req request) {
    Span span = nextSpan(extractor.extract(carrier), request);
    span.kind(Span.Kind.SERVER);
    return handleStart(request, span);
  }

span after creation, the kind will be set, which represents the service type. Here, the service type is set as the service end.

The next step is to start recording link information

 Span handleStart(Req request, Span span) {
    if (span.isNoop()) return span;
    Scope ws = currentTraceContext.maybeScope(span.context());
    try {
      parser.request(adapter, request, span.customizer());

      Endpoint.Builder remoteEndpoint = Endpoint.newBuilder();
      if (parseRemoteEndpoint(request, remoteEndpoint)) {
        span.remoteEndpoint(remoteEndpoint.build());
      }
    } finally {
      ws.close();
    }

    return span.start();
  }
  

Several messages are recorded during opening

  public <Req> void request(HttpAdapter<Req, ?> adapter, Req req, SpanCustomizer customizer) {
    customizer.name(spanName(adapter, req));
    String method = adapter.method(req);
    if (method != null) customizer.tag("http.method", method);
    String path = adapter.path(req);
    if (path != null) customizer.tag("http.path", path);
  }
  
  public Span start() {
      return start(clock.currentTimeMicroseconds());
  }
  synchronized MutableSpan start(long timestamp) {
    span.timestamp(this.timestamp = timestamp);
    return this;
  }

And then back to the Filter method mentioned at the beginning of the article

After span and trace are created, they are added to the request

Scope creation

Then there is the creation of a scope, which is closely related to the log component. In short, it will print traceId, parentId, spanId to every log line printed by the current system

public Scope newScope(@Nullable TraceContext currentSpan) {
		final String previousTraceId = MDC.get("traceId");
		final String previousParentId = MDC.get("parentId");
		final String previousSpanId = MDC.get("spanId");
		final String spanExportable = MDC.get("spanExportable");
		final String legacyPreviousTraceId = MDC.get(LEGACY_TRACE_ID_NAME);
		final String legacyPreviousParentId = MDC.get(LEGACY_PARENT_ID_NAME);
		final String legacyPreviousSpanId = MDC.get(LEGACY_SPAN_ID_NAME);
		final String legacySpanExportable = MDC.get(LEGACY_EXPORTABLE_NAME);

		if (currentSpan != null) {
			String traceIdString = currentSpan.traceIdString();
			MDC.put("traceId", traceIdString);
			MDC.put(LEGACY_TRACE_ID_NAME, traceIdString);
			String parentId = currentSpan.parentId() != null ?
					HexCodec.toLowerHex(currentSpan.parentId()) :
					null;
			replace("parentId", parentId);
			replace(LEGACY_PARENT_ID_NAME, parentId);
			String spanId = HexCodec.toLowerHex(currentSpan.spanId());
			MDC.put("spanId", spanId);
			MDC.put(LEGACY_SPAN_ID_NAME, spanId);
			String sampled = String.valueOf(currentSpan.sampled());
			MDC.put("spanExportable", sampled);
			MDC.put(LEGACY_EXPORTABLE_NAME, sampled);
			log("Starting scope for span: {}", currentSpan);
			if (currentSpan.parentId() != null) {
				if (log.isTraceEnabled()) {
					log.trace("With parent: {}", currentSpan.parentId());
				}
			}
		}
		else {
			MDC.remove("traceId");
			MDC.remove("parentId");
			MDC.remove("spanId");
			MDC.remove("spanExportable");
			MDC.remove(LEGACY_TRACE_ID_NAME);
			MDC.remove(LEGACY_PARENT_ID_NAME);
			MDC.remove(LEGACY_SPAN_ID_NAME);
			MDC.remove(LEGACY_EXPORTABLE_NAME);
		}

		Scope scope = this.delegate.newScope(currentSpan);

		class ThreadContextCurrentTraceContextScope implements Scope {
			@Override public void close() {
				log("Closing scope for span: {}", currentSpan);
				scope.close();
				replace("traceId", previousTraceId);
				replace("parentId", previousParentId);
				replace("spanId", previousSpanId);
				replace("spanExportable", spanExportable);
				replace(LEGACY_TRACE_ID_NAME, legacyPreviousTraceId);
				replace(LEGACY_PARENT_ID_NAME, legacyPreviousParentId);
				replace(LEGACY_SPAN_ID_NAME, legacyPreviousSpanId);
				replace(LEGACY_EXPORTABLE_NAME, legacySpanExportable);
			}
		}
		return new ThreadContextCurrentTraceContextScope();
	}
The delivery of Span

Next, when the rest of the execution chain is completed, the request is over. At the end of the request, span will be uploaded to Zipkin server

  public void handleSend(@Nullable Resp response, @Nullable Throwable error, Span span) {
    handleFinish(response, error, span);
  }
  
    void handleFinish(@Nullable Resp response, @Nullable Throwable error, Span span) {
    if (span.isNoop()) return;
    try {
      Scope ws = currentTraceContext.maybeScope(span.context());
      try {
        parser.response(adapter, response, error, span.customizer());
      } finally {
        ws.close(); // close the scope before finishing the span
      }
    } finally {
      finishInNullScope(span);
    }
  }

First record the corresponding information of this call in span

  public <Resp> void response(HttpAdapter<?, Resp> adapter, @Nullable Resp res,
      @Nullable Throwable error, SpanCustomizer customizer) {
    int statusCode = 0;
    if (res != null) {
      statusCode = adapter.statusCodeAsInt(res);
      String nameFromRoute = spanNameFromRoute(adapter, res, statusCode);
      if (nameFromRoute != null) customizer.name(nameFromRoute);
      String maybeStatus = maybeStatusAsString(statusCode, 299);
      if (maybeStatus != null) customizer.tag("http.status_code", maybeStatus);
    }
    error(statusCode, error, customizer);
  }

Then clear Scope

  void finishInNullScope(Span span) {
    Scope ws = currentTraceContext.maybeScope(null);
    try {
      span.finish();
    } finally {
      ws.close();
    }
  }

After that, I said span upload

  public void finish(TraceContext context) {
    MutableSpan span = spanMap.remove(context);
    if (span == null || noop.get()) return;
    synchronized (span) {
      span.finish(span.clock.currentTimeMicroseconds());
      reporter.report(span.toSpan());
    }
  }

The specific upload implementation is implemented by the implementation class of the Sender interface. By default, its implementation class is these three Screenshot 10.31.01 PM November 18, 2019

And a span content is like this Screen shot 9.45.27 PM November 13, 2019

RabbitMQ link tracking

After reading the implementation of spring MVC link tracking, I think it's very simple to look at other ways. Here we take RabbitMQ as an example:

First, look up the spring.factories file of spring cloud sleuth. You can see that the tracking configuration class of the message middleware is tracemessaging autoconfiguration

Look at this class about RabbitMQ

	@Configuration
	@ConditionalOnProperty(value = "spring.sleuth.messaging.rabbit.enabled", matchIfMissing = true)
	@ConditionalOnClass(RabbitTemplate.class)
	protected static class SleuthRabbitConfiguration {
		@Bean
		@ConditionalOnMissingBean
		SpringRabbitTracing springRabbitTracing(Tracing tracing,
				SleuthMessagingProperties properties) {
			return SpringRabbitTracing.newBuilder(tracing)
					.remoteServiceName(properties.getMessaging().getRabbit().getRemoteServiceName())
					.build();
		}

		@Bean
		@ConditionalOnMissingBean
		static SleuthRabbitBeanPostProcessor sleuthRabbitBeanPostProcessor(BeanFactory beanFactory) {
			return new SleuthRabbitBeanPostProcessor(beanFactory);
		}
	}

In fact, it can be roughly guessed that the sleuth rabbitbeanpostprocessor must have been used to make some modifications in the construction of RabbitTemplate, such as adding an interceptor, and then automatically adding headers and other things when using RabbitTemplate to send messages will complete the whole process

Tags: Spring RabbitMQ WebFlux REST

Posted on Tue, 26 Nov 2019 11:19:45 -0800 by gorgo666