Message mode of RabbitMQ

100% delivery of messages

How to guarantee 100% successful delivery of messages?
What does reliable delivery on the producer side require?

  • The message is sent successfully
  • The MQ node (Broker) receives the message successfully
  • The sender receives an acknowledgement from the MQ node (Broker)
  • A complete message compensation mechanism is in place

Solutions used by large Internet companies (BAT/TMD):

  • Persist the message to a database and mark its status
  • Delay message delivery, confirm a second time, and check via callback

Scheme 1: persist the message to a database and mark its status


The process is as follows:
Step 1: store the order (business data) in the database, and insert a message record with status 0 into the MSG DB
Step 2: send the message
Step 3: listen for the confirmation from the Broker
Step 4: on confirmation, update the message status to 1 (success)
Step 5: a distributed scheduled task fetches the messages that still have status 0
Step 6: resend those messages
Step 7: once a message has been retried more than 3 times (the limit can be tuned to the actual situation), set its status to 2 (delivery failed)
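
The bookkeeping in steps 1 to 7 can be sketched roughly as follows. This is a minimal in-memory illustration, not the article's implementation: the class MessageStatusStore and its methods are hypothetical names, a LinkedHashMap stands in for the MSG DB, and the actual publishing to RabbitMQ is left to the caller. A real scheduled task would normally also pick up only records older than some timeout, so that messages still waiting for their confirmation are not resent too early.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for the MSG DB table (illustration only).
class MessageStatusStore {
    static final int SENDING = 0, SUCCESS = 1, FAILED = 2;
    static final int MAX_RETRIES = 3; // assumption: the limit of 3 from step 7

    static final class Record {
        final String body;
        int status = SENDING;
        int retries = 0;
        Record(String body) { this.body = body; }
    }

    private final Map<String, Record> table = new LinkedHashMap<>();

    // Step 1: store the message record with status 0 next to the business data.
    void save(String msgId, String body) { table.put(msgId, new Record(body)); }

    // Steps 3-4: called when the Broker's confirmation for msgId arrives.
    void markConfirmed(String msgId) {
        Record r = table.get(msgId);
        if (r != null) r.status = SUCCESS;
    }

    // Steps 5-7: one run of the scheduled task. Returns the ids to resend now.
    List<String> collectForRetry() {
        List<String> toResend = new ArrayList<>();
        for (Map.Entry<String, Record> e : table.entrySet()) {
            Record r = e.getValue();
            if (r.status != SENDING) continue;
            if (r.retries >= MAX_RETRIES) {
                r.status = FAILED;          // step 7: give up after 3 resends
            } else {
                r.retries++;
                toResend.add(e.getKey());   // step 6: the caller republishes this message
            }
        }
        return toResend;
    }

    int statusOf(String msgId) { return table.get(msgId).status; }
}
```

Each id returned by collectForRetry() would be published again by the caller, and markConfirmed() would be invoked from the confirm callback.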

Scheme 2: delayed delivery, secondary confirmation, and callback check


Step 1: the upstream service first stores the business data in the database and, once that succeeds, sends the first message
Step 2: it then immediately sends a second message, a delayed check message that identifies the first one and is delivered later (for example 2 or 3 minutes later)
Step 3: the Broker receives the first message and the consumer processes it
Step 4: after processing succeeds, the consumer sends a confirm message
Step 5: a callback service receives the confirm message and persists it to its message DB
Step 6: when the callback service receives the delayed check message, it looks up the first message in the DB. If the first message has been processed, nothing more is done. If processing failed or there is no record, the callback service sends a resend command to the upstream service, which repeats from step 1
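
The decision made in steps 5 and 6 might look like the sketch below. It assumes a hypothetical CallbackService class whose in-memory set stands in for the message DB. How the confirm and delayed check messages reach it, and how the resend command is sent, are outside the sketch.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical callback service for scheme 2 (names are illustrative only).
class CallbackService {
    // Stand-in for the callback service's own message DB.
    private final Set<String> confirmed = ConcurrentHashMap.newKeySet();

    // Step 5: the consumer's confirm message arrived, so persist it.
    void onConfirm(String msgId) { confirmed.add(msgId); }

    // Step 6: the delayed check message arrived.
    // Returns true when the upstream service must be asked to resend.
    boolean onDelayedCheck(String msgId) { return !confirmed.contains(msgId); }
}
```

The appeal of this scheme is that the upstream service writes only its business data on the main path, and the message bookkeeping moves to the callback service.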

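The concept of idempotence

What is idempotence?

An operation is idempotent if performing it many times has the same effect as performing it once.

  • The optimistic locking mechanism of databases is a useful reference
  • For example, suppose we execute an SQL statement to update the inventory
  • UPDATE t_repository SET count = count - 1, version = version + 1 WHERE version = 1
  • Because the WHERE clause only matches version 1, running the same statement a second time changes nothing
  • Elasticsearch follows the same idea: every update to a document increments its version (covered in an earlier post on this blog)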
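
To make the version check concrete, here is a small sketch that mimics the UPDATE statement in plain Java. The class VersionedStock is a hypothetical in-memory row, not a real database access layer. The return value plays the role of the affected row count.

```java
// Hypothetical in-memory row standing in for a record in t_repository.
class VersionedStock {
    private int count;
    private int version;

    VersionedStock(int count, int version) {
        this.count = count;
        this.version = version;
    }

    // Mirrors: UPDATE ... SET count = count - 1, version = version + 1 WHERE version = ?
    // Returns 1 if the row was updated, 0 if the expected version is stale.
    synchronized int decrementIfVersion(int expectedVersion) {
        if (version != expectedVersion) return 0;
        count--;
        version++;
        return 1;
    }

    synchronized int count() { return count; }
    synchronized int version() { return version; }
}
```

If the same request is replayed with the old version number, it affects zero rows, so the inventory is decremented only once.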

Consumer-side idempotence protection
How do we avoid consuming the same message repeatedly during peak periods with massive order volumes?

Making the consumer idempotent means that a message takes effect only once, even if we receive several identical copies of it

The mainstream idempotent approaches in the industry

  • Unique ID + fingerprint code, using the database primary key to deduplicate
  • Using the atomicity of Redis

Unique ID + fingerprint code mechanism
A unique ID plus a fingerprint code forms the database primary key, which is used to deduplicate
SELECT COUNT(1) FROM t_order WHERE id = <unique ID + fingerprint code>
Benefit: simple to implement
Disadvantage: database writes become a performance bottleneck under highly concurrent delivery
Solution: shard across databases and tables, routing by ID with a routing algorithm
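
A rough sketch of the deduplication idea follows. The article does not say how the fingerprint code is computed, so this sketch assumes a SHA-256 hash of the message body. The class FingerprintDeduplicator is hypothetical, and a concurrent set stands in for the table's primary key constraint.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical deduplicator; the set imitates a primary key constraint.
class FingerprintDeduplicator {
    private final Set<String> primaryKeys = ConcurrentHashMap.newKeySet();

    // Assumption: the fingerprint is the SHA-256 hash of the body, hex-encoded.
    static String fingerprint(String body) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            byte[] hash = md.digest(body.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : hash) hex.append(String.format("%02x", b));
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 must be available on every Java platform.
            throw new IllegalStateException(e);
        }
    }

    // Returns true the first time a (businessId, body) pair is seen,
    // and false for every duplicate.
    boolean tryAccept(String businessId, String body) {
        return primaryKeys.add(businessId + ":" + fingerprint(body));
    }
}
```

With a real database, the same effect is usually obtained by inserting the key and treating a duplicate key error as "already processed", which avoids a separate check-then-insert race.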

Using the atomicity of Redis
Questions to consider when using Redis for idempotence
First: does the data also need to be persisted to the database? If so, how do we keep the database and the cache atomically consistent?
Second: if the data is kept only in the cache and not persisted, how do we set up the scheduled synchronization strategy?
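
The atomic primitive typically relied on here is Redis's "set if absent with expiry" (SET key value NX EX seconds). The sketch below only imitates those semantics in memory with ConcurrentHashMap.compute. It is not a Redis client, and the class name SetIfAbsentGuard is hypothetical. The current time is passed in as a parameter purely to keep the behaviour easy to follow.

```java
import java.util.concurrent.ConcurrentHashMap;

// In-memory imitation of "SET key value NX EX ttl" (illustration only).
class SetIfAbsentGuard {
    private final ConcurrentHashMap<String, Long> expiryByKey = new ConcurrentHashMap<>();

    // Returns true if this caller claimed the key (first delivery, or the
    // previous claim has expired) and false if the key is still held.
    boolean tryAcquire(String msgId, long ttlMillis, long nowMillis) {
        boolean[] acquired = {false};
        // compute() runs atomically per key, like a single Redis command.
        expiryByKey.compute(msgId, (key, expiry) -> {
            if (expiry == null || expiry <= nowMillis) {
                acquired[0] = true;
                return nowMillis + ttlMillis;
            }
            return expiry;
        });
        return acquired[0];
    }
}
```

A consumer would process the message only when tryAcquire returns true. The two questions above still apply: this guard alone says nothing about keeping the cache and the database consistent.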

Confirm messages

Understanding the Confirm mechanism
Message confirmation means that after the producer delivers a message, the Broker sends back a response once it has received the message.
The producer uses this response to determine whether the message reached the Broker normally. This is the core guarantee for reliable message delivery

How to implement Confirm messages?
Step 1: enable confirm mode on the channel: channel.confirmSelect()
Step 2: add a listener on the channel with addConfirmListener. The listener is called back with the success (ack) or failure (nack) result, and depending on the result the message can be resent or logged
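
To decide which messages to resend, the producer has to remember what it published. One common way is to record each message under its publish sequence number (available from channel.getNextPublishSeqNo() just before basicPublish) and to clear entries in the listener callbacks. The sketch below shows only that bookkeeping, without the RabbitMQ client. The class OutstandingConfirms is a hypothetical helper. Note the multiple flag: when it is true, the Broker is confirming every delivery tag up to and including the given one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical helper that tracks published but not yet confirmed messages.
class OutstandingConfirms {
    private final ConcurrentNavigableMap<Long, String> pending = new ConcurrentSkipListMap<>();

    // Call just before basicPublish with channel.getNextPublishSeqNo().
    void track(long seqNo, String body) { pending.put(seqNo, body); }

    // Call from handleAck: drop the confirmed message(s).
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            pending.headMap(deliveryTag, true).clear();
        } else {
            pending.remove(deliveryTag);
        }
    }

    // Call from handleNack: returns the bodies that should be resent or logged.
    List<String> nack(long deliveryTag, boolean multiple) {
        List<String> toResend = new ArrayList<>();
        if (multiple) {
            ConcurrentNavigableMap<Long, String> head = pending.headMap(deliveryTag, true);
            toResend.addAll(head.values());
            head.clear();
        } else {
            String body = pending.remove(deliveryTag);
            if (body != null) toResend.add(body);
        }
        return toResend;
    }

    int pendingCount() { return pending.size(); }
}
```

A sorted concurrent map is used because the callbacks run on a different thread from the publisher, and because headMap makes the multiple = true case a single range operation.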

Consumer code:

package comswx.rabbitmqapi.confirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * @author Songwanxi
 * @site www.lentter.club
 * @company
 * @create  2020-02-28 17:47
 *
 * SMS microservice
 */
public class Consumer {
    public static void main(String[] args) throws Exception{
        //Connection factory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("122.51.229.49");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        String exchangeName = "test_confirm_exchange";
        String routingKey = "confirm.#";
        String queueName = "test_confirm_queue";

        //4 declare the exchange and the queue, then bind them with the routing key
        channel.exchangeDeclare(exchangeName, "topic", true);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        //5 create consumers
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, queueingConsumer);

        while(true){
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());

            System.err.println("Consumer end: " + msg);
        }
    }
}

Producer code:

package comswx.rabbitmqapi.confirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

/**
 * @author Songwanxi
 * @site www.lentter.club
 * @company
 * @create  2020-02-28 17:53
 * Payment microservice
 */
public class Producer {
    public static void main(String[] args) throws Exception {
        //Connection factory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("122.51.229.49");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        //4 specify our message delivery mode: message confirmation mode
        channel.confirmSelect();

        String exchangeName = "test_confirm_exchange";
        String routingKey = "confirm.save";

        //5 add a confirm listener before publishing, so that no confirmation can be missed
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                //The Broker could not take responsibility for the message: resend or log it
                System.err.println("-------no ack!----------");
            }

            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                //The Broker has received the message
                System.err.println("-------ack!----------");
            }
        });

        //6 send a message
        String msg = "Hello RabbitMQ Send confirm message!";
        channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
    }
}

Return messages

The Return Listener is used to handle messages that cannot be routed.
Normal case: the message producer specifies an Exchange and a RoutingKey, the message is delivered to a queue, and the consumer listens on that queue and processes the message.
Exceptional case: if the specified routing key does not match any queue bound to the Exchange when the message is sent, the message is unroutable. To be notified about such unreachable messages, we need a Return Listener. (Publishing to an Exchange that does not exist is a different failure: the Broker closes the channel with a 404 NOT_FOUND error instead of returning the message.)

There is a key configuration item in the basic API
Mandatory: if true, the Broker returns an unroutable message to the producer, where the Return Listener receives it for further processing. If false, the Broker silently discards the message.
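
Whether a message is routable on a topic exchange depends on how the routing key compares with the binding patterns, where * stands for exactly one word and # for zero or more words. The sketch below is a simplified re-implementation of that matching rule for illustration only. TopicMatcher is a hypothetical class, not part of the RabbitMQ client, and the Broker's own matching is what actually decides routing.

```java
// Simplified topic pattern matching (illustration, not RabbitMQ's code).
class TopicMatcher {
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) return j == k.length; // pattern used up: key must be too
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words of the routing key.
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) return true;
            }
            return false;
        }
        if (j == k.length) return false;         // key used up but pattern is not
        // '*' matches exactly one word; anything else must match literally.
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }
}
```

With a queue bound using the pattern return.#, a key such as return.save matches, while abc.save matches nothing and is therefore returned when mandatory is true.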

Consumer:

package comswx.rabbitmqapi.returnlistener;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * @author Songwanxi
 * @site www.lentter.club
 * @company
 * @create  2020-02-28 18:05
 */
public class Consumer {
    public static void main(String[] args) throws Exception {
        //Connection factory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("122.51.229.49");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        String exchangeName = "test_return_exchange";
        String routingKey = "return.#";
        String queueName = "test_return_queue";

        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        channel.basicConsume(queueName, true, queueingConsumer);

        while(true){
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.err.println("Consumer: " + msg);
        }

    }
}

Producer:

package comswx.rabbitmqapi.returnlistener;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author Songwanxi
 * @site www.lentter.club
 * @company
 * @create  2020-02-28 18:06
 */
public class Producer {
    public static void main(String[] args) throws Exception {
        //Connection factory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("122.51.229.49");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        String exchange = "test_return_exchange";
        String routingKey = "return.save";//Matches the binding pattern return.#
        String routingKeyError = "abc.save";//Matches no binding, so the message is unroutable

        String msg = "Hello RabbitMQ Return Message";


        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange,
                                     String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.err.println("---------handle  return----------");
                System.err.println("replyCode: " + replyCode);
                System.err.println("replyText: " + replyText);
                System.err.println("exchange: " + exchange);
                System.err.println("routingKey: " + routingKey);
                System.err.println("properties: " + properties);
                System.err.println("body: " + new String(body));
            }
        });

        //A routable message is delivered to the queue and consumed by the consumer
//        channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
        //An unroutable message published with mandatory = true triggers the ReturnListener
        channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());

    }
}

Listening to unreachable messages

Custom consumer

So far the code has used a while loop that calls consumer.nextDelivery() to fetch the next message and then process it.
A custom Consumer is more convenient and better decoupled, and it is the most common approach in practice. (QueueingConsumer was deprecated in version 4.x of the RabbitMQ Java client and removed in 5.0.)
Custom consumer:

package comswx.rabbitmqapi.consumer;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

/**
 * @author Songwanxi
 * @site www.lentter.club
 * @company
 * @create  2020-02-28 18:16
 * Handles the messages received by the SMS microservice
 */
public class MyConsumer extends DefaultConsumer {
    public MyConsumer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
    }
}

Consumer:

package comswx.rabbitmqapi.consumer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @author Songwanxi
 * @site www.lentter.club
 * @company
 * @create  2020-02-28 17:47
 *
 * SMS microservice
 */
public class Consumer {
    public static void main(String[] args) throws Exception{
        //Connection factory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("122.51.229.49");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        String exchangeName = "test_consumer_exchange";
        String routingKey = "consumer.#";
        String queueName = "test_consumer_queue";

        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        channel.basicConsume(queueName, true, new MyConsumer(channel));
    }
}

Producer:

package comswx.rabbitmqapi.consumer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @author Songwanxi
 * @site www.lentter.club
 * @company
 * @create  2020-02-28 17:53
 * Payment microservice
 */
public class Producer {
    public static void main(String[] args) throws Exception {
        //Connection factory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("122.51.229.49");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchange = "test_consumer_exchange";
        String routingKey = "consumer.save";

        String msg = "Hello RabbitMQ Consumer Message";

        for(int i =0; i<5; i ++){
            channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
        }
    }
}

Tags: RabbitMQ Database Java Redis

Posted on Fri, 28 Feb 2020 02:38:57 -0800 by projectshifter