Spring Cloud Stream [message grouping]

spring boot version: 2.1.10.RELEASE
spring cloud version: Greenwich.SR4

The previous post, Spring Cloud Stream, gave a first look at Spring Cloud Stream and walked through a getting-started example. Building on that example, this post implements message grouping.

Stream receiver project

(1) Add configuration

#Configure eureka registry
eureka.client.service-url.defaultZone=http://192.168.xxx.xxx:8761/eureka/,http://192.168.xxx.xxx:8762/eureka/
eureka.instance.prefer-ip-address=true

#rabbitmq configuration
spring.rabbitmq.host=192.168.xxx.xxx
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=/

# The destination maps to an exchange in RabbitMQ
# inputProduct is the binding name; it must match the value of the @Input annotation in IReceiveService
# exchangeProduct is the name of the exchange in RabbitMQ; sender and receiver must use the same name
spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct
# The group maps to a queue in RabbitMQ: it becomes part of the queue name and makes the queue durable
spring.cloud.stream.bindings.inputProduct.group=groupProduct
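
To make the effect of the group setting concrete, the sketch below mirrors how the RabbitMQ binder names the queue it binds to the exchange: destination and group joined by a dot, or an anonymous auto-delete queue when no group is set. QueueNameSketch is a hypothetical helper written for this post, not a Spring Cloud Stream class, and the random suffix of the anonymous name is only illustrative.

```java
import java.util.UUID;

/** Hypothetical helper (not a Spring Cloud Stream class) that mirrors the binder's queue naming. */
final class QueueNameSketch {

    /** Returns the queue name that would be expected for a destination and an optional group. */
    static String queueName(String destination, String group) {
        if (group == null || group.isEmpty()) {
            // No group: every instance gets its own anonymous queue.
            // The suffix format is an assumption made for illustration.
            return destination + ".anonymous." + UUID.randomUUID();
        }
        // With a group: all instances of the group share this one queue.
        return destination + "." + group;
    }

    public static void main(String[] args) {
        System.out.println(queueName("exchangeProduct", "groupProduct"));
        System.out.println(queueName("exchangeProduct", null));
    }
}
```

With the configuration above, the RabbitMQ management console should therefore show a queue called exchangeProduct.groupProduct bound to the exchangeProduct exchange.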

(2) Receive class interface

package com.ebook.stream;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

/**
 * @author:JZ
 * @date:2020/2/2
 */
public interface IReceiveService {

    String INPUT = "inputProduct";

    @Input(INPUT)
    SubscribableChannel receive();

}

(3) Implementation class of receiving class interface

package com.ebook.stream;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;

/**
 * @author:JZ
 * @date:2020/2/2
 */

@Service
@EnableBinding({IReceiveService.class})
public class ReceiveService {
	
    // Product is a custom entity class
    @StreamListener(IReceiveService.INPUT)
    public void onReceive(Product product) {
        System.out.println("receive:" + product.toString());
    }

}
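
The Product entity is not shown in this post. A minimal sketch of what it might look like follows; the field names id and name are assumptions inferred from the call new Product(1, "Spring Cloud") in the sender test. The no-argument constructor and the setters are included because the payload is normally converted to and from JSON, which needs them to rebuild the object on the receiving side. In the real project the class would be declared public in its own Product.java file under com.ebook.stream, in both the sender and the receiver.

```java
/** Assumed shape of the custom entity; the field names are guesses, not taken from the original project. */
class Product {

    private Integer id;
    private String name;

    /** No-argument constructor, needed when the JSON payload is converted back into a Product. */
    public Product() {
    }

    public Product(Integer id, String name) {
        this.id = id;
        this.name = name;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    /** Readable form used by the receiver's System.out.println call. */
    @Override
    public String toString() {
        return "Product{id=" + id + ", name='" + name + "'}";
    }
}
```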

Stream sender project

(1) Add configuration

#Configure eureka registry
eureka.client.service-url.defaultZone=http://192.168.xxx.xxx:8761/eureka/,http://192.168.xxx.xxx:8762/eureka/
eureka.instance.prefer-ip-address=true

#rabbitmq configuration
spring.rabbitmq.host=192.168.xxx.xxx
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=/

# The destination maps to an exchange in RabbitMQ
# outputProduct is the binding name; it must match the value of the @Output annotation in ISendService
# exchangeProduct is the name of the exchange in RabbitMQ; sender and receiver must use the same name
spring.cloud.stream.bindings.outputProduct.destination=exchangeProduct

(2) Send interface

package com.ebook.stream;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * @author:JZ
 * @date:2020/2/2
 */
public interface ISendService {

    String OUTPUT = "outputProduct";

    // An output binding only needs to send, so MessageChannel is the appropriate type
    @Output(OUTPUT)
    MessageChannel send();

}

(3) Add annotation to startup class

package com.ebook.stream;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;

/**
 * @author:JZo
 * @date:2020/1/11
 */

@SpringBootApplication
@EnableEurekaClient
@EnableBinding({ISendService.class}) // Bind the channels declared in ISendService and register it as a bean
public class StreamSenderApplication {

    public static void main(String[] args) {
        SpringApplication.run(StreamSenderApplication.class, args);
    }

}

(4) Run test class, send message

import com.ebook.stream.ISendService;
import com.ebook.stream.Product;
import com.ebook.stream.StreamSenderApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = StreamSenderApplication.class)
public class StreamTest {

	@Autowired
	private ISendService send;
	
	@Test
	public void send() {
		// Product is a custom entity class
		Product product = new Product(1, "Spring Cloud");
		// Wrap the entity in a typed message and send it through the output channel
		Message<Product> message = MessageBuilder.withPayload(product).build();
		this.send.send().send(message);
	}
}

Summary

With the configuration above, when several instances of the same service run as a cluster and share one group, each message is consumed by only one of those instances.
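
The grouping behaviour can be illustrated without a broker. The sketch below is a plain-Java simulation with hypothetical names (GroupDeliverySketch, receiver-1, groupAudit), not Spring or RabbitMQ code: every group receives a copy of each message, and inside a group only one instance gets it. Strict round robin is a simplification of how RabbitMQ distributes messages among the consumers of one queue.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java simulation of consumer-group delivery; no Spring or RabbitMQ involved. */
final class GroupDeliverySketch {

    // Group name -> instances subscribed under that group (one shared queue per group).
    private final Map<String, List<String>> groups = new LinkedHashMap<>();
    // Group name -> number of messages delivered so far, used to pick the next instance.
    private final Map<String, Integer> delivered = new LinkedHashMap<>();

    void subscribe(String group, String instance) {
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(instance);
        delivered.putIfAbsent(group, 0);
    }

    /** Returns the instances that receive the message: exactly one per group. */
    List<String> publish(String message) {
        List<String> receivers = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : groups.entrySet()) {
            List<String> instances = entry.getValue();
            int count = delivered.get(entry.getKey());
            // Round robin inside the group: a simplifying assumption.
            receivers.add(instances.get(count % instances.size()));
            delivered.put(entry.getKey(), count + 1);
        }
        return receivers;
    }

    public static void main(String[] args) {
        GroupDeliverySketch exchange = new GroupDeliverySketch();
        exchange.subscribe("groupProduct", "receiver-1");
        exchange.subscribe("groupProduct", "receiver-2");
        exchange.subscribe("groupAudit", "audit-1");
        System.out.println(exchange.publish("m1")); // [receiver-1, audit-1]
        System.out.println(exchange.publish("m2")); // [receiver-2, audit-1]
    }
}
```

Two receiver instances in groupProduct take turns, while the single instance in the separate group sees every message. Without a group, each instance would behave like its own group and receive every message.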


Tags: Spring RabbitMQ Junit

Posted on Tue, 04 Feb 2020 07:19:51 -0800 by The_Walrus