rabbitmq(5) subscription mode

1, model

A producer publishes to an exchange (sometimes translated as "switch"), and each consumer binds its own queue to that exchange. The exchange distributes every message to the queues of all consumers that are currently online.
The exchange does not store messages; it can only forward them to the consumers that are online at that moment. Therefore, a consumer that was offline when a message was sent cannot get that message even after it reconnects to RabbitMQ.

The difference between subscription mode and normal queue:
1. With a normal queue, each message sent by the producer is received by only one consumer; in subscription mode, each message sent by the producer is received by all online consumers.
2. With a normal queue, the producer's message is saved in the queue even if no consumer has received it yet; in subscription mode, the message is not saved, because the exchange cannot store messages, so consumers that are offline miss it.

2, code

2.1 producers

private static final String EXCHANGE_NAME = "test_exchange";

/**
 * Publishes a single "Hello world" message to the fanout exchange
 * {@link #EXCHANGE_NAME} and then releases the channel and connection.
 *
 * <p>The connection is closed in a {@code finally} block so that a failure
 * while declaring the exchange or publishing does not leak it.
 *
 * @throws Exception if connecting, declaring the exchange, publishing or
 *                   closing fails
 */
@Test
public void testSend() throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.1.7");
    factory.setUsername("root");
    factory.setPassword("123456");

    Connection connection = factory.newConnection();
    try {
        Channel channel = connection.createChannel();

        /*
         * Declare the exchange with type "fanout" (distribution).
         * The exchange does not store messages and can only send them to
         * the consumers that are currently online; a consumer that missed a
         * message cannot get it later, even if it reconnects to RabbitMQ.
         */
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = "Hello world";
        // Encode explicitly as UTF-8: the consumers decode with "UTF-8", so
        // relying on the platform default charset here could garble text.
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
        System.out.println(" [1] Sent '" + message + "'");

        channel.close();
    } finally {
        // Guard with isOpen() so that a connection already shut down by an
        // error does not throw here and mask the original exception.
        if (connection.isOpen()) {
            connection.close();
        }
    }
}

2.2 consumers 1

private static final String EXCHANGE_NAME = "test_exchange";

/**
 * Subscriber 1: declares the fanout exchange, binds a freshly declared queue
 * to it, and prints and acknowledges every message delivered to that queue.
 *
 * <p>The connection is intentionally left open so the consumer keeps
 * receiving messages until the process is terminated.
 *
 * @param args unused
 * @throws Exception if connecting to the broker or setting up the
 *                   exchange, queue or consumer fails
 */
public static void main(String[] args) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.1.7");
    connectionFactory.setUsername("root");
    connectionFactory.setPassword("123456");

    final Channel channel = connectionFactory.newConnection().createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    // Use whatever queue name the no-argument queueDeclare() reports back,
    // then bind that queue to the exchange with an empty routing key.
    final String subscriberQueue = channel.queueDeclare().getQueue();
    channel.queueBind(subscriberQueue, EXCHANGE_NAME, "");

    System.out.println(" [1] Waiting for messages. To exit press CTRL+C");

    Consumer printAndAck = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            System.out.println(" [1] Received '" + new String(body, "UTF-8") + "'");

            // Manual acknowledgement for this delivery, since the consumer
            // is registered below with autoAck disabled.
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    };

    // Second argument is autoAck; false means deliveries are acked in the handler.
    channel.basicConsume(subscriberQueue, false, printAndAck);
}

2.3 consumers 2

private static final String EXCHANGE_NAME = "test_exchange";

/**
 * Subscriber 2: same flow as the first subscriber, but logs with the
 * {@code [2]} prefix. It declares the fanout exchange, binds a newly
 * declared queue to it, then prints and acknowledges each delivery.
 *
 * <p>Neither the channel nor the connection is closed here; the process
 * keeps consuming until it is stopped.
 *
 * @param args unused
 * @throws Exception if the broker connection or any declare/bind/consume
 *                   call fails
 */
public static void main(String[] args) throws Exception {
    ConnectionFactory brokerFactory = new ConnectionFactory();
    brokerFactory.setHost("192.168.1.7");
    brokerFactory.setUsername("root");
    brokerFactory.setPassword("123456");

    Connection brokerConnection = brokerFactory.newConnection();
    final Channel channel = brokerConnection.createChannel();

    // Deliveries are acknowledged explicitly inside the handler.
    final boolean autoAck = false;

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    AMQP.Queue.DeclareOk declared = channel.queueDeclare();
    String boundQueue = declared.getQueue();
    // Bind the queue to the exchange (empty routing key).
    channel.queueBind(boundQueue, EXCHANGE_NAME, "");

    System.out.println(" [2] Waiting for messages. To exit press CTRL+C");

    Consumer subscriber = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            final String text = new String(body, "UTF-8");
            final long deliveryTag = envelope.getDeliveryTag();

            System.out.println(" [2] Received '" + text + "'");

            channel.basicAck(deliveryTag, false);
        }
    };
    channel.basicConsume(boundQueue, autoAck, subscriber);
}

Tags: RabbitMQ

Posted on Sat, 04 Apr 2020 17:54:24 -0700 by atyndall