Can Kafka Asynchronous Messages Also Block? A Record of Investigating Frequent Dubbo Timeouts

Online service A calls an interface on service B to complete a transaction. After a production change one evening, monitoring showed that the service B interface was frequently timing out, and eventually it even returned "Thread pool is EXHAUSTED", a thread pool exhaustion error. Because service B depends on an external interface, the problem was initially mistaken for latency in that external interface, so the size of service B's Dubbo thread pool was temporarily increased. After the configuration change and a restart, the service returned to normal. Some time later, however, service B reported the thread pool exhaustion error again. A deeper investigation found that asynchronous messages sent through Kafka were blocking the Dubbo threads, which caused the call timeouts.

I. Problem Analysis

Environment: Dubbo 2.6.5, Kafka (Maven dependency) 0.8.0-beta1

Service A invokes service B and receives the following error:

2019-08-30 09:14:52,311 WARN method [%f [DUBBO] Thread pool is EXHAUSTED! Thread Name: DubboServerHandler-xxxx, Pool Size: 1000 (active: 1000, core: 1000, max: 1000, largest: 1000), Task: 6491 (completed: 5491), Executor status:(isShutdown:false, isTerminated:false, isTerminating:false), in dubbo://xxxx!, dubbo version: 2.6.0, current host: 127.0.0.1

You can see that the Dubbo thread pool is fully occupied and can no longer accept new calls. Normally a Dubbo thread finishes its task quickly and is returned to the pool. Here, however, the tasks the threads were executing blocked, so the consumer's calls timed out. Because the existing threads were stuck on the provider side, the thread pool had to keep creating new threads to handle incoming requests, until it hit the maximum thread count and started returning "Thread pool is EXHAUSTED".
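To make the failure mode concrete, here is a minimal, self-contained Java sketch, not Dubbo's actual code, of a small fixed-size pool (similar in spirit to Dubbo's default "fixed" pool) whose workers are all blocked; the pool size of 4 and the rejection handling are illustrative assumptions.

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class ExhaustedPoolDemo {
        public static void main(String[] args) {
            // A small fixed pool with no request buffering: a task needs a free thread.
            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    4, 4, 0, TimeUnit.MILLISECONDS,
                    new SynchronousQueue<>(),
                    new ThreadPoolExecutor.AbortPolicy());

            CountDownLatch never = new CountDownLatch(1);

            // Occupy every worker with a task that blocks forever,
            // just like a Dubbo thread stuck inside a blocking send.
            for (int i = 0; i < 4; i++) {
                pool.execute(() -> {
                    try {
                        never.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }

            // The next request finds no free thread and is rejected immediately,
            // which is the situation the EXHAUSTED warning describes.
            try {
                pool.execute(() -> System.out.println("will not run"));
            } catch (RejectedExecutionException e) {
                System.out.println("pool exhausted: " + e);
            }

            pool.shutdownNow();
        }
    }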

A thread's task can remain blocked for a long time for reasons such as:

  • Frequent full GC causing long system pauses.
  • Calls to blocking APIs, such as socket connections without a timeout.
  • A deadlock inside the system.

By analyzing the thread dump of the system stack, it was found that all Dubbo threads were in the WAITING state.

The following figure shows the application stack dump log:

The stack log shows that the Dubbo threads were ultimately blocked in LinkedBlockingQueue#put, which was called from Kafka's message-sending method.
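The following minimal Java sketch (illustrative, not the application's code) reproduces the same kind of stack frame: a caller thread that calls put on a full LinkedBlockingQueue parks in the WAITING state until space frees up.

    import java.util.concurrent.LinkedBlockingQueue;

    public class BlockingPutDemo {
        public static void main(String[] args) throws Exception {
            // A bounded queue that is already full.
            LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
            queue.put("first");

            Thread caller = new Thread(() -> {
                try {
                    // With the queue full and nobody consuming, put() blocks here indefinitely.
                    queue.put("second");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "pseudo-dubbo-thread");
            caller.start();

            Thread.sleep(500);
            // The thread-dump equivalent: the caller is parked inside LinkedBlockingQueue.put.
            System.out.println(caller.getName() + " state: " + caller.getState()); // WAITING
            caller.interrupt();
        }
    }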

Service B uses Kafka to send monitoring messages. So that sending would not affect the main business flow, service B sends these messages asynchronously. However, the Kafka cluster had recently changed its externally exposed port, and service B's Kafka configuration had not been updated in time, so the producer could no longer reach the broker. Service B updated the configuration and restarted, which resolved the problem.

II. Kafka Asynchronous Model

The following is an analysis of the actual causes of Kafka's asynchronous message blocking.

Kafka 0.8.0 sends messages in synchronous mode by default; asynchronous sending requires setting the producer.type=async property. In synchronous mode the caller has to wait until the message has actually been handed to Kafka, which naturally blocks the calling thread. The biggest advantage of asynchronous mode is that the caller does not have to wait for the send to complete.

I had assumed that "asynchronous" here meant running the send on a separate thread, but that is not how Kafka's asynchronous mode works. Kafka's official producer documentation describes the asynchronous mode as follows.

Batching is one of the big drivers of efficiency, and to enable batching the Kafka producer has an asynchronous mode that accumulates data in memory and sends out larger batches in a single request. The batching can be configured to accumulate no more than a fixed number of messages and to wait no longer than some fixed latency bound (say 100 messages or 5 seconds). This allows the accumulation of more bytes to send, and few larger I/O operations on the servers. Since this buffering happens in the client it obviously reduces the durability as any data buffered in memory and not yet sent will be lost in the event of a producer crash.

As the documentation describes, Kafka's asynchronous mode packs multiple messages together and sends them to the server in batches. Messages are first placed in an in-memory queue until either a certain number of messages has accumulated (200 by default) or the maximum waiting time has elapsed (5000 ms by default).

The biggest benefit of this is higher send throughput and less network I/O. The obvious drawback is that if the producer crashes, any messages still buffered in memory and not yet sent are lost.
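To make the knobs driving this behaviour explicit, here is a small sketch; the values shown are the documented defaults of the 0.8 Scala producer, and the broker address is a placeholder.

    import java.util.Properties;

    public class AsyncProducerDefaults {
        static Properties asyncDefaults() {
            Properties props = new Properties();
            props.put("metadata.broker.list", "broker1:9092"); // placeholder broker address
            props.put("producer.type", "async");                // enable asynchronous (batching) mode
            // Dispatch a batch once this many messages have accumulated (default 200).
            props.put("batch.num.messages", "200");
            // ...or once this much time has passed since the last send (default 5000 ms).
            props.put("queue.buffering.max.ms", "5000");
            // Capacity of the in-memory queue backing async mode (default 10000).
            props.put("queue.buffering.max.messages", "10000");
            // How long send() may wait when that queue is full; -1 (default) blocks forever.
            props.put("queue.enqueue.timeout.ms", "-1");
            return props;
        }
    }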

Next, let's analyze the blocking process from the Kafka source code.

III. Kafka Source Parsing

The Kafka message sender uses the following configuration:

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    Properties props = new Properties();
    props.put("metadata.broker.list", "localhost:9092");
    // Select asynchronous sending
    props.put("producer.type", "async");
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    // A queue capacity and batch size of 1 fill the in-memory queue immediately
    props.put("queue.buffering.max.messages", "1");
    props.put("batch.num.messages", "1");

    Producer<Integer, String> producer = new Producer<>(new ProducerConfig(props));
    producer.send(new KeyedMessage<>("test", "hello world"));

Here, we set producer.type=async to make Kafka send messages asynchronously.

The source code of send method is as follows:

ps: This version of Kafka source code is written in Scala, but the source code is relatively simple and easy to read.

  def send(messages: KeyedMessage[K,V]*) {
    if (hasShutdown.get)
      throw new ProducerClosedException
    recordStats(messages)
    sync match {
      case true => eventHandler.handle(messages)
    // Because producer.type=async sends asynchronously
      case false => asyncSend(messages)
    }
  }

Since we set producer.type=async above, asyncSend asynchronous sending mode will be used here.

The asyncSend source code is as follows:

  private def asyncSend(messages: Seq[KeyedMessage[K,V]]) {
    for (message <- messages) {
      val added = config.queueEnqueueTimeoutMs match {
        case 0  =>
          queue.offer(message)
        case _  =>
          try {
            config.queueEnqueueTimeoutMs < 0 match {
              case true =>
                queue.put(message)
                true
              case _ =>
                queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS)
            }
          }
          catch {
            case e: InterruptedException =>
              false
          }
      }
      if(!added) {
        producerTopicStats.getProducerTopicStats(message.topic).droppedMessageRate.mark()
        producerTopicStats.getProducerAllTopicsStats.droppedMessageRate.mark()
        throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)
      }else {
        trace("Added to send queue an event: " + message.toString)
        trace("Remaining queue size: " + queue.remainingCapacity)
      }
    }
  }

asyncSend adds the messages to a LinkedBlockingQueue. Different enqueue methods are used depending on the config.queueEnqueueTimeoutMs parameter.

When config.queueEnqueueTimeoutMs = 0, LinkedBlockingQueue#offer is called: if the queue is not full, the element is inserted at the tail; if the queue is full, offer returns false immediately. In that case the message is not added to the queue, and asyncSend throws a QueueFullException.

When config.queueEnqueueTimeoutMs < 0, LinkedBlockingQueue#put is called to add the element; if the queue is full, the call blocks until space becomes available.

When config.queueEnqueueTimeoutMs > 0, LinkedBlockingQueue#offer with a timeout is called; if the queue is full, the call blocks until space becomes available or the timeout expires.

The config.queueEnqueueTimeoutMs parameter is set through the queue.enqueue.timeout.ms configuration and defaults to -1. The capacity of the LinkedBlockingQueue defaults to 10,000 and can be changed with queue.buffering.max.messages.
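The three branches map directly onto the standard LinkedBlockingQueue operations. A minimal Java sketch of the three behaviours on an already-full queue (the capacity of 1 is just for illustration):

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class EnqueueModesDemo {
        public static void main(String[] args) throws InterruptedException {
            LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
            queue.put("already-there"); // the queue is now full

            // queue.enqueue.timeout.ms = 0: offer() fails fast and returns false.
            boolean added = queue.offer("msg");
            System.out.println("offer -> " + added); // false, would become QueueFullException

            // queue.enqueue.timeout.ms > 0: offer() waits up to the timeout, then gives up.
            added = queue.offer("msg", 100, TimeUnit.MILLISECONDS);
            System.out.println("offer with timeout -> " + added); // false after ~100 ms

            // queue.enqueue.timeout.ms = -1 (the default): put() blocks until space frees up.
            // With no consumer draining the queue, this call would never return,
            // which is exactly how the Dubbo threads got stuck.
            // queue.put("msg");
        }
    }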

After messages are put into the queue, Kafka uses a background thread to continuously take them from the queue and send them to the broker in batches.

The asynchronous message-processing code is as follows:

  private def processEvents() {
    var lastSend = SystemTime.milliseconds
    var events = new ArrayBuffer[KeyedMessage[K,V]]
    var full: Boolean = false

    // drain the queue until you get a shutdown command
    Stream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS))
                      .takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach {
      currentQueueItem =>
        val elapsed = (SystemTime.milliseconds - lastSend)
        // check if the queue time is reached. This happens when the poll method above returns after a timeout and
        // returns a null object
        val expired = currentQueueItem == null
        if(currentQueueItem != null) {
          trace("Dequeued item for topic %s, partition key: %s, data: %s"
              .format(currentQueueItem.topic, currentQueueItem.key, currentQueueItem.message))
          events += currentQueueItem
        }

        // check if the batch size is reached
        full = events.size >= batchSize

        if(full || expired) {
          if(expired)
            debug(elapsed + " ms elapsed. Queue time reached. Sending..")
          if(full)
            debug("Batch full. Sending..")
          // if either queue time has reached or batch size has reached, dispatch to event handler
          tryToHandle(events)
          lastSend = SystemTime.milliseconds
          events = new ArrayBuffer[KeyedMessage[K,V]]
        }
    }
    // send the last batch of events
    tryToHandle(events)
    if(queue.size > 0)
      throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue"
        .format(queue.size))
  }

Here the background thread keeps taking messages from the queue and dispatches a batch as soon as either condition is met:

  1. The batch reaches 200 messages; this threshold can be changed with batch.num.messages.
  2. The maximum waiting time, 5000 ms by default, has elapsed; this can be changed with queue.buffering.max.ms.

IV. Problem Solutions

Although the problem above was solved by pointing the producer at the correct Kafka address, the following measures can prevent it from recurring:

  1. Change the default value of config.queueEnqueueTimeoutMs. For messages that may be dropped, such as system monitoring logs, queue.enqueue.timeout.ms can be set to 0 so that sends fail fast instead of blocking (see the sketch after this list).
  2. Upgrade the Kafka client. Newer Kafka versions rewrite the producer in Java and no longer buffer messages in this blocking queue; how long a send may block on a full buffer is bounded by the max.block.ms setting.
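Here is a sketch of the first option, assuming the same 0.8 Scala producer as in the earlier example; the broker address, topic name, and class name are placeholders.

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class NonBlockingMonitoringProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("metadata.broker.list", "broker1:9092"); // placeholder address
            props.put("producer.type", "async");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            // Fail fast instead of blocking when the in-memory queue is full:
            // offer() is used, and a full queue raises QueueFullException immediately.
            props.put("queue.enqueue.timeout.ms", "0");

            Producer<Integer, String> producer = new Producer<>(new ProducerConfig(props));
            try {
                producer.send(new KeyedMessage<>("monitor-topic", "hello world"));
            } catch (kafka.common.QueueFullException e) {
                // Monitoring messages are allowed to be dropped, so just log and move on.
                System.err.println("dropped monitoring message: " + e.getMessage());
            } finally {
                producer.close();
            }
        }
    }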

This article was first published at: studyidea.cn/kafka...

Welcome to follow my WeChat official account for daily practical content. If you are interested in these topics, you can also follow my blog: studyidea.cn

Tags: Java kafka Dubbo Scala Maven
