ActiveMQ simple learning 2 - Publish / subscribe messaging

This article continues from the previous one, "ActiveMQ simple learning 1 of JMS": https://my.oschina.net/hanchao/blog/1611447

1, Introduction

In the publish / subscribe model, publishers send messages to a specific topic, and zero or more subscribers can register interest in receiving messages from that topic. Publishers and subscribers do not know about each other. The model can be summarized as follows:

Multiple consumers (topic subscribers) can receive the same message.

There is a time dependency between publishers and subscribers. A subscriber must have subscribed to the topic before a message is published, and it must stay active to keep receiving messages. If a message is published while a subscriber is inactive, that subscriber misses it, and it will not receive that message even after it becomes active again. (This describes non-durable subscribers, which is what the code in this article uses.)

This model is typically used when a message on a topic needs to be consumed by multiple consumers.
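
The points above can be illustrated without a broker. The following is a minimal in-memory sketch, not ActiveMQ code: `InMemoryTopic` and `InMemoryTopicDemo` are hypothetical names invented for this illustration, and the sketch only models fan-out and the time dependency between publishing and subscribing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical in-memory topic; it models the semantics only, not ActiveMQ's API. */
class InMemoryTopic {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    /** Registers a subscriber; it only sees messages published from now on. */
    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    /** Delivers the message to every subscriber that is registered right now. */
    void publish(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }
}

class InMemoryTopicDemo {
    public static void main(String[] args) {
        InMemoryTopic topic = new InMemoryTopic();
        List<String> early1 = new ArrayList<>();
        List<String> early2 = new ArrayList<>();
        List<String> late = new ArrayList<>();

        topic.subscribe(early1::add);
        topic.subscribe(early2::add);
        topic.publish("Hello,MQ-1");   // both early subscribers receive this

        topic.subscribe(late::add);    // joins after the first publish
        topic.publish("Hello,MQ-2");   // all three subscribers receive this

        System.out.println("early1 = " + early1); // [Hello,MQ-1, Hello,MQ-2]
        System.out.println("early2 = " + early2); // [Hello,MQ-1, Hello,MQ-2]
        System.out.println("late   = " + late);   // [Hello,MQ-2]
    }
}
```

The late subscriber never sees "Hello,MQ-1", which is the same behavior you observe when a consumer is started after the producer has already published.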

Note: publish / subscribe is used in almost the same way as point-to-point, so the details are not repeated here. Just replace

Destination destination = session.createQueue("test-Queue");

with the following to switch to publish / subscribe (topic) mode:

Topic topic = session.createTopic("test-Topic");
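
Although the code change is one line, the delivery behavior differs. As a rough sketch (plain Java, no broker involved), the hypothetical `DeliverySketch` class below contrasts the two modes. The round-robin distribution for the queue is a simplifying assumption made for this illustration; the essential point is only that a queue hands each message to one consumer, while a topic gives every active subscriber a copy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical in-memory comparison of queue and topic delivery; not ActiveMQ code. */
class DeliverySketch {

    /** Queue semantics: each message goes to exactly one consumer (simple round-robin here). */
    static List<List<String>> viaQueue(List<String> messages, int consumers) {
        List<List<String>> inboxes = emptyInboxes(consumers);
        for (int i = 0; i < messages.size(); i++) {
            inboxes.get(i % consumers).add(messages.get(i));
        }
        return inboxes;
    }

    /** Topic semantics: every active consumer gets its own copy of each message. */
    static List<List<String>> viaTopic(List<String> messages, int consumers) {
        List<List<String>> inboxes = emptyInboxes(consumers);
        for (String message : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
        return inboxes;
    }

    /** Creates one empty inbox per consumer. */
    private static List<List<String>> emptyInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4");
        System.out.println("queue: " + viaQueue(messages, 2)); // [[m1, m3], [m2, m4]]
        System.out.println("topic: " + viaTopic(messages, 2)); // [[m1, m2, m3, m4], [m1, m2, m3, m4]]
    }
}
```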

2, Code example

package com.hanchao.mq;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;

import javax.jms.*;

/**
 * Publish and subscribe message model test
 *
 * @author liweihan
 * @time 2018-01-27 12:23
 */
public class AcTiveMQTest2 {

    /**
     * @author liweihan
     * @description  Produce messages and publish them to the topic
     * @throws Exception
     */
    @Test
    public void sendMessageToTopic() throws Exception {

        //1. create a ConnectionFactory
        String brokerUrl = "tcp://localhost:61616";
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
        //2. Create a connection and start it
        Connection connection = connectionFactory.createConnection();
        connection.start();
        //3. Create Session
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        //4. Create message destination topic
        Topic topic = session.createTopic("test-Topic");
        //5. Create producers
        MessageProducer producer = session.createProducer(topic);
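        //6. Create and send messages
        //   send(message, deliveryMode, priority, timeToLive):
        //   persistent delivery, priority i (JMS allows 0-9), timeToLive 0 = never expires
        for (int i = 4; i <= 9; i++) {
            TextMessage textMessage = session.createTextMessage("Hello,MQ-" + i);
            producer.send(textMessage, DeliveryMode.PERSISTENT, i, 0);
        }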

        //7. Release resources
        producer.close();
        session.close();
        connection.close();
    }

    /**
     * @author liweihan
     * @description  Topic consumer 1: this consumer must be running when a message
     *                is published to the topic in order to receive it.
     *
     *  Start this consumer before running the producer, or the test will receive nothing!
     */
    @Test
    public void consumerMessageFromTopic() throws Exception{
        //1. Create ConnectionFactory
        String brokerUrl = "tcp://localhost:61616";
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);


        //2. Create a connection and start it
        Connection connection = connectionFactory.createConnection();
        connection.start();
        //3. Create Session
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        //4. Create message destination
        Topic topic  = session.createTopic("test-Topic");
        //5. Create consumers
        MessageConsumer consumer = session.createConsumer(topic);
        //6. Consume messages: listen on the topic. When a new message arrives, onMessage() is called
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage)message;
                try {
                    String text = textMessage.getText();
                    System.out.println(" consumer 1 >>> : " + text);
                } catch (JMSException e) {
                    //Not handled here; rethrow as an unchecked exception
                    throw  new RuntimeException(e);
                }
            }
        });
        /**
         * System.in.read() keeps the program from exiting so the listener can print incoming messages.
         * It is only needed for this test, not by the messaging code itself.
         */
        System.in.read();
        //Release resources
        consumer.close();
        session.close();
        connection.close();
    }

    /**
     * @author liweihan
     * @description  Topic consumer 2: this consumer must be running when a message
     *                is published to the topic in order to receive it.
     *
     * Start this consumer before running the producer, or the test will receive nothing!
     */
    @Test
    public void consumerMessageFromTopic2() throws Exception{
        //1. Create ConnectionFactory
        String brokerUrl = "tcp://localhost:61616";
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);


        //2. Create a connection and start it
        Connection connection = connectionFactory.createConnection();
        connection.start();
        //3. Create Session
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        //4. Create message destination
        Topic topic  = session.createTopic("test-Topic");
        //5. Create consumers
        MessageConsumer consumer = session.createConsumer(topic);
        //6. Consume messages: listen on the topic. When a new message arrives, onMessage() is called
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage)message;
                try {
                    String text = textMessage.getText();
                    System.out.println(" consumer 2 >>> : " + text);
                } catch (JMSException e) {
                    //Not handled here; rethrow as an unchecked exception
                    throw  new RuntimeException(e);
                }
            }
        });
        /**
         * System.in.read() keeps the program from exiting so the listener can print incoming messages.
         * It is only needed for this test, not by the messaging code itself.
         */
        System.in.read();
        //Release resources
        consumer.close();
        session.close();
        connection.close();
    }
}
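
The consumer tests above stay alive by blocking on System.in.read(). One possible alternative, assuming you know how many messages to expect (six here, matching the producer loop from 4 to 9), is to wait on a CountDownLatch with a timeout. The sketch below is plain Java with no broker: `CountingListener` is a hypothetical stand-in for the body of onMessage(), and the extra thread only simulates the broker delivering messages.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: records each text and counts down until all expected messages arrived. */
class CountingListener {
    final List<String> received = new CopyOnWriteArrayList<>();
    private final CountDownLatch latch;

    CountingListener(int expectedMessages) {
        this.latch = new CountDownLatch(expectedMessages);
    }

    /** What onMessage() would do after extracting the text from the TextMessage. */
    void onText(String text) {
        received.add(text);
        latch.countDown();
    }

    /** Returns true if all expected messages arrived before the timeout elapsed. */
    boolean await(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }
}

class LatchWaitDemo {
    public static void main(String[] args) {
        CountingListener listener = new CountingListener(6);

        // Simulates the broker's delivery thread calling the listener.
        Thread delivery = new Thread(() -> {
            for (int i = 4; i <= 9; i++) {
                listener.onText("Hello,MQ-" + i);
            }
        });
        delivery.start();

        boolean all = listener.await(5, TimeUnit.SECONDS);
        System.out.println("received all: " + all + ", count = " + listener.received.size());
    }
}
```

With this approach the test finishes as soon as the expected messages arrive, and fails by timeout instead of hanging if the producer was never started.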


Posted on Thu, 30 Apr 2020 19:36:50 -0700 by yandoo