A First Look at the RabbitMQ Return Message Mechanism in Java

What is the Return message mechanism

The Return message mechanism handles messages that cannot be routed. Normally, the producer publishes a message with an Exchange and a RoutingKey, the Exchange routes the message to a Queue, and a consumer listening on that Queue consumes it. Sometimes, however, the RoutingKey given at publish time does not match any Queue. To be notified of such unreachable messages, the producer has to register a Return listener. Note that publishing to an Exchange that does not exist is a different failure: the broker closes the channel with a 404 NOT_FOUND error instead of returning the message.

The basic API has a key parameter for this, mandatory:

  • If true, the broker returns a message that cannot be routed to the producer, where the Return listener receives it and can process it.
  • If false, the broker silently discards a message that cannot be routed.

        // mandatory is true (third argument)
        channel.basicPublish(exchange , routingKey , true , null , msg.getBytes());
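To make the two cases concrete, here is a minimal in-memory sketch of how the default exchange treats the mandatory flag. It is a toy model, not the RabbitMQ client API: the class MandatoryModel and its methods are hypothetical names invented for this illustration, and it assumes the default-exchange rule that a routingKey matches the queue with the same name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Toy in-memory model of the default exchange; NOT the RabbitMQ client API.
class MandatoryModel {
    // Queues keyed by name; the default exchange routes by exact queue name.
    private final Map<String, Queue<String>> queues = new HashMap<>();
    // Messages handed back to the producer, as "replyCode replyText routingKey body".
    private final List<String> returned = new ArrayList<>();

    void declareQueue(String name) {
        queues.putIfAbsent(name, new ArrayDeque<>());
    }

    // Loosely mirrors basicPublish(exchange, routingKey, mandatory, props, body) for exchange "".
    void publish(String routingKey, boolean mandatory, String body) {
        Queue<String> queue = queues.get(routingKey);
        if (queue != null) {
            queue.add(body); // routable: the message becomes ready in the queue
        } else if (mandatory) {
            // unroutable and mandatory: handed back to the producer
            returned.add("312 NO_ROUTE " + routingKey + " " + body);
        }
        // unroutable and not mandatory: silently dropped
    }

    // Number of messages waiting in the given queue (0 if the queue does not exist).
    int ready(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? 0 : queue.size();
    }

    List<String> returned() {
        return returned;
    }

    public static void main(String[] args) {
        MandatoryModel broker = new MandatoryModel();
        broker.declareQueue("test");
        broker.publish("test", true, "RabbitMQ: return message");
        broker.publish("test_1", true, "RabbitMQ: error return message");
        broker.publish("test_1", false, "RabbitMQ: error return message");
        System.out.println("ready in test: " + broker.ready("test"));
        System.out.println("returned: " + broker.returned());
    }
}
```

Only the second publish ends up in the returned list; the third one leaves no trace, which is the behaviour of mandatory = false.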

Producer

We use the default exchange here; its routing rules are described in the following blog post.
Java's initial understanding of RabbitMQ (default exchange)

package com.kaven.rabbitmq.api.returnListener;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    // IP address of your RabbitMQ server
    private static String ip = "IP";
    // Default RabbitMQ port; client applications connect to RabbitMQ on this port
    private static int port = 5672;
    // The default RabbitMQ virtual host is "/"
    private static String virtualHost = "/";

    // default exchange
    private static String exchange = "";
    // Routing rule of default exchange: routingKey (test) will match queue(test) with the same name
    private static String routingKey = "test";
    private static String routingKeyError = "test_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1 create ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 create Connection
        Connection connection = connectionFactory.newConnection();

        // 3 Create Channel
        Channel channel = connection.createChannel();

        //4 add ReturnListener
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
                                     AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("---------- handle return ------------");
                System.out.println("replyCode: " + replyCode);
                System.out.println("replyText: " + replyText);
                System.out.println("exchange: " + exchange);
                System.out.println("routingKey: " + routingKey);
                System.out.println("properties: " + properties);
                System.out.println("body: " + new String(body));
            }
        });

        // 5 send messages
        String msg = "RabbitMQ: return message";
        String msgError = "RabbitMQ: error return message";
        // mandatory is true: the first message is routed to the queue "test";
        // the second matches no queue and is returned to the ReturnListener
        channel.basicPublish(exchange , routingKey , true , null , msg.getBytes());
        channel.basicPublish(exchange , routingKeyError , true , null , msgError.getBytes());
    }
}

Consumer

package com.kaven.rabbitmq.api.returnListener;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    // IP address of your RabbitMQ server
    private static String ip = "IP";
    // Default RabbitMQ port; client applications connect to RabbitMQ on this port
    private static int port = 5672;
    // The default RabbitMQ virtual host is "/"
    private static String virtualHost = "/";

    // default exchange
    private static String exchange = "";
    // Queue name
    private static String queueName = "test";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1 create ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 create Connection
        Connection connection = connectionFactory.newConnection();

        // 3 Create Channel
        Channel channel = connection.createChannel();

        // 4 create Queue (durable, not exclusive, not auto-deleted)
        channel.queueDeclare(queueName , true , false , false , null);

        // 5 create the consumer with autoAck = true
        //   (DefaultConsumer is used here; QueueingConsumer was removed in amqp-client 5.x)
        // 6 receive messages: handleDelivery is called once per delivered message
        channel.basicConsume(queueName , true , new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        });
    }
}

Test

Given the producer and consumer code, messages with RoutingKey = "test_1" cannot be routed to any Queue, because no Queue named test_1 is defined here. RabbitMQ Management shows the Queues that have been defined so far.
Start the producer. RabbitMQ Management then shows one message ready in the queue named test.

Producer output:

---------- handle return ------------
replyCode: 312
replyText: NO_ROUTE
exchange: 
routingKey: test_1
properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body: RabbitMQ: error return message

This is the expected result: only the message published with routingKey test_1 was returned, with reply code 312 (NO_ROUTE).
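The listener in this article only prints what was returned. If returned messages should be processed later, one option is to buffer them and handle them outside the callback. The following is a minimal sketch under that assumption: ReturnBuffer and Returned are hypothetical helper names, not part of the RabbitMQ client, and the message properties are left out so that the example depends only on the JDK. The onReturn method is meant to be called from handleReturn with the same arguments.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper: buffers returned messages for later processing.
class ReturnBuffer {
    // Hypothetical value class with the fields passed to handleReturn (properties omitted).
    static final class Returned {
        final int replyCode;
        final String replyText;
        final String exchange;
        final String routingKey;
        final String body;

        Returned(int replyCode, String replyText, String exchange, String routingKey, String body) {
            this.replyCode = replyCode;
            this.replyText = replyText;
            this.exchange = exchange;
            this.routingKey = routingKey;
            this.body = body;
        }
    }

    // Thread-safe, because the client library invokes listeners on its own threads.
    private final BlockingQueue<Returned> pending = new LinkedBlockingQueue<>();

    // Intended to be called from ReturnListener.handleReturn.
    // Assumes the body was published as UTF-8 text.
    void onReturn(int replyCode, String replyText, String exchange, String routingKey, byte[] body) {
        pending.add(new Returned(replyCode, replyText, exchange, routingKey,
                new String(body, StandardCharsets.UTF_8)));
    }

    // Non-blocking: returns null when no returned message is waiting.
    Returned poll() {
        return pending.poll();
    }

    public static void main(String[] args) {
        ReturnBuffer buffer = new ReturnBuffer();
        buffer.onReturn(312, "NO_ROUTE", "", "test_1",
                "RabbitMQ: error return message".getBytes(StandardCharsets.UTF_8));
        Returned r = buffer.poll();
        System.out.println(r.replyCode + " " + r.replyText + " -> " + r.routingKey + ": " + r.body);
    }
}
```

What to do with a buffered message (log it, republish it with a corrected routing key, alert someone) is an application decision that this sketch leaves open.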

Start the consumer. It prints:

RabbitMQ: return message

The consumer has received the message that was waiting in the queue.

RabbitMQ Management now shows no ready messages in the queue named test, because the consumer has received the message.

Now try again with mandatory set to false. Only the producer code needs to change; the consumer code stays the same.

Producer:

package com.kaven.rabbitmq.api.returnListener;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    // IP address of your RabbitMQ server
    private static String ip = "IP";
    // Default RabbitMQ port; client applications connect to RabbitMQ on this port
    private static int port = 5672;
    // The default RabbitMQ virtual host is "/"
    private static String virtualHost = "/";

    // default exchange
    private static String exchange = "";
    // Routing rule of default exchange: routingKey (test) will match queue(test) with the same name
    private static String routingKey = "test";
    private static String routingKeyError = "test_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1 create a ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 create Connection
        Connection connection = connectionFactory.newConnection();

        // 3 Create Channel
        Channel channel = connection.createChannel();

        //4 add ReturnListener
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
                                     AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("---------- handle return ------------");
                System.out.println("replyCode: " + replyCode);
                System.out.println("replyText: " + replyText);
                System.out.println("exchange: " + exchange);
                System.out.println("routingKey: " + routingKey);
                System.out.println("properties: " + properties);
                System.out.println("body: " + new String(body));
            }
        });

        // 5 send message
        String msg = "RabbitMQ: return message";
        String msgError = "RabbitMQ: error return message";
        // mandatory is false: the unroutable message is dropped, so the ReturnListener is never called
        channel.basicPublish(exchange , routingKeyError , false , null , msgError.getBytes());
    }
}

Start the producer. This time the ReturnListener is not called (there is no output), and RabbitMQ Management shows no ready messages: with mandatory set to false, the broker silently discards the unroutable message.

Tags: RabbitMQ Java encoding

Posted on Tue, 11 Feb 2020 22:56:01 -0800 by SquirrelJ