RabbitMQ: the return and confirm mechanisms, and a cleaner way to write the consumer

Return mechanism:

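A ReturnListener handles messages that cannot be routed.

A producer sends a message by specifying an exchange and a routing key,

and consumers listen on the queue the message is routed to.

Sometimes, however, the routing key used when publishing does not match any queue bound to the exchange.

If the message was published with the mandatory flag set to true, the broker returns it to the producer, and a ReturnListener is how the producer receives such unreachable messages. Without the mandatory flag, the broker discards them by default. (Publishing to an exchange that does not exist is a different case: the broker closes the channel with an error instead of returning the message.)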

Producer code:

package com.sy.rabbitmq.return_listener;

import com.rabbitmq.client.*;
import com.sy.rabbitmq.TestProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Return mechanism: producer side
 */
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // Create ConnectionFactory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(TestProperties.getIp());
        factory.setPort(TestProperties.getPort());
        factory.setVirtualHost("/");

        //2. Get Connection
        Connection connection = factory.newConnection();

        //3. Get channel
        Channel channel = connection.createChannel();

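        //4. Exchange and routing key. "save" does not match the consumer's
        //   "confirm.#" binding, so this message cannot be routed.
        //   Run the consumer first so that the exchange exists.
        String exchangeName = "test_confirm_exchange";
        String routeKey = "save";

        String msg = "Hello RabbitMQ send confirm message!";
        //5. Add a return listener before publishing, so a returned message is not missed
        channel.addReturnListener(
                new ReturnListener() {
                    @Override
                    public void handleReturn(
                        int replyCode,
                        String replyText,
                        String exchange,
                        String routingKey,
                        AMQP.BasicProperties properties,
                        byte[] body
                    ) throws IOException {
                        System.out.println("-------------return Unable to route message--------------");
                        System.out.println("replyCode: " + replyCode + ", replyText: " + replyText);
                        System.out.println("exchange: " + exchange + ", routingKey: " + routingKey);
                    }
                }
        );
        //6. Send the message with mandatory = true (third argument),
        //   so the broker returns it if it cannot be routed
        channel.basicPublish(exchangeName,routeKey,true,null,msg.getBytes());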
    }
}

Consumer code:

package com.sy.rabbitmq.return_listener;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.sy.rabbitmq.TestProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Return mechanism: consumer side
 */
public class Comsumer {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // Create ConnectionFactory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(TestProperties.getIp());
        factory.setPort(TestProperties.getPort());
        factory.setVirtualHost("/");

        //2. Get Connection
        Connection connection = factory.newConnection();

        //3. Get channel
        Channel channel = connection.createChannel();

        //4. Declare an exchange through channel
        String exchangeName = "test_confirm_exchange";
        String routeKey = "confirm.#";
        String queueName = "test_confirm_queue";
        channel.exchangeDeclare(exchangeName,"topic",true);

        //5. Declare a queue through channel, bind and set route key
        channel.queueDeclare(queueName,true,false,false,null);
        channel.queueBind(queueName,exchangeName,routeKey);

        //6. Create a consumer
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //7. Start consuming from the queue (second argument: autoAck = true)
        channel.basicConsume(queueName,true,consumer);
        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println(msg);
        }
    }
}
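
Why is the producer's message returned? It is published with the routing key "save", while the queue is bound with "confirm.#". The following is a minimal, library-free sketch of topic matching, assuming the usual rules ("*" matches exactly one word, "#" matches zero or more words, words are separated by dots). TopicMatchSketch and matches are hypothetical names and not part of the RabbitMQ client; the broker does this matching itself, and edge cases such as empty keys are not modeled.

```java
/** Illustrative model of topic-exchange matching; not RabbitMQ client code. */
final class TopicMatchSketch {

    /** Returns true if routingKey matches bindingKey under the topic rules. */
    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            return j == words.length;          // pattern used up: all words must be used up too
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point
            for (int k = j; k <= words.length; k++) {
                if (match(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            return false;                      // pattern still expects a word, none left
        }
        if (pattern[i].equals("*") || pattern[i].equals(words[j])) {
            return match(pattern, i + 1, words, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("confirm.#", "confirm.save")); // true: routed to the queue
        System.out.println(matches("confirm.#", "save"));         // false: unroutable, returned
    }
}
```

With the binding "confirm.#", only keys whose first word is "confirm" reach test_confirm_queue, so "save" triggers the return listener when mandatory is true.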

Confirm mechanism:

Normally, once a message has been routed through the exchange into a queue, the broker can persist it. If something goes wrong before the message reaches the broker, however, the message is lost. The producer has two ways to find out whether the broker received a message:

  1. The transaction mechanism provided by AMQP;
  2. The publisher confirm mechanism.

How do you implement publisher confirms?

Step 1: turn on confirm mode on the channel: channel.confirmSelect()

Step 2: register a listener on the channel with addConfirmListener. It is called back with the broker's result for each message: an ack on success, a nack on failure.

Depending on the result, the producer can resend the message or write a log entry.

Producer code:

package com.sy.rabbitmq.confirm_listener;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.sy.rabbitmq.TestProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Confirm mechanism: producer side
 */
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // Create ConnectionFactory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(TestProperties.getIp());
        factory.setPort(TestProperties.getPort());
        factory.setVirtualHost("/");

        //2. Get Connection
        Connection connection = factory.newConnection();

        //3. Get channel
        Channel channel = connection.createChannel();

        //4. Specify message delivery mode: confirmSelect: message confirmation mode
        channel.confirmSelect();

        String exchangeName = "test_confirm_exchange";
        String routeKey = "confirm.save";

        //5. Add a confirm listener before publishing, so no ack or nack is missed
        channel.addConfirmListener(
                new ConfirmListener() {
                    @Override
                    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                        System.out.println("----------ack----------");
                    }

                    @Override
                    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                        System.out.println("----------nack----------");
                    }
                }
        );

        //6. Send a message
        String msg = "Hello RabbitMQ send confirm message!";
        channel.basicPublish(exchangeName,routeKey,null,msg.getBytes());
    }
}
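
The ConfirmListener in the producer above only prints a line. To actually resend or log, the producer has to know which message each deliveryTag refers to, and it has to honor the multiple flag (when true, the ack or nack covers every message up to and including that tag). Below is a minimal, library-free sketch of that bookkeeping. OutstandingConfirms and its methods are hypothetical names, not part of the RabbitMQ client. In a real producer the sequence number would presumably come from channel.getNextPublishSeqNo(), called just before basicPublish.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Illustrative bookkeeping for publisher confirms; not RabbitMQ client code. */
final class OutstandingConfirms {
    // Sequence number -> message body, sorted so "everything up to tag N" is cheap to select.
    private final ConcurrentNavigableMap<Long, String> pending = new ConcurrentSkipListMap<>();

    /** Call right before publishing a message. */
    void recordPublish(long seqNo, String body) {
        pending.put(seqNo, body);
    }

    /** Broker confirmed the message(s): forget them. */
    void handleAck(long deliveryTag, boolean multiple) {
        settle(deliveryTag, multiple);
    }

    /** Broker rejected the message(s): return their bodies so the caller can resend or log. */
    List<String> handleNack(long deliveryTag, boolean multiple) {
        return settle(deliveryTag, multiple);
    }

    int pendingCount() {
        return pending.size();
    }

    private List<String> settle(long deliveryTag, boolean multiple) {
        List<String> settled = new ArrayList<>();
        if (multiple) {
            // multiple = true covers every tag <= deliveryTag
            ConcurrentNavigableMap<Long, String> upTo = pending.headMap(deliveryTag, true);
            settled.addAll(upTo.values());
            upTo.clear();
        } else {
            String body = pending.remove(deliveryTag);
            if (body != null) {
                settled.add(body);
            }
        }
        return settled;
    }

    public static void main(String[] args) {
        OutstandingConfirms confirms = new OutstandingConfirms();
        for (long seq = 1; seq <= 4; seq++) {
            confirms.recordPublish(seq, "msg-" + seq);
        }
        confirms.handleAck(2, true);                          // acks 1 and 2
        System.out.println(confirms.handleNack(3, false));    // [msg-3]
        System.out.println(confirms.pendingCount());          // 1
    }
}
```

The two ConfirmListener callbacks would then delegate to handleAck and handleNack, and anything still pending after a timeout can be treated as unconfirmed.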

Consumer code:

package com.sy.rabbitmq.confirm_listener;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.sy.rabbitmq.TestProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Message confirmation mode - confirm consumer
 */
public class Comsumer {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // Create ConnectionFactory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(TestProperties.getIp());
        factory.setPort(TestProperties.getPort());
        factory.setVirtualHost("/");

        //2. Get Connection
        Connection connection = factory.newConnection();

        //3. Get channel
        Channel channel = connection.createChannel();

        //4. Declare an exchange through channel
        String exchangeName = "test_confirm_exchange";
        String routeKey = "confirm.#";
        String queueName = "test_confirm_queue";
        channel.exchangeDeclare(exchangeName,"topic",true);

        //5. Declare a queue through channel, bind and set route key
        channel.queueDeclare(queueName,true,false,false,null);
        channel.queueBind(queueName,exchangeName,routeKey);

        //6. Create a consumer
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //7. Start consuming from the queue (second argument: autoAck = true)
        channel.basicConsume(queueName,true,consumer);
        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println(msg);
        }
    }
}

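A cleaner way to write the consumer:

The consumers above pull messages from the queue in a while(true) loop, which is clumsy. In addition, QueueingConsumer was deprecated in the 4.x Java client and removed in 5.x. Instead, you can extend DefaultConsumer and let the client call you back: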

package com.sy.rabbitmq.consumer;

import com.rabbitmq.client.*;
import com.sy.rabbitmq.TestProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * New consumption style: pass a consumer object to basicConsume
 */
public class Comsumer {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // Create ConnectionFactory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(TestProperties.getIp());
        factory.setPort(TestProperties.getPort());
        factory.setVirtualHost("/");

        //2. Get Connection
        Connection connection = factory.newConnection();

        //3. Get channel
        Channel channel = connection.createChannel();

        //4. Declare an exchange through channel
        String exchangeName = "test_confirm_exchange";
        String routeKey = "confirm.#";
        String queueName = "test_confirm_queue";
        channel.exchangeDeclare(exchangeName,"topic",true);

        //5. Declare a queue through channel, bind and set route key
        channel.queueDeclare(queueName,true,false,false,null);
        channel.queueBind(queueName,exchangeName,routeKey);

        //----Old version writing method----
        //6. Create consumers
//        QueueingConsumer consumer = new QueueingConsumer(channel);
        //7. Bind consumption to the queue
//        channel.basicConsume(queueName,true,consumer);
//        while(true){
//            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
//            String msg = new String(delivery.getBody());
//            System.out.println(msg);
//        }

        //----New style: pass a consumer object; the client calls its handleDelivery for every message delivered----
        channel.basicConsume(queueName,true,new MyConsumer(channel));
    }


    /**
     * Custom consumer
     */
    public static class MyConsumer extends DefaultConsumer {
        /**
         * Constructs a new instance and records its association to the passed-in channel.
         *
         * @param channel the channel to which this consumer is attached
         */
        public MyConsumer(Channel channel) {
            super(channel);
        }


        /**
         * Called by the client for every message delivered to this consumer.
         * @param consumerTag the tag identifying this consumer
         * @param envelope delivery metadata (delivery tag, exchange, routing key)
         * @param properties the message properties
         * @param body the message body
         * @throws IOException if processing the message fails
         */
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("-----------------New consumption mode-----------------");
            System.out.println("consumerTag: " + consumerTag);
            System.out.println("envelope: " + envelope);
            System.out.println("AMQP.BasicProperties: " + properties);
            System.out.println("body: " + new String(body));
        }
    }

}
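
To see why the callback style removes the loop from application code, here is a small library-free model. It is only an analogy: PushDeliverySketch, deliverAll and the STOP sentinel are hypothetical names, and the in-memory queue stands in for the broker connection. In the real client, the library's own threads invoke handleDelivery.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Library-free model of push-style delivery; every name here is hypothetical. */
final class PushDeliverySketch {
    // Sentinel compared by identity, so it can never collide with a real message.
    private static final String STOP = new String("STOP");

    /** Delivers each message to a callback on a dispatcher thread and returns what the callback recorded. */
    static List<String> deliverAll(List<String> messages) {
        BlockingQueue<String> inbox = new LinkedBlockingQueue<>();
        List<String> handled = new ArrayList<>();

        // Stands in for handleDelivery: the application only supplies this callback.
        Consumer<String> callback = body -> handled.add("handled " + body);

        // Stands in for the client library's thread: the receive loop lives here, not in main.
        Thread dispatcher = new Thread(() -> {
            try {
                for (String body = inbox.take(); body != STOP; body = inbox.take()) {
                    callback.accept(body);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        dispatcher.start();

        try {
            for (String message : messages) {
                inbox.put(message);
            }
            inbox.put(STOP);
            dispatcher.join();   // join makes the dispatcher's writes to 'handled' visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while delivering", e);
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(deliverAll(Arrays.asList("a", "b"))); // [handled a, handled b]
    }
}
```

The application code shrinks to the callback, which is the same division of labor as MyConsumer above.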

 


Tags: RabbitMQ Java

Posted on Sat, 01 Feb 2020 22:35:45 -0800 by purinkle