RabbitMQ delay queue, message delay push

Author: seaward
Source: https://www.cnblogs.com/haixiang/p/10966985.html

Application scenario

At present, the common application software has the shadow of message delay push, which is also widely used, for example:

  • Taobao automatically confirms the receipt within seven days. After we sign in the goods, the logistics system will delay sending a message to the payment system seven days later to inform the payment system to send the payment to the merchant. This process lasts for seven days, which is to use the delayed push function of the message middleware.
  • 12306 ticket purchase payment confirmation page. We often have a countdown in the page of selecting tickets and clicking OK to jump, which means that if the order is not confirmed within 30 minutes, it will automatically cancel the order. In fact, at the moment when the order is placed, the ticketing business system will send a delay message to the order system, delaying 30 minutes to tell the order system that the order has not been completed. If we complete the order within 30 minutes, we can ignore the received consumption information through the logic code judgment.

In the above two scenarios, if we use the following two traditional solutions, we will undoubtedly greatly reduce the overall performance and throughput of the system:

  • Use redis to set the expiration time for the order. Finally, determine whether the order has been completed by judging whether there is still the order in redis. Compared with the delay push performance of messages, this solution has a lower performance, because we know that redis is stored in memory. When we encounter malicious orders or orders, it will bring huge pressure to memory.
  • The traditional database polling is used to judge the status of orders in the database table, which undoubtedly increases the number of IO, and the performance is extremely low.
  • Using the jvm's native DelayQueue also consumes a lot of memory, and there is no persistence policy, so the system will lose the order information when it goes down or restarts.

Implementation of message delay push

Before RabbitMQ 3.6.x, we usually used dead letter queue + TTL expiration time to implement delay queue. We will not introduce it here too much. Please refer to the previous articles to understand: TTL, dead letter queue

Starting from RabbitMQ 3.6.x, RabbitMQ officially provides plug-ins for delay queues, which can be downloaded and placed in plugins under the root directory of RabbitMQ. Delay queue plug-in download

First, we create switches and message queues, and the configuration in application.properties is the same as in the previous article.

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MQConfig {

    public static final String LAZY_EXCHANGE = "Ex.LazyExchange";
    public static final String LAZY_QUEUE = "MQ.LazyQueue";
    public static final String LAZY_KEY = "lazy.#";

    @Bean
    public TopicExchange lazyExchange(){
        //Map<String, Object> pros = new HashMap<>();
        //Set switch to support delay message push
        //pros.put("x-delayed-message", "topic");
        TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Queue lazyQueue(){
        return new Queue(LAZY_QUEUE, true);
    }

    @Bean
    public Binding lazyBinding(){
        return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY);
    }
}

In the declaration of Exchange, we can set exchange.setDelayed(true) to open the delay queue, or we can set it as the following method to pass in the declaration of switch, because the bottom layer of the first method is realized in this way.

        //Map<String, Object> pros = new HashMap<>();
        //Set switch to support delay message push
        //pros.put("x-delayed-message", "topic");
        TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);

When sending a Message, we need to specify the delay push time. Here, we pass in the parameter new MessagePostProcessor() in the method of sending a Message to get the Message object, because we need to use the api of the Message object to set the delay time.

import com.anqi.mq.config.MQConfig;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class MQSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //Confirm callback returncallback code is omitted, please refer to the previous article
  
    public void sendLazy(Object message){
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        //id + timestamp globally unique
        CorrelationData correlationData = new CorrelationData("12345678909"+new Date());

        //Specify the header delay time when sending messages
        rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE, "lazy.boot", message,
                new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //Set message persistence
                message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                //message.getMessageProperties().setHeader("x-delay", "6000");
                message.getMessageProperties().setDelay(6000);
                return message;
            }
        }, correlationData);
    }
}

We can observe the underlying code of setDelay(Integer i), and also set x-delay in the header. It's the same as setting the header manually

message.getMessageProperties().setHeader("x-delay", "6000");

/**
 * Set the x-delay header.
 * @param delay the delay.
 * @since 1.6
 */
public void setDelay(Integer delay) {
	if (delay == null || delay < 0) {
		this.headers.remove(X_DELAY);
	}
	else {
		this.headers.put(X_DELAY, delay);
	}
}

Consumer consumption

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

@Component
public class MQReceiver {

    @RabbitListener(queues = "MQ.LazyQueue")
    @RabbitHandler
    public void onLazyMessage(Message msg, Channel channel) throws IOException{
        long deliveryTag = msg.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag, true);
        System.out.println("lazy receive " + new String(msg.getBody()));

    }

test result

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
public class MQSenderTest {

    @Autowired
    private MQSender mqSender;

    @Test
    public void sendLazy() throws  Exception {
        String msg = "hello spring boot";

        mqSender.sendLazy(msg + ":");
    }
}

As expected, after 6 seconds, I received the message lazy receive hello spring boot:




Please indicate the source of reprint, thank you. https://www.cnblogs.com/haixiang/p/10966985.html


Tags: RabbitMQ Java Redis Database

Posted on Fri, 10 Apr 2020 22:19:15 -0700 by raker7