Spring cloud advanced: message driven Spring Cloud Stream message partition

  my blog: Programmer Xiaoxiaosheng Welcome to blog!

Last chapter Spring cloud advanced: Spring Cloud Stream consumer group Among them, we realize the function that messages are only received by one consumer in the same group by configuring consumer groups. This chapter describes the functions of message partitioning.

Preface

  in Spring Cloud Stream, it's easy to connect a single application to message middleware, but more often, it's a multi instance application. In the actual application scenario, we need to consume and count the same type of message, such as the same user or the same type of log message, but the message is scattered to different places It's not easy to handle when the instance of. At this time, you can use message partition.

1, Instance Index and Instance Count

  when we deploy the same application in a cluster way, each instance can accept the number of instances of the same application and the index of its own instance in the cluster. Stream achieves this through the number of instances of spring.cloud.stream.instanceCount and the current instance index of spring.cloud.stream.instanceIndex. If the total number of instances instanceCount is 3, then the instanceIndex index starts from 0 to 1 and 2. The correct configuration of these two properties is very important to solve the partition behavior, which can be used to ensure the correct segmentation of messages among multiple instances.

2, No partition test

2.1 producer configuration

  send message entity class:

public class Message implements Serializable {
    
    private int id;
    private String body;
    public Message() {
    }
    public Message(int id, String body) {
        this.id = id;
        this.body = body;
    }

  create a channel interface to send messages

public interface SenderSource {
    String OUTPUT = "input";

    @Output(SenderSource.OUTPUT)
    MessageChannel output();

}

  test class to send message:

@RunWith(SpringRunner.class)
@SpringBootTest(classes=SenderApplication.class)
public class MessageTest {

    @Autowired
    SenderSource source;
    @Test
    public void sender() {
        Message message = new Message(1, "test send");
        for (int i = 0; i < 10; i++) {
            source.output().send(MessageBuilder.withPayload(message).build());
        }
    }
}

  applicaiton.yml configuration:

spring:
  cloud:
    stream:
      bindings:
         input:
            destination: topic
            binder: rabbit1
      binders:
         rabbit1:
             type: rabbit
             environment:
                spring:
                 rabbitmq:
                     host: localhost
                     port: 5672

2.2 consumer configuration

  create Message entity:

public class Message implements Serializable {
    private int id;
    private String body;
    public Message() {
    }
    public Message(int id, String body) {
        this.id = id;
        this.body = body;
    }

Accept message

@SpringBootApplication
@EnableBinding(Sink.class)
@EnableEurekaClient
public class ReceiverApplication {

    public static void main(String[] args) {
        SpringApplication.run(ReceiverApplication.class, args);
    }

    @Value("${server.port}")
    private String port;
    /**
     * Listen to rabbitmq messages, specific queues and topic s, and obtain them through the configuration information application
     *
     * @param msg
     */
    @StreamListener(Sink.INPUT)
    public void reader(Message msg) {
        System.out.println("receiver port:"+port +",message:"+ msg);
    }
}

   consumer application.xml configuration

spring:
  cloud:
    stream:
      bindings:
         input:
            destination: topic
            binder: rabbit1
            group: group1
      binders:
         rabbit1:
             type: rabbit
             environment:
                spring:
                 rabbitmq:
                     host: localhost
                     port: 5672
server:
  port: 8081

eureka:
    instance:
      hostname: eureka7001.com  #Instance name of eureka server
      instance-id: receiver1
    client:
      service-url:
         # url to interact with the registry
        defaultZone: http://eureka7001.com:7001/eureka/
        enabled: true

  you can modify the port number and eureka.instance-id to start multiple instances, as well as Eureka. Then we send the message through the message producer:

  we see that the same messages are scattered into different instances

  example 1:

Example two:

2, Add partition

2.1 first configure producers

#Specifies the expression of the partition key. payload indicates that after getting the message, the value of the partition is calculated by hash value

spring.cloud.stream.bindings.input.producer.partitionKeyExpression=payload

#Number of message partitions

spring.cloud.stream.bindings.input.producer.partitionCount=2

spring:
  cloud:
    stream:
      bindings:
         input:
            producer:
               partitionKeyExpression: payload
               partitionCount: 2
            destination: topic
            binder: rabbit1

      binders:
         rabbit1:
             type: rabbit
             environment:
                spring:
                 rabbitmq:
                     host: localhost
                     port: 5672

2.2 configure consumers:

#Indicates open partition

spring.cloud.stream.bindings.inputProduct.consumer.partitioned=true

#Specifies the total number of instances for the current consumer, the current instance is 2 spring.cloud.stream.instanceCount=2 #Set the index number of the current instance, starting from 0 spring.cloud.stream.instanceIndex=0

  different instance instanceIndex is configured according to the situation

spring:
  cloud:
    stream:
      bindings:
         input:
           consumer:
               partitioned: true
            destination: topic
            binder: rabbit1
            group: group1
      binders:
         rabbit1:
             type: rabbit
             environment:
                spring:
                 rabbitmq:
                     host: localhost
                     port: 5672
     instance-count: 2
     instance-index: 0

  after the modification, restart all instances and send messages through the producer. We see that all messages are sent to one instance:

In this way, we implement the same type of message and send it to the same instance.

3, Custom partition policy

  in the above demonstration, we realize message partition through spring.cloud.stream.bindings.input.producer.partitionKeyExpression=payload. What do you mean? When we configure this property, the current instance payload represents the message information we defined. Spring Cloud Stream obtains this message by default, and uses the hash method to calculate the formula:

key.hashCode() % partitionCount

  figure out which partition the message is in, and with this in mind, we can do more. In Spring Cloud Stream, the implementation of org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy interface custom partition strategy is provided. We can simply implement the following. In the same message, we randomly partition, but make sure that the partition value is in the range of the number of instances. For example, if I have 2 consumer instances, the randomly generated partition value is 0 To 1:

  create the MyPartitionKeyStrategy class to implement the PartitionKeyExtractorStrategy interface:

import org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy;
import org.springframework.messaging.Message;

import java.util.Random;

public class MyPartitionKeyStrategy implements PartitionKeyExtractorStrategy {
    @Override
    public Object extractKey(Message<?> message) {

        com.microservice.stream.controller.Message sendMsg = (com.microservice.stream.controller.Message) message.getPayload();

        Random random =new Random();

        final int r = random.nextInt(2);

        System.out.println("r:" + r);

        return r;
    }
}

  inject the current partition policy by configuring the class method:

@Configuration
public class StrategyConf {
    @Bean
    MyPartitionKeyStrategy myPartitionKeyStrategy() {
        return new MyPartitionKeyStrategy();
    }

}

  modify the manufacturer's yml configuration, change partitionKeyExpression to the method name of the injection class:

spring:
  cloud:
    stream:
      bindings:
         input:
            producer:
            # Configuration injected method name
               partitionKeyExpression: myPartitionKeyStrategy
               partitionCount: 2
            destination: topic
            binder: rabbit1

      binders:
         rabbit1:
             type: rabbit
             environment:
                spring:
                 rabbitmq:
                     host: localhost
                     port: 5672

  then send the message through the producer, and the message will be randomly sent to different instances:

Example 1:

Example two:

In the actual enterprise environment, we can implement our own partition strategy according to our own needs.

summary

  this chapter has learned how to use the partition function in Spring Cloud Stream through non partition test, setting partition and custom partition strategy. In the actual development will inevitably encounter such a demand. It is also important to master and be able to use it.

----END----

This is the sharing of this issue. You can also pay attention to the official account: programmers laugh and laugh, and pay more attention to more exciting content.

Spring cloud basic tutorial (1) - microservices and spring cloud

Spring cloud basic tutorial (2) - Eureka service discovery

Spring cloud basic tutorial (3) - Eureka advanced

Spring cloud basic tutorial (4) - getting started with configuration center

Spring cloud basic tutorial (5) - Hot validation and high availability of configuration center

Spring cloud basic tutorial (6) - load balancing Ribbon

Spring cloud basic tutorial (7) - Feign declarative service call

Spring cloud basic tutorial (VIII) - Hystrix fuse (I)

Spring cloud basic tutorial (9) - Hystrix service monitoring (2)

Spring cloud basic tutorial (10) - Zull service gateway

Spring cloud basic tutorial (XI) - Introduction to Sleuth call chain tracing

Spring cloud basic course (12) - Construction of Zipkin distributed link tracking system

Spring cloud advanced: message driven (getting started) Spring Cloud Stream [Greenwich.SR3]

Spring cloud advanced: core components of Spring Cloud Stream

Spring cloud advanced: message driven Spring Cloud Stream consumer grouping

Spring cloud advanced: message driven Spring Cloud Stream message partition

More exciting content, please look forward to

This article is based on the platform of blog one article multiple sending OpenWrite Release!

My blog address Smile in Lanling , welcome to browse!

Tags: Spring RabbitMQ xml Java

Posted on Sat, 15 Feb 2020 06:17:30 -0800 by foolguitardude