Spring Cloud series: message-driven microservices with Stream

In real-world development, services often communicate through message-oriented middleware. Message-oriented middleware helps with application decoupling, asynchronous processing, and traffic peak shaving, and it supports architectures that aim for high performance, high availability, scalability, and eventual consistency.

Different middleware products are implemented differently, and those differences cause problems in real projects. For example, if a project is built on Kafka and we want to replace it with RabbitMQ, the migration is painful: a lot of code has to be rewritten because the middleware is tightly coupled to our system. Spring Cloud Stream lets us integrate the message middleware behind a common abstraction and reduce the coupling between the system and the middleware.

  

Several application scenarios of message middleware

  

Application decoupling

  

Suppose a company has several systems that are linked in some business flows. For example, when system A completes an operation, it needs to trigger system B and system C, which couples the systems to each other. Message middleware can remove that coupling: when system A completes the operation, it puts the data into a message queue, and systems B and C subscribe to the message, so the systems only need to agree on the message format.

  

Traditional mode:

Middleware mode:
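
As a rough illustration of the middleware mode, the sketch below uses a hypothetical in-memory InMemoryTopic class in place of a real broker. System A only publishes, systems B and C only subscribe, and the only thing they share is the message format (an assumed "order-created:<id>" string, not something from a real system).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical in-memory stand-in for a message queue topic (illustration only). */
class InMemoryTopic {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    void publish(String message) {
        // Deliver the message to every subscriber; the publisher does not know who they are.
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }
}

class DecouplingDemo {

    static List<String> run() {
        InMemoryTopic orderEvents = new InMemoryTopic();
        List<String> handled = new ArrayList<>();

        // Systems B and C subscribe independently of system A.
        orderEvents.subscribe(message -> handled.add("B handled " + message));
        orderEvents.subscribe(message -> handled.add("C handled " + message));

        // System A publishes and is done; it never calls B or C directly.
        orderEvents.publish("order-created:42");
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Adding a system D later would only require one more subscribe call; system A would stay unchanged.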

  

Asynchronous processing

  

For example, when a user places an order on an e-commerce website, the site sends an SMS or email after the order is completed. Order payment is the core business; email and SMS are not core functions and may take a long time. For this scenario, the notification can be put into a message queue first and processed asynchronously by other services.

  

Traditional mode:

Middleware mode:
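
The idea can be sketched in plain Java, assuming a single-threaded executor as a stand-in for the queue plus its consumer. The class name AsyncNotificationDemo and the log strings are made up for this illustration. The core step (payment) runs first, and the notifications are only handed off, not executed inline.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AsyncNotificationDemo {

    static List<String> placeOrder(String orderId) {
        List<String> log = new CopyOnWriteArrayList<>();
        // Stand-in for "message queue + notification service": tasks run in FIFO order on one worker thread.
        ExecutorService notifier = Executors.newSingleThreadExecutor();

        // Core business: handled synchronously.
        log.add("paid " + orderId);

        // Non-core work: handed off and processed asynchronously.
        notifier.execute(() -> log.add("sms sent for " + orderId));
        notifier.execute(() -> log.add("email sent for " + orderId));

        // Waiting here is only so the demo can return the complete log;
        // a real order service would return right after the hand-off.
        notifier.shutdown();
        try {
            notifier.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(placeOrder("1001"));
    }
}
```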

  

Traffic peak shaving

  

For example, a flash sale (seckill) brings in a large number of requests in a very short time, and some services may not withstand the instantaneous concurrency. In this scenario, put a message queue in the middle: requests go into the queue first and are then pushed to the service at a steady rate, or the service pulls them from the queue at its own pace.

  

Traditional mode:

Middleware mode:
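
A minimal single-threaded simulation of this smoothing effect, assuming a hypothetical PeakShavingDemo class and an in-memory queue rather than a real broker: the whole burst is queued at once, and the service drains at most maxPerTick requests per time slice.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class PeakShavingDemo {

    /** Queues a burst of requests, drains it at a fixed rate, and returns how many were processed per tick. */
    static List<Integer> drain(int burstSize, int maxPerTick) {
        if (maxPerTick <= 0) {
            throw new IllegalArgumentException("maxPerTick must be positive");
        }
        Queue<Integer> queue = new ArrayDeque<>();
        for (int request = 0; request < burstSize; request++) {
            queue.add(request); // the whole burst arrives at once
        }

        List<Integer> processedPerTick = new ArrayList<>();
        while (!queue.isEmpty()) {
            int processed = 0;
            // The service pulls no more than its capacity in each tick.
            while (processed < maxPerTick && queue.poll() != null) {
                processed++;
            }
            processedPerTick.add(processed);
        }
        return processedPerTick;
    }

    public static void main(String[] args) {
        System.out.println(drain(10, 3));
    }
}
```

With a burst of 10 requests and a capacity of 3 per tick, the service sees batches of 3, 3, 3 and 1 instead of all 10 at once.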

  

Log processing

  

Small projects usually have few requirements for log processing, but problems appear once the number of users and the data volume grow large enough. For example:

  • How to store user logs
  • How to use user logs after storage
  • How to store a large number of logs without affecting the system

There are many similar problems, so we use a message queue to decouple log handling from the business logic and to move the data more efficiently.

Kafka was originally created for processing logs.

  

Summary

  

Message queuing is an important component of a distributed system. Its general usage scenario can be summarized as follows: when the result is not needed immediately but concurrency needs to be controlled, it is usually time to use a message queue. In a project, time-consuming operations that do not need to return immediately can be extracted and processed asynchronously, which shortens the server's request response time and improves system throughput.

When you encounter the situations above, consider using a message queue. If you are using RabbitMQ or Kafka together with Spring Cloud, consider Spring Cloud Stream.

  

What is Spring Cloud Stream

  

Spring Cloud Stream is a framework for building message-driven microservice applications. It provides a flexible programming model built on established and familiar Spring idioms. It also provides opinionated configuration of middleware from several vendors, including support for publish-subscribe, consumer groups, and message partitioning.

Spring Cloud Stream lets developers use message middleware without being aware of the specific product. Because Stream adds another layer of encapsulation over the middleware, the application code does not need to know which middleware is in use, and the middleware can be switched by changing the binder and configuration. This keeps microservice development highly decoupled and lets services focus on their own business logic.

  

Core concepts

  

  

Term             Description
Middleware       The message middleware; RabbitMQ and Kafka are supported.
Binder           The binder for the target middleware, that is, Kafka or RabbitMQ. A binder is a package that encapsulates the target middleware: use spring-cloud-stream-binder-kafka for Kafka and spring-cloud-stream-binder-rabbit for RabbitMQ.
@Input           Marks an input channel, through which received messages enter the application (message consumer).
@Output          Marks an output channel, through which published messages leave the application (message producer).
@StreamListener  Marks a method that listens on an input channel and receives the messages that arrive on it.
@EnableBinding   Enables binding: binds the channels declared in the given interfaces to the exchanges (destinations) of the middleware.

  

Working principle

  

Defining the Binder as a middle layer isolates the application from the details of the message middleware. Because the application is exposed to a unified Channel, it does not need to consider how different message middleware products are implemented. When we need to upgrade the message middleware or replace it with another product, we only need to replace the corresponding Binder, without modifying any application logic.

  

  

There are several core concepts in the model diagram as follows:

  • Source: when we need to send messages, we use Source.java, which serializes the message we want to send (to a JSON string by default) and then sends the data to the Channel;
  • Sink: when we need to listen for messages, we use Sink.java, which obtains messages from the message channel, deserializes them into message objects, and hands them to the specific message listener for processing;
  • Channel: normally we have to specify the topic or queue name when sending messages to or listening for messages from the middleware, so changing the topic means changing the sending or listening code. With the Channel object, our business code only refers to the channel, and the topic that the channel maps to is specified in the configuration file. When the topic changes, the code does not need to change, which decouples the code from the specific message middleware;
  • Binder: different Binders integrate with different message middleware. The Binder provides a unified interface for sending and receiving messages, so that we can deploy different middleware according to actual needs, or adjust our configuration to match the middleware deployed in production.
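
The role of the Binder can be sketched with plain Java interfaces. Everything in this block is hypothetical and greatly simplified: SimpleBinder, FakeRabbitBinder, FakeKafkaBinder and OrderService are made-up names, and SimpleBinder is not Spring Cloud Stream's actual Binder interface. The point is only that the business class depends on the abstraction, so swapping the middleware does not touch it.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for the binder idea; not Spring's Binder API. */
interface SimpleBinder {
    void send(String destination, String payload);
}

/** Pretends to publish to a RabbitMQ exchange by recording what would be sent. */
class FakeRabbitBinder implements SimpleBinder {
    final List<String> sent = new ArrayList<>();

    @Override
    public void send(String destination, String payload) {
        sent.add("rabbit exchange=" + destination + " body=" + payload);
    }
}

/** Pretends to publish to a Kafka topic by recording what would be sent. */
class FakeKafkaBinder implements SimpleBinder {
    final List<String> sent = new ArrayList<>();

    @Override
    public void send(String destination, String payload) {
        sent.add("kafka topic=" + destination + " value=" + payload);
    }
}

/** Business code: knows only the binder abstraction and a destination name from configuration. */
class OrderService {
    private final SimpleBinder binder;
    private final String destination;

    OrderService(SimpleBinder binder, String destination) {
        this.binder = binder;
        this.destination = destination;
    }

    void orderCreated(String orderId) {
        binder.send(destination, "order-created:" + orderId);
    }
}
```

Constructing OrderService with a FakeRabbitBinder or a FakeKafkaBinder changes where the message goes, while OrderService itself is compiled once and left alone.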

  

Environment preparation

  

stream-demo aggregate project. Spring Boot 2.2.4.RELEASE, Spring Cloud Hoxton.SR1.

  • RabbitMQ: message queue

  • eureka-server: registry

  • eureka-server02: registry

  

  

Getting-started example

  


  

Message producer

  

Create project

  

Create the stream-producer subproject under the stream-demo project.

  

Add dependency

  

To use the RabbitMQ binder, you can add it to the Spring Cloud Stream application by using the following Maven coordinates:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

Or use Spring Cloud Stream RabbitMQ Starter:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

  

The complete pom.xml is as follows:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>stream-producer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Inherit parent dependency -->
    <parent>
        <groupId>com.example</groupId>
        <artifactId>stream-demo</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <!-- Project dependency -->
    <dependencies>
        <!-- Netflix Eureka client dependency -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <!-- Spring Cloud Stream RabbitMQ binder dependency -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>

        <!-- Spring Boot test dependency -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

</project>

  

Configuration file

  

Configure the RabbitMQ message queue and the channel for sending and receiving Stream messages.

server:
  port: 8001 # port

spring:
  application:
    name: stream-producer # application name
  rabbitmq:
    host: 192.168.10.101  # Server IP
    port: 5672            # Server port
    username: guest       # user name
    password: guest       # Password
    virtual-host: /       # Virtual host address
  cloud:
    stream:
      bindings:
        # Message sending channel
        # Same value as the @Output("output") annotation in org.springframework.cloud.stream.messaging.Source
        output:
          destination: stream.message # Bound exchange name

# Configure Eureka Server registry
eureka:
  instance:
    prefer-ip-address: true       # Use ip address to register
    instance-id: ${spring.cloud.client.ip-address}:${server.port} # ip:port
  client:
    service-url:                  # Set service registry address
      defaultZone: http://localhost:8761/eureka/,http://localhost:8762/eureka/

  

Send message

  

  MessageProducer.java

package com.example.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * Message producer
 */
@Component
@EnableBinding(Source.class)
public class MessageProducer {

    @Autowired
    private Source source;

    /**
     * send message
     *
     * @param message
     */
    public void send(String message) {
        source.output().send(MessageBuilder.withPayload(message).build());
    }

}

  

Startup class

  

  StreamProducerApplication.java

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class StreamProducerApplication {

    public static void main(String[] args) {
        // Pass the command-line arguments through so that Spring Boot can process them
        SpringApplication.run(StreamProducerApplication.class, args);
    }

}

  

Message consumer

  

Create project

  

Create the stream-consumer subproject under the stream-demo project.

  

Add dependency

  

To use the RabbitMQ binder, you can add it to the Spring Cloud Stream application by using the following Maven coordinates:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

Or use Spring Cloud Stream RabbitMQ Starter:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

  

The complete pom.xml is as follows:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>stream-consumer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Inherit parent dependency -->
    <parent>
        <groupId>com.example</groupId>
        <artifactId>stream-demo</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <!-- Project dependency -->
    <dependencies>
        <!-- Netflix Eureka client dependency -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <!-- Spring Cloud Stream RabbitMQ binder dependency -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
        
        <!-- Spring Boot test dependency -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

</project>

  

Configuration file

  

Configure the RabbitMQ message queue and the channel for sending and receiving Stream messages.

server:
  port: 8002 # port

spring:
  application:
    name: stream-consumer # application name
  rabbitmq:
    host: 192.168.10.101  # Server IP
    port: 5672            # Server port
    username: guest       # user name
    password: guest       # Password
    virtual-host: /       # Virtual host address
  cloud:
    stream:
      bindings:
        # Message receiving channel
        # Same value as the @Input("input") annotation in org.springframework.cloud.stream.messaging.Sink
        input:
          destination: stream.message # Bound exchange name

# Configure Eureka Server registry
eureka:
  instance:
    prefer-ip-address: true       # Use ip address to register
    instance-id: ${spring.cloud.client.ip-address}:${server.port} # ip:port
  client:
    service-url:                  # Set service registry address
      defaultZone: http://localhost:8761/eureka/,http://localhost:8762/eureka/

  

Receive messages

  

  MessageConsumer.java

package com.example.consumer;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;

/**
 * Message consumer
 */
@Component
@EnableBinding(Sink.class)
public class MessageConsumer {

    /**
     * receive messages
     *
     * @param message
     */
    @StreamListener(Sink.INPUT)
    public void receive(String message) {
        System.out.println("message = " + message);
    }

}

  

Startup class

  

  StreamConsumerApplication.java

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class StreamConsumerApplication {

    public static void main(String[] args) {
        // Pass the command-line arguments through so that Spring Boot can process them
        SpringApplication.run(StreamConsumerApplication.class, args);
    }

}

  

Test

  

Unit test

  

  MessageProducerTest.java

package com.example;

import com.example.producer.MessageProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest(classes = {StreamProducerApplication.class})
public class MessageProducerTest {

    @Autowired
    private MessageProducer messageProducer;

    @Test
    public void testSend() {
        messageProducer.send("hello spring cloud stream");
    }

}

  

Result

  

Start the message consumer and run the unit test. The message consumer console prints the following results:

message = hello spring cloud stream

RabbitMQ interface is as follows:

  

Custom message channel

  

Create message channel

  

Refer to source code Source.java and Sink.java to create a custom message channel.

Custom message sending channel MySource.java

package com.example.channel;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * Custom message sending channel
 */
public interface MySource {

    String MY_OUTPUT = "my_output";

    @Output(MY_OUTPUT)
    MessageChannel myOutput();

}

Custom message receiving channel MySink.java

package com.example.channel;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

/**
 * Custom message receiving channel
 */
public interface MySink {

    String MY_INPUT = "my_input";

    @Input(MY_INPUT)
    SubscribableChannel myInput();

}

  

Configuration file

  

Message producer.

server:
  port: 8001 # port

spring:
  application:
    name: stream-producer # application name
  rabbitmq:
    host: 192.168.10.101  # Server IP
    port: 5672            # Server port
    username: guest       # user name
    password: guest       # Password
    virtual-host: /       # Virtual host address
  cloud:
    stream:
      bindings:
        # Message sending channel
        # Same value as the @Output("output") annotation in org.springframework.cloud.stream.messaging.Source
        output:
          destination: stream.message # Bound exchange name
        my_output:
          destination: my.message # Bound exchange name

  

Message consumer.

server:
  port: 8002 # port

spring:
  application:
    name: stream-consumer # application name
  rabbitmq:
    host: 192.168.10.101  # Server IP
    port: 5672            # Server port
    username: guest       # user name
    password: guest       # Password
    virtual-host: /       # Virtual host address
  cloud:
    stream:
      bindings:
        # Message receiving channel
        # Same value as the @Input("input") annotation in org.springframework.cloud.stream.messaging.Sink
        input:
          destination: stream.message # Bound exchange name
        my_input:
          destination: my.message # Bound exchange name

  

Code refactoring

  

Message producer MyMessageProducer.java.

package com.example.producer;

import com.example.channel.MySource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * Message producer
 */
@Component
@EnableBinding(MySource.class)
public class MyMessageProducer {

    @Autowired
    private MySource mySource;

    /**
     * send message
     *
     * @param message
     */
    public void send(String message) {
        mySource.myOutput().send(MessageBuilder.withPayload(message).build());
    }

}

  

Message consumer MyMessageConsumer.java.

package com.example.consumer;

import com.example.channel.MySink;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;

/**
 * Message consumer
 */
@Component
@EnableBinding(MySink.class)
public class MyMessageConsumer {

    /**
     * receive messages
     *
     * @param message
     */
    @StreamListener(MySink.MY_INPUT)
    public void receive(String message) {
        System.out.println("message = " + message);
    }

}

  

Test

  

Unit test

  

  MessageProducerTest.java

package com.example;

import com.example.producer.MyMessageProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest(classes = {StreamProducerApplication.class})
public class MessageProducerTest {

    @Autowired
    private MyMessageProducer myMessageProducer;

    @Test
    public void testMySend() {
        myMessageProducer.send("hello spring cloud stream");
    }

}

  

Result

  

Start the message consumer and run the unit test. The message consumer console prints the following results:

message = hello spring cloud stream

RabbitMQ interface is as follows:

  

Configuration optimization

  

Spring Cloud microservice development is simple not only because of the thorough official encapsulation, but also because it follows the principle of convention over configuration. Developers only need to specify the parts of the application that deviate from the conventions; wherever nothing is specified, the default configuration applies. The core idea is to keep the configuration as small as possible.

Put simply, Spring recommends its default configuration, and custom configuration is only needed when there are special requirements.

  

In Spring Cloud Stream, the value of the @Output("output") and @Input("input") annotations is used as the bound exchange name by default when no destination is configured. So we can refactor the custom message channel example as follows.
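
The fallback rule can be sketched as a small lookup. DestinationResolver is a hypothetical helper written for this illustration, not a Spring class; only the property key pattern spring.cloud.stream.bindings.<channel>.destination comes from the configuration files shown in this article.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that mimics the "convention over configuration" fallback for destinations. */
class DestinationResolver {

    /** Returns the configured destination for a channel, or the channel name itself when none is configured. */
    static String resolve(String channelName, Map<String, String> properties) {
        String key = "spring.cloud.stream.bindings." + channelName + ".destination";
        return properties.getOrDefault(key, channelName);
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put("spring.cloud.stream.bindings.my_output.destination", "my.message");

        // Explicit configuration wins.
        System.out.println(resolve("my_output", properties));
        // No configuration: the channel name doubles as the exchange name.
        System.out.println(resolve("default.message", properties));
    }
}
```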

  

Create message channel

  

Refer to source code Source.java and Sink.java to create a custom message channel.

Custom message sending channel MySource02.java

package com.example.channel;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * Custom message sending channel
 */
public interface MySource02 {

    String MY_OUTPUT = "default.message";

    @Output(MY_OUTPUT)
    MessageChannel myOutput();

}

Custom message receiving channel MySink02.java

package com.example.channel;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

/**
 * Custom message receiving channel
 */
public interface MySink02 {

    String MY_INPUT = "default.message";

    @Input(MY_INPUT)
    SubscribableChannel myInput();

}

  

Configuration file

  

Message producer.

server:
  port: 8001 # port

spring:
  application:
    name: stream-producer # application name
  rabbitmq:
    host: 192.168.10.101  # Server IP
    port: 5672            # Server port
    username: guest       # user name
    password: guest       # Password
    virtual-host: /       # Virtual host address

  

Message consumer.

server:
  port: 8002 # port

spring:
  application:
    name: stream-consumer # application name
  rabbitmq:
    host: 192.168.10.101  # Server IP
    port: 5672            # Server port
    username: guest       # user name
    password: guest       # Password
    virtual-host: /       # Virtual host address

  

Code refactoring

  

Message producer MyMessageProducer02.java.

package com.example.producer;

import com.example.channel.MySource02;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * Message producer
 */
@Component
@EnableBinding(MySource02.class)
public class MyMessageProducer02 {

    @Autowired
    private MySource02 mySource02;

    /**
     * send message
     *
     * @param message
     */
    public void send(String message) {
        mySource02.myOutput().send(MessageBuilder.withPayload(message).build());
    }

}

  

Message consumer MyMessageConsumer02.java.

package com.example.consumer;

import com.example.channel.MySink02;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;

/**
 * Message consumer
 */
@Component
@EnableBinding(MySink02.class)
public class MyMessageConsumer02 {

    /**
     * receive messages
     *
     * @param message
     */
    @StreamListener(MySink02.MY_INPUT)
    public void receive(String message) {
        System.out.println("message = " + message);
    }

}

  

Test

  

Unit test

  

  MessageProducerTest.java

package com.example;

import com.example.producer.MyMessageProducer02;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest(classes = {StreamProducerApplication.class})
public class MessageProducerTest {

    @Autowired
    private MyMessageProducer02 myMessageProducer02;

    @Test
    public void testMySend02() {
        myMessageProducer02.send("Convention is greater than configuration");
    }

}

  

Result

  

Start the message consumer and run the unit test. The message consumer console prints the following results:

message = Convention is greater than configuration

RabbitMQ interface is as follows:

  

SMS and email sending example

  

A message-driven microservice application can be both a message producer and a message consumer. Next, we simulate an SMS and email message processing flow:

  • Send the original message to the source.message exchange;
  • The message-driven microservice application receives the original message from the source.message exchange, processes it, and sends the results to the sms.message and email.message exchanges;
  • The message-driven microservice application receives the processed messages from the sms.message and email.message exchanges and sends the SMS and email.

  

Create message channel

  

Channels for message-driven microservice A, which sends the original message, receives the processed messages, and sends the SMS and email.

package com.example.channel;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

/**
 * Custom message channel
 */
public interface MyProcessor {

    String SOURCE_MESSAGE = "source.message";
    String SMS_MESSAGE = "sms.message";
    String EMAIL_MESSAGE = "email.message";

    @Output(SOURCE_MESSAGE)
    MessageChannel sourceOutput();

    @Input(SMS_MESSAGE)
    SubscribableChannel smsInput();

    @Input(EMAIL_MESSAGE)
    SubscribableChannel emailInput();

}

  

Channels for message-driven microservice B, which receives the original message, processes it, and sends the SMS and email messages separately.

package com.example.channel;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

/**
 * Custom message channel
 */
public interface MyProcessor {

    String SOURCE_MESSAGE = "source.message";
    String SMS_MESSAGE = "sms.message";
    String EMAIL_MESSAGE = "email.message";

    // Input channels are declared as SubscribableChannel
    @Input(SOURCE_MESSAGE)
    SubscribableChannel sourceInput();

    // Output channels are declared as MessageChannel
    @Output(SMS_MESSAGE)
    MessageChannel smsOutput();

    @Output(EMAIL_MESSAGE)
    MessageChannel emailOutput();

}

  

Configuration file

  

Following convention over configuration, the configuration files differ only in the port and application name. The rest of the configuration is the same.

spring:
  application:
    name: stream-producer # application name
  rabbitmq:
    host: 192.168.10.101  # Server IP
    port: 5672            # Server port
    username: guest       # user name
    password: guest       # Password
    virtual-host: /       # Virtual host address

  

spring:
  application:
    name: stream-consumer # application name
  rabbitmq:
    host: 192.168.10.101  # Server IP
    port: 5672            # Server port
    username: guest       # user name
    password: guest       # Password
    virtual-host: /       # Virtual host address

  

Message driven microservice A

  

Send message

  

Send the original message 10086|10086@email.com to the source.message exchange.

package com.example.producer;

import com.example.channel.MyProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * Message producer
 */
@Component
@EnableBinding(MyProcessor.class)
public class SourceMessageProducer {

    private Logger logger = LoggerFactory.getLogger(SourceMessageProducer.class);

    @Autowired
    private MyProcessor myProcessor;

    /**
     * Send original message
     *
     * @param sourceMessage
     */
    public void send(String sourceMessage) {
        logger.info("The original message was sent successfully. The original message is:{}", sourceMessage);
        myProcessor.sourceOutput().send(MessageBuilder.withPayload(sourceMessage).build());
    }

}

  

Receive messages

  

Receive the processed message and send SMS and email.

package com.example.consumer;

import com.example.channel.MyProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;

/**
 * Message consumer
 */
@Component
@EnableBinding(MyProcessor.class)
public class SmsAndEmailMessageConsumer {

    private Logger logger = LoggerFactory.getLogger(SmsAndEmailMessageConsumer.class);

    /**
     * Receive message phone number
     *
     * @param phoneNum
     */
    @StreamListener(MyProcessor.SMS_MESSAGE)
    public void receiveSms(String phoneNum) {
        logger.info("The phone number is:{},Call SMS sending service, send SMS...", phoneNum);
    }

    /**
     * Email address of receiving message
     *
     * @param emailAddress
     */
    @StreamListener(MyProcessor.EMAIL_MESSAGE)
    public void receiveEmail(String emailAddress) {
        logger.info("The email address is:{},Call the mail sending service to send mail...", emailAddress);
    }

}

  

Message driven microservice B

  

Receive messages

  

Receive the original message 10086|10086@email.com, split it, and send the parts to the sms.message and email.message exchanges.

package com.example.consumer;

import com.example.channel.MyProcessor;
import com.example.producer.SmsAndEmailMessageProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;

/**
 * Message consumer
 */
@Component
@EnableBinding(MyProcessor.class)
public class SourceMessageConsumer {

    private Logger logger = LoggerFactory.getLogger(SourceMessageConsumer.class);

    @Autowired
    private SmsAndEmailMessageProducer smsAndEmailMessageProducer;

    /**
     * Receive the original message, process it and send it
     *
     * @param sourceMessage
     */
    @StreamListener(MyProcessor.SOURCE_MESSAGE)
    public void receive(String sourceMessage) {
        logger.info("The original message is received successfully. The original message is:{}", sourceMessage);
        // Send message phone number
        smsAndEmailMessageProducer.sendSms(sourceMessage.split("\\|")[0]);
        // Email address of sending message
        smsAndEmailMessageProducer.sendEmail(sourceMessage.split("\\|")[1]);
    }

}
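
The listener above splits the payload on "|" twice and assumes both parts are present, so a malformed message would fail with an ArrayIndexOutOfBoundsException. One possible hardening, shown as a sketch with a hypothetical SourceMessageParser helper that is not part of the original project, is to split once and validate the result:

```java
/** Hypothetical helper that splits "<phone>|<email>" once and validates the result. */
class SourceMessageParser {

    /** Returns a two-element array {phone, email}, or throws if the message is malformed. */
    static String[] parse(String sourceMessage) {
        // The limit -1 keeps trailing empty parts, so "10086|" is detected as malformed.
        String[] parts = sourceMessage.split("\\|", -1);
        if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
            throw new IllegalArgumentException(
                    "Expected '<phone>|<email>' but got: " + sourceMessage);
        }
        return parts;
    }

    public static void main(String[] args) {
        String[] parts = parse("10086|10086@email.com");
        System.out.println("phone = " + parts[0]);
        System.out.println("email = " + parts[1]);
    }
}
```

In the listener, the two send calls could then use the elements of the parsed array instead of calling split twice.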

  

Send message

  

Send the phone number 10086 and the email address 10086@email.com to the sms.message and email.message exchanges respectively.

package com.example.producer;

import com.example.channel.MyProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * Message producer
 */
@Component
@EnableBinding(MyProcessor.class)
public class SmsAndEmailMessageProducer {

    private Logger logger = LoggerFactory.getLogger(SmsAndEmailMessageProducer.class);

    @Autowired
    private MyProcessor myProcessor;

    /**
     * Send message phone number
     *
     * @param smsMessage
     */
    public void sendSms(String smsMessage) {
        logger.info("The phone number message was sent successfully. The message is:{}", smsMessage);
        myProcessor.smsOutput().send(MessageBuilder.withPayload(smsMessage).build());
    }

    /**
     * Email address of sending message
     *
     * @param emailMessage
     */
    public void sendEmail(String emailMessage) {
        logger.info("The email address message was sent successfully. The message is:{}", emailMessage);
        myProcessor.emailOutput().send(MessageBuilder.withPayload(emailMessage).build());
    }

}

  

Test

  

Unit test

  

  MessageProducerTest.java

package com.example;

import com.example.producer.SourceMessageProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest(classes = {StreamProducerApplication.class})
public class MessageProducerTest {

    @Autowired
    private SourceMessageProducer sourceMessageProducer;

    @Test
    public void testSendSource() {
        sourceMessageProducer.send("10086|10086@email.com");
    }

}

  

Result

  

The print results of message driven microservice A console are as follows:

The phone number is:10086,Call SMS sending service, send SMS...
The email address is:10086@email.com,Call the mail sending service to send mail...

  

The print results of message driven microservice B console are as follows:

The original message is received successfully. The original message is:10086|10086@email.com
The phone number message was sent successfully. The message is:10086
The email address message was sent successfully. The message is:10086@email.com

  

RabbitMQ interface is as follows:

In the next article, we will explain how Stream implements message grouping and message partitioning.

This article is licensed under the Creative Commons Attribution-NonCommercial-NoDerivatives 4.0 International license.


Tags: Programming Spring RabbitMQ Java Maven

Posted on Wed, 29 Apr 2020 20:03:23 -0700 by schajee