RabbitMQ exchange types

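RabbitMQ is built on the AMQP protocol. Producers publish to an exchange, and the exchange routes messages to queues. The common exchange types are direct (the default: a message goes to the queue whose binding key exactly matches the routing key), fanout (broadcast to every bound queue), and topic (broadcast to the queues whose wildcard binding pattern matches the routing key)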

  

direct

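This is the simplest mode and RabbitMQ's default. Declare a queue, send messages straight to it by name, and wait for a listener to consume them. No exchange is configured explicitly: the message passes through the default exchange, a direct exchange that routes it to the queue whose name equals the routing key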
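The routing rule described above can be imitated in plain Java. This is only a toy in-memory model, assuming nothing about the broker's internals; the class DefaultExchangeSketch and its methods are hypothetical names and not part of RabbitMQ or Spring AMQP.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Toy model of direct routing through the default exchange (hypothetical, not the RabbitMQ API).
final class DefaultExchangeSketch {
    // Queue name -> pending messages. On the default exchange the binding key is the queue name.
    private final Map<String, Queue<String>> queues = new HashMap<>();

    void declareQueue(String name) {
        queues.putIfAbsent(name, new ArrayDeque<>());
    }

    // Returns true if a queue with exactly this name exists and took the message.
    boolean publish(String routingKey, String message) {
        Queue<String> target = queues.get(routingKey);
        if (target == null) {
            return false; // no exact match, so the message is not routed
        }
        target.add(message);
        return true;
    }

    int depth(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        DefaultExchangeSketch broker = new DefaultExchangeSketch();
        broker.declareQueue("hello");
        System.out.println(broker.publish("hello", "{\"name\":\"mq\"}")); // true
        System.out.println(broker.publish("Hello", "not routed"));        // false: match is exact
        System.out.println(broker.depth("hello"));                        // 1
    }
}
```

The match is an exact, case-sensitive string comparison, which is why the Spring example below passes the same constant both when declaring the queue and when sending.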

// Register a regular queue directly and send messages to this queue
@Component
public class SendMessage {

    @Bean
    public Queue getQueues() {
        return new Queue(RabbitConstant.HELLO_QUEUE);
    }

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send() {
        try {
            User user = new User();
            user.setAddress("address");
            user.setAge("1");
            user.setName("mq");
            amqpTemplate.convertAndSend(RabbitConstant.HELLO_QUEUE, JSON.toJSONString(user));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

This kind of queue is point-to-point: however many listeners are attached to the queue, each message is consumed by only one of them

// Listen to this queue
@Component
@RabbitListener(queues = RabbitConstant.HELLO_QUEUE)
public class ReceiverMessageOne {

    @RabbitHandler
    public void receiverHello(String val) {

        System.out.println("one ReceiverMessage: " + val);
    }
}

The output below comes from three listeners on the same queue (only the first is shown above). The messages are handed to the consumers in turn, in round-robin order:

one ReceiverMessage: {"address":"address","age":"1","name":"mq"}
Thrid ReceiverMessage: {"address":"address","age":"1","name":"mq"}
two ReceiverMessage: {"address":"address","age":"1","name":"mq"}
one ReceiverMessage: {"address":"address","age":"1","name":"mq"}
Thrid ReceiverMessage: {"address":"address","age":"1","name":"mq"}
two ReceiverMessage: {"address":"address","age":"1","name":"mq"}
one ReceiverMessage: {"address":"address","age":"1","name":"mq"}
Thrid ReceiverMessage: {"address":"address","age":"1","name":"mq"}
two ReceiverMessage: {"address":"address","age":"1","name":"mq"}
one ReceiverMessage: {"address":"address","age":"1","name":"mq"}
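The round-robin pattern in the output above can be reproduced with a small plain-Java sketch. It assumes idle consumers of equal standing and ignores prefetch limits and acknowledgements, which also influence dispatch on a real broker; RoundRobinSketch and the consumer names are illustrative only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of round-robin dispatch (hypothetical, not the RabbitMQ API).
final class RoundRobinSketch {
    // For each message index, returns the name of the consumer that receives it.
    static List<String> dispatch(List<String> consumers, int messageCount) {
        List<String> deliveries = new ArrayList<>();
        for (int i = 0; i < messageCount; i++) {
            // Each message goes to the next consumer, wrapping around at the end.
            deliveries.add(consumers.get(i % consumers.size()));
        }
        return deliveries;
    }

    public static void main(String[] args) {
        // prints [one, two, three, one]
        System.out.println(dispatch(Arrays.asList("one", "two", "three"), 4));
    }
}
```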

topic

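A topic exchange uses wildcard patterns to decide which queues a message is forwarded to. What matters is the binding between the exchange and each queue: the binding key is a pattern of dot-separated words, where * matches exactly one word and # matches zero or more words. A queue bound with a wildcard pattern therefore also receives messages whose routing keys were meant for other, similarly named queues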
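To make the wildcard rules concrete, here is a minimal matcher written from the rules just stated (* is one word, # is zero or more words). It is a sketch and not RabbitMQ's own implementation; TopicMatchSketch is a hypothetical name, and the routing keys in main are made-up examples because the values of the RabbitConstant fields are not shown in this article.

```java
// Toy matcher for topic binding patterns (hypothetical, not the broker's algorithm).
final class TopicMatchSketch {
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    // Compares pattern words from index i against routing-key words from index j.
    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible continuation point.
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1); // '*' or a literal consumes exactly one word
        }
        return false;
    }

    public static void main(String[] args) {
        String pattern = "com.xin.pay.success.topic.#";
        System.out.println(matches(pattern, "com.xin.pay.success.topic.queue.two")); // true
        System.out.println(matches(pattern, "com.xin.pay.success.topic"));           // true
        System.out.println(matches("com.xin.pay.*", "com.xin.pay.success.topic"));   // false
    }
}
```

The last call returns false because * stands for a single word, whereas the remaining part of the key has two.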

RabbitMqTopicConfiguration: configures the exchange and the queues
@Configuration
public class RabbitMqTopicConfiguration {

    /**
     * Registers the topic exchange
     */
    @Bean(name = "paySuccessTopicExchange")
    public TopicExchange paySuccessTopicExchange() {
        return new TopicExchange(RabbitConstant.PAY_SUCCESS_TOPIC_EXCHANGE);
    }

    /**
     * Registers the first topic queue
     */
    @Bean(name = "paySuccessTopicQueue")
    public Queue paySuccessQueue() {
        return new Queue(RabbitConstant.PAY_SUCCESS_TOPIC_QUEUE);
    }

    /**
     * Registers the second topic queue
     */
    @Bean(name = "paySuccessTopicQueue2")
    public Queue paySuccessQueue2() {
        return new Queue(RabbitConstant.PAY_SUCCESS_TOPIC_QUEUE_TWO);
    }

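    /**
     * Binds the first queue to the topic exchange with a wildcard pattern:
     * '#' matches zero or more dot-separated words.
     * The method is named after the topic exchange so that its bean name does not
     * clash with the binding beans declared in RabbitMqFanoutConfiguration.
     */
    @Bean
    public Binding bindPaySuccessTopicExchangeAndQueue(
            @Qualifier("paySuccessTopicExchange") TopicExchange paySuccessTopicExchange,
            @Qualifier("paySuccessTopicQueue") Queue paySuccessTopicQueue) {
        return BindingBuilder.bind(paySuccessTopicQueue).to(paySuccessTopicExchange).with("com.xin.pay.success.topic.#");
    }

    /**
     * Binds the second queue to the topic exchange with an exact routing key (no wildcard)
     */
    @Bean
    public Binding bindPaySuccessTopicExchangeAndQueue2(
            @Qualifier("paySuccessTopicExchange") TopicExchange paySuccessTopicExchange,
            @Qualifier("paySuccessTopicQueue2") Queue paySuccessTopicQueue2) {
        return BindingBuilder.bind(paySuccessTopicQueue2).to(paySuccessTopicExchange).with(RabbitConstant.PAY_SUCCESS_TOPIC_QUEUE_TWO);
    }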

}
RabbitMqTopicSendMessage: the sender
@Component
public class RabbitMqTopicSendMessage {

    @Autowired
    private AmqpTemplate amqpTemplate;

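    // Both queues are also declared in RabbitMqTopicConfiguration. The methods are named
    // differently from SendMessage.getQueues() so the bean names do not clash.
    @Bean
    public Queue getTopicQueues() {
        return new Queue(RabbitConstant.PAY_SUCCESS_TOPIC_QUEUE);
    }

    @Bean
    public Queue getTopicQueues2() {
        return new Queue(RabbitConstant.PAY_SUCCESS_TOPIC_QUEUE_TWO);
    }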


    public void send() {

        try {
            User user = new User();
            user.setName("This is queue 11111111111111");
            user.setAddress("11111111111111111");
            user.setAge("1");

            User user2 = new User();
            user2.setName("This is line 22222222222");
            user2.setAddress("22222222222222222");
            user2.setAge("2");
            
            amqpTemplate.convertAndSend(
                    RabbitConstant.PAY_SUCCESS_TOPIC_EXCHANGE, RabbitConstant.PAY_SUCCESS_TOPIC_QUEUE, JSON.toJSONString(user));

            amqpTemplate.convertAndSend(
                    RabbitConstant.PAY_SUCCESS_TOPIC_EXCHANGE, RabbitConstant.PAY_SUCCESS_TOPIC_QUEUE_TWO, JSON.toJSONString(user2));

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
RabbitMqTopicReceiverMessage: the consumer
@Component
@RabbitListener(queues = RabbitConstant.PAY_SUCCESS_TOPIC_QUEUE)
public class RabbitMqTopicReceiverMessage {


    @RabbitHandler
    public void receiver(String val){

        System.out.println("1 receiver :"+val);
    }
}

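The results are as follows. The first queue is bound with the wildcard pattern com.xin.pay.success.topic.#, so it receives every message whose routing key matches that pattern, including the one addressed to the second queue; receiver 1 therefore prints both messages. This implies that both routing-key constants, whose values are not shown here, match the pattern. Receiver 2 (its listener is not shown) prints only the message sent with its own routing key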

2 receiver :{"address":"22222222222222222","age":"2","name":"This is line 22222222222"}
1 receiver :{"address":"11111111111111111","age":"1","name":"This is queue 11111111111111"}
1 receiver :{"address":"22222222222222222","age":"2","name":"This is line 22222222222"}

 

fanout

A fanout exchange is the standard broadcast mode: it ignores the routing key and delivers a copy of every message to each queue bound to it. A consumer receives the messages by listening to any one of those queues
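The broadcast behaviour can be sketched in plain Java as follows. Again this is a toy in-memory model under the assumption stated above (the routing key plays no part); FanoutSketch and the queue names are hypothetical and unrelated to the Spring AMQP classes used in the rest of this section.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a fanout exchange (hypothetical, not the RabbitMQ API).
final class FanoutSketch {
    // Queue name -> messages delivered to that queue.
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    // The routing key is accepted for symmetry with other exchange types but is ignored.
    void publish(String routingKey, String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message); // every bound queue gets its own copy
        }
    }

    List<String> messagesIn(String queueName) {
        return boundQueues.getOrDefault(queueName, new ArrayList<>());
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("queue1");
        exchange.bind("queue2");
        exchange.publish("queue1", "first");
        exchange.publish("anything", "second");
        System.out.println(exchange.messagesIn("queue1")); // [first, second]
        System.out.println(exchange.messagesIn("queue2")); // [first, second]
    }
}
```

Both queues end up with both messages even though the routing keys differ, which is why the routing keys passed by the sender further down have no effect on delivery.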

RabbitMqFanoutConfiguration 
/**
 * Fanout (broadcast) exchange configuration. A fanout exchange needs no routing rules:
 * it simply pushes each message to every queue bound to it
 */
@Configuration
public class RabbitMqFanoutConfiguration {


    /**
     * Registers the fanout (broadcast) exchange
     */
    @Bean(name = "paySuccessFanoutExchange")
    public FanoutExchange paySuccessFanoutExchange() {
        return new FanoutExchange(RabbitConstant.PAY_SUCCESS_FANOUT_EXCHANGE);
    }

    /**
     * Subscription broadcast queue registration
     */
    @Bean(name = "paySuccessQueue")
    public Queue paySuccessQueue() {
        return new Queue(RabbitConstant.PAY_SUCCESS_FANOUT_EXCHANGE_QUEUE);
    }

    /**
     * Subscription broadcast queue registration
     */
    @Bean(name = "paySuccessQueue2")
    public Queue paySuccessQueue2() {
        return new Queue(RabbitConstant.PAY_SUCCESS_FANOUT_EXCHANGE_QUEUE2);
    }

    /**
     * Binds the fanout exchange to a queue. One exchange can be bound to multiple queues
     */
    @Bean
    public Binding bingPaySuccessFanoutExchangeAndQueue(
            @Qualifier("paySuccessFanoutExchange") FanoutExchange paySuccessFanoutExchange,
            @Qualifier("paySuccessQueue") Queue paySuccessQueue) {
        return BindingBuilder.bind(paySuccessQueue).to(paySuccessFanoutExchange);
    }

    @Bean
    public Binding bingPaySuccessFanoutExchangeAndQueue2(
            @Qualifier("paySuccessFanoutExchange") FanoutExchange paySuccessFanoutExchange,
            @Qualifier("paySuccessQueue2") Queue paySuccessQueue) {
        return BindingBuilder.bind(paySuccessQueue).to(paySuccessFanoutExchange);
    }
}
RabbitMFanoutSendMessage
@Component
public class RabbitMFanoutSendMessage {


    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send() {

        try {
            User user = new User();
            user.setName("This is queue 11111111111111");
            user.setAddress("11111111111111111");
            user.setAge("1");

            User user2 = new User();
            user2.setName("This is line 22222222222");
            user2.setAddress("22222222222222222");
            user2.setAge("2");
            amqpTemplate.convertAndSend(
                    RabbitConstant.PAY_SUCCESS_FANOUT_EXCHANGE, RabbitConstant.PAY_SUCCESS_FANOUT_EXCHANGE_QUEUE, JSON.toJSONString(user));

            amqpTemplate.convertAndSend(
                    RabbitConstant.PAY_SUCCESS_FANOUT_EXCHANGE, RabbitConstant.PAY_SUCCESS_FANOUT_EXCHANGE_QUEUE2, JSON.toJSONString(user2));

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
RabbitMqFanoutReceiverMessage
@Component
@RabbitListener(queues = RabbitConstant.PAY_SUCCESS_FANOUT_EXCHANGE_QUEUE2)
public class RabbitMqFanoutReceiverMessage {


    @RabbitHandler
    public void receiver(String val){

        System.out.println("1 receiver :"+val);
    }
}

https://blog.csdn.net/clypm/article/details/76178992

Tags: JSON RabbitMQ

Posted on Tue, 14 Jan 2020 20:11:01 -0800 by centenial