In-depth interpretation of the @KafkaListener annotation in Spring Kafka

Brief introduction

Kafka is mainly used as a distributed publish-subscribe messaging system and is one of the most popular message queuing systems today. As a result, more and more frameworks integrate with Kafka, including Spring Kafka, the subject of this article.

Since Kafka is a publish-subscribe messaging system, it involves both message producers and message consumers. This article focuses on an in-depth interpretation of the @KafkaListener annotation in the Spring Kafka framework, along with use cases.

Interpretation

Source code interpretation

@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(KafkaListeners.class)
public @interface KafkaListener {

   /**
    * The id of this listener. If groupId is not configured and idIsGroup is true (the default),
    * the id is also used as the consumer group id.
    */
   String id() default "";

   /**
    * The bean name of the listener container factory. Configure containerFactory to distinguish
    * between single-record and batch consumption.
    */
   String containerFactory() default "";

   /**
    * The topics to listen to. Multiple topics are allowed; mutually exclusive with topicPattern.
    */
   String[] topics() default {};

   /**
    * A regular expression matching the topics to listen to. Mutually exclusive with topics and topicPartitions.
    */
   String topicPattern() default "";

   /**
    * More fine-grained listening configuration, e.g. listening only to specific partitions of a topic
    * or starting from a given offset (say 200). Mutually exclusive with topicPattern.
    */
   TopicPartition[] topicPartitions() default {};

   /**
    * Listener container group.
    */
   String containerGroup() default "";

   /**
    * The bean name of the listener exception handler.
    */
   String errorHandler() default "";

   /**
    * Consumer group id.
    */
   String groupId() default "";

   /**
    * Whether the id should also be used as the group id.
    */
   boolean idIsGroup() default true;

   /**
    * Client id prefix for the consumer.
    */
   String clientIdPrefix() default "";

   /**
    * A pseudo bean name used to reference the enclosing bean in SpEL expressions; defaults to "__listener".
    */
   String beanRef() default "__listener";

}
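The errorHandler attribute refers to a KafkaListenerErrorHandler bean by its name. As a minimal sketch of how such a bean might be registered (the bean name myErrorHandler and the logging are illustrative, not from the original article):

@Configuration
public class ErrorHandlerConfig {

    // Hypothetical error handler bean, referenced from a listener via errorHandler = "myErrorHandler"
    @Bean
    public KafkaListenerErrorHandler myErrorHandler() {
        return (message, exception) -> {
            // Log the failed payload; returning a value (here null) completes the listener invocation
            System.err.println("Listener failed for payload " + message.getPayload()
                    + " : " + exception.getMessage());
            return null;
        };
    }
}

// Example usage on a listener method:
// @KafkaListener(id = "consumer", topics = "topic1", errorHandler = "myErrorHandler")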

Use cases

ConsumerRecord class consumption

Receiving messages as ConsumerRecord objects has certain advantages: a ConsumerRecord carries the partition information, message headers, message body and more, so it is a good choice when the business needs those details. If you only care about the message body, receiving it as a specific type such as String is more convenient.

Here we write a Listener method that listens to the "topic1" topic and prints the content of the ConsumerRecord to the console:

@Component
public class Listener {

    private static final Logger log = LoggerFactory.getLogger(Listener.class);

    @KafkaListener(id = "consumer", topics = "topic1")
    public void consumerListener(ConsumerRecord<Integer, String> record) {
        log.info("topic.quick.consumer receive : " + record.toString());
    }

}

Batch consumption

Batch consumption is very practical in real business scenarios, because it increases Kafka consumption throughput and improves performance.

Steps to implement batch consumption:

1. Create a new consumer configuration, set to pull at most 10 messages per poll.

2. Create a listener container factory named batchContainerFactory, enable batch consumption and set the concurrency to 5. The concurrency depends on the number of partitions: it must be less than or equal to the partition count, otherwise some threads will stay idle.

3. Create a Topic with 8 partitions.

4. Create a listener method, set the consumer id to "batchConsumer", the client id prefix to "batch", listen to the "topic.batch" topic, and use the "batchContainerFactory" factory to create the listener container.

@Component
public class BatchListener {

    private static final Logger log = LoggerFactory.getLogger(BatchListener.class);

    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        // Maximum number of records returned by a single poll
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");
        // Kafka's own Integer/String deserializers
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean("batchContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<Integer, String> listenerContainer() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> container
                = new ConcurrentKafkaListenerContainerFactory<>();
        container.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps()));
        // Concurrency must be less than or equal to the number of partitions of the topic
        container.setConcurrency(5);
        // Must be set to enable batch listening
        container.setBatchListener(true);
        return container;
    }

    @Bean
    public NewTopic batchTopic() {
        return new NewTopic("topic.batch", 8, (short) 1);
    }

    @KafkaListener(id = "batchConsumer", clientIdPrefix = "batch",
            topics = {"topic.batch"}, containerFactory = "batchContainerFactory")
    public void batchListener(List<String> data) {
        log.info("topic.batch receive : ");
        for (String s : data) {
            log.info(s);
        }
    }
}
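If the batch listener also needs partition or offset information, Spring Kafka allows the method to receive the whole records instead of just the payloads. A minimal sketch, assuming it lives in the BatchListener class above (reusing its log field) and the same batchContainerFactory:

@KafkaListener(id = "batchRecordConsumer", clientIdPrefix = "batchRecord",
        topics = {"topic.batch"}, containerFactory = "batchContainerFactory")
public void batchRecordListener(List<ConsumerRecord<Integer, String>> records) {
    for (ConsumerRecord<Integer, String> record : records) {
        // Each record still carries its topic, partition, offset and key
        log.info("partition=" + record.partition() + ", offset=" + record.offset()
                + ", value=" + record.value());
    }
}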

 

Listening to specified partitions of a Topic

Use the topicPartitions property of the @KafkaListener annotation to listen to specific partitions.

@TopicPartition: topic -- the name of the topic to listen to; partitions -- the ids of the partitions to listen to.

partitionOffsets -- lets you start listening from a given offset. @PartitionOffset: partition -- a single partition id (not an array); initialOffset -- the initial offset.

@Bean
public NewTopic batchWithPartitionTopic() {
    return new NewTopic("topic.batch.partition", 8, (short) 1);
}

@KafkaListener(id = "batchWithPartition", clientIdPrefix = "bwp", containerFactory = "batchContainerFactory",
        topicPartitions = {
                @TopicPartition(topic = "topic.batch.partition", partitions = {"1", "3"}),
                @TopicPartition(topic = "topic.batch.partition", partitions = {"0", "4"},
                        partitionOffsets = @PartitionOffset(partition = "2", initialOffset = "100"))
        }
)
public void batchListenerWithPartition(List<String> data) {
    log.info("topic.batch.partition receive : ");
    for (String s : data) {
        log.info(s);
    }
}

Getting the message header and body via annotations

This approach is useful when the received message carries headers and your listener method needs many fields of the message. The default listener container factory is used here. For batch consumption, simply change the corresponding parameter types to lists, e.g. List<String> data and List<Integer> key (see the sketch after the example below).

@Payload: gets the message body, i.e. the content that was sent

@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY): gets the key of the message

@Header(KafkaHeaders.RECEIVED_PARTITION_ID): gets the partition the current message was received from

@Header(KafkaHeaders.RECEIVED_TOPIC): gets the name of the topic being listened to

@Header(KafkaHeaders.RECEIVED_TIMESTAMP): gets the timestamp

@KafkaListener(id = "params", topics = "topic.params")

public void otherListener(@Payload String data,

                         @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,

                         @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,

                         @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,

                         @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {

    log.info("topic.params receive : \n"+

            "data : "+data+"\n"+

            "key : "+key+"\n"+

            "partitionId : "+partition+"\n"+

            "topic : "+topic+"\n"+

            "timestamp : "+ts+"\n"

    );

}
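As mentioned above, the same annotations can be combined with batch consumption by switching the parameters to lists. A minimal sketch, assuming a batch-enabled container factory such as the batchContainerFactory defined earlier (the listener id is illustrative):

@KafkaListener(id = "batchParams", topics = "topic.params", containerFactory = "batchContainerFactory")
public void batchParamsListener(@Payload List<String> data,
                                @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
                                @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions) {
    // Each list has one entry per record in the batch, in the same order
    for (int i = 0; i < data.size(); i++) {
        log.info("key=" + keys.get(i) + ", partition=" + partitions.get(i) + ", value=" + data.get(i));
    }
}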

Using the Ack mechanism to confirm consumption

Kafka tracks consumption through the latest saved offset, and messages that have been acknowledged are not deleted immediately, so undeleted data can still be consumed again. If the first message is not acknowledged but the second one is, Kafka saves the offset of the second message; the first message will then no longer be delivered to the listener unless it is fetched manually by its offset. Kafka's ack mechanism effectively prevents lost consumption: with auto commit, the offset is committed right after the records are pulled, which makes it easy to lose data, especially when transactions need to be controlled.

Using Kafka's Ack mechanism is relatively simple and only takes a few steps:

  1. Set ENABLE_AUTO_COMMIT_CONFIG=false to disable auto commit.
  2. Set AckMode=MANUAL_IMMEDIATE.
  3. Add an Acknowledgment ack parameter to the listener method.
  4. Optionally, use the Consumer.seek method to reposition the consumer to a specific offset (see the sketch after the example below).

@Component
public class AckListener {

    private static final Logger log = LoggerFactory.getLogger(AckListener.class);

    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Disable auto commit so that offsets are only committed on ack.acknowledge()
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean("ackContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<Integer, String> ackContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps()));
        // Manual, immediate acknowledgment
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    @KafkaListener(id = "ack", topics = "topic.ack", containerFactory = "ackContainerFactory")
    public void ackListener(ConsumerRecord<Integer, String> record, Acknowledgment ack) {
        log.info("topic.ack receive : " + record.value());
        ack.acknowledge();
    }
}
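For step 4 above, Spring Kafka also allows the listener method to receive the underlying Consumer, so an unacknowledged record can be re-fetched by seeking back to its offset. A minimal sketch (the listener id and the error handling are illustrative, not from the original article):

@KafkaListener(id = "ackSeek", topics = "topic.ack", containerFactory = "ackContainerFactory")
public void ackListenerWithSeek(ConsumerRecord<Integer, String> record, Acknowledgment ack,
                                Consumer<Integer, String> consumer) {
    try {
        log.info("topic.ack receive : " + record.value());
        // ... business processing ...
        ack.acknowledge();
    } catch (Exception e) {
        // Do not acknowledge; seek back to this record's offset so it is redelivered on the next poll.
        // The fully qualified name avoids a clash with Spring Kafka's @TopicPartition annotation.
        consumer.seek(new org.apache.kafka.common.TopicPartition(record.topic(), record.partition()),
                record.offset());
    }
}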

Solving repeated consumption

When the offset is committed manually with ack as in the previous section, if the consumer crashes and restarts it resumes consuming from the last committed offset rather than from the position it had actually reached. This means repeated consumption is possible.

In the 0.9 client, there are three ack strategies:

Strategy 1: automatic, periodic ack.

Strategy 2: consumer.commitSync(), manual synchronous ack: call commitSync once after each message is processed.

Strategy 3: consumer.commitAsync(), manual asynchronous ack.

So if we use strategy 2 and call commitSync after each message, does that solve the problem of repeated consumption? Consider the following code:

while (true) {
    List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    insertIntoDb(buffer);    // Process the batch and save it to the database
    consumer.commitSync();   // Synchronously commit (ack) the offsets
    buffer.clear();
}

The answer is no! insertIntoDb and commitSync above cannot be performed atomically: if the data has been processed but the process dies before commitSync completes, the messages will still be consumed again after the server restarts.

So how do we solve repeated consumption? The answer is to store the committed offset yourself instead of relying on the Kafka cluster to store it: make message processing and offset saving a single atomic operation, and give each message a unique id for deduplication.

According to the official documentation, to store offsets yourself you need to:

  1. Set enable.auto.commit=false to disable automatic ack.
  2. Save the corresponding offset every time a message is fetched.
  3. On the next restart, use the consumer.seek function to position the consumer at the offset you saved and consume from there.
  4. For further protection, add a unique id to each message for duplicate detection (see the sketch below).
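A minimal sketch of this pattern with the plain Kafka consumer API (saveOffsetInDb, readOffsetFromDb and insertIntoDb are hypothetical helpers that would share one database transaction per record):

consumer.subscribe(Collections.singletonList("topic.ack"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // On (re)start, position each partition at the offset we stored ourselves
        for (TopicPartition tp : partitions) {
            consumer.seek(tp, readOffsetFromDb(tp));
        }
    }
});

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        // Save the record and its next offset in one database transaction (atomic);
        // a unique message id allows duplicates to be detected and skipped
        insertIntoDb(record);
        saveOffsetInDb(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
    }
}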
