Learning ActiveMQ: the concept of message persistence

1, Three parts of a message:

1. Message header

2. Message body

3. Message properties

2, Message types

1. TextMessage

2. MapMessage

3. ObjectMessage

4. BytesMessage

5. StreamMessage

3, Message persistence

I. Parameter description

1. Non-persistent

        messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

Non-persistent messages are not kept in the broker's persistent store, so they are lost if the broker goes down.

2. Persistent (the default delivery mode)

        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

Persistent messages are stored by the broker, so they are still there after the broker goes down and restarts.
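To make the difference concrete, here is a small toy model of what happens to the two kinds of message when the broker restarts. It is only a sketch: ToyBroker, send, restart and pending are made-up names for illustration, and the in-memory lists merely stand in for the broker's store and memory. It does not show how ActiveMQ works internally.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model only: ToyBroker and its methods are hypothetical, not ActiveMQ classes.
class ToyBroker {
    // Stands in for the broker's persistent store; survives restart().
    private final List<String> diskStore = new ArrayList<>();
    // Stands in for broker memory; wiped by restart().
    private final Deque<String> memory = new ArrayDeque<>();

    void send(String body, boolean persistent) {
        if (persistent) {
            diskStore.add(body);   // persistent messages are also written to the store
        }
        memory.addLast(body);
    }

    // Simulates the broker going down and coming back up.
    void restart() {
        memory.clear();
        memory.addAll(diskStore);  // only stored messages are recovered
    }

    List<String> pending() {
        return new ArrayList<>(memory);
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.send("persistent-0", true);
        broker.send("non-persistent-0", false);
        broker.restart();
        System.out.println(broker.pending()); // prints [persistent-0]
    }
}
```

Only the message sent as persistent is still pending after the restart, which is the behaviour the two delivery modes are meant to give.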

II. Persistence examples

1. Non-persistent queue

Producer



import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ProjectName: springbootActiveMQ
 * @Package: cn.**.test
 * @Author: huat
 * @Date: 2020/1/2 17:04
 * @Version: 1.0
 */
public class ActiveMQTest {
    //url path
    private static final String ACTRIVE_URL="tcp://192.168.44.135:61616";
    //Queue name
    private static final String QUEUE_NAME="queue01";
    //Topic name
    private static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) {
        //1. Create connection factory
        //If the account password has not been modified, the account password is admin by default
        ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
        //If the account password is changed
        //The first parameter is the account, the second is the password, and the third is the requested url
        //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL);
        try {
            //2. Get connection through connection factory
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            //3. Create a session
            //Two parameters: the first is whether the session is transacted, the second is the acknowledgement mode
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //4. Create a destination (queue or topic specifically), here is to create a queue
            Queue queue=session.createQueue(QUEUE_NAME);
            //5. Create message producer, queue mode
            MessageProducer messageProducer = session.createProducer(queue);
            //6. Set the delivery mode before sending; it only applies to messages sent after this call
            messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            //7. Produce three messages with messageProducer and send them to the MQ queue
            for (int i=0;i<3;i++){
                //8. Create a text message
                TextMessage textMessage = session.createTextMessage("msg----->" + i);
                //Message property
                textMessage.setStringProperty("c01","vip");
                //9. Send to MQ via messageProducer
                messageProducer.send(textMessage);
            }
            messageProducer.close();
            session.close();
            connection.close();
            System.out.println("Message sent successfully");
        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}
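One detail about setDeliveryMode is worth spelling out: the producer's delivery mode is applied to each message at send time, so the call only affects messages sent after it. The sketch below imitates that rule with a toy class. ToyProducer and its string constants are hypothetical stand-ins, not the JMS API.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: ToyProducer is hypothetical and merely mimics the rule that
// a producer's delivery mode is applied to each message at send time.
class ToyProducer {
    static final String PERSISTENT = "PERSISTENT";
    static final String NON_PERSISTENT = "NON_PERSISTENT";

    private String deliveryMode = PERSISTENT;   // PERSISTENT is the JMS default
    private final List<String> sentModes = new ArrayList<>();

    void setDeliveryMode(String mode) {
        deliveryMode = mode;
    }

    // The body is ignored here; only the mode in force at send time is recorded.
    void send(String body) {
        sentModes.add(deliveryMode);
    }

    List<String> sentModes() {
        return sentModes;
    }

    public static void main(String[] args) {
        ToyProducer late = new ToyProducer();
        for (int i = 0; i < 3; i++) {
            late.send("msg" + i);
            late.setDeliveryMode(NON_PERSISTENT);   // set after the send
        }
        System.out.println(late.sentModes());  // prints [PERSISTENT, NON_PERSISTENT, NON_PERSISTENT]

        ToyProducer early = new ToyProducer();
        early.setDeliveryMode(NON_PERSISTENT);      // set once, before any send
        for (int i = 0; i < 3; i++) {
            early.send("msg" + i);
        }
        System.out.println(early.sentModes()); // prints [NON_PERSISTENT, NON_PERSISTENT, NON_PERSISTENT]
    }
}
```

Setting the mode once, right after creating the producer, avoids the first message going out with the default mode.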

Consumer


import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ProjectName: springbootActiveMQ
 * @Package: cn.**.test
 * @Author: huat
 * @Date: 2020/1/3 8:47
 * @Version: 1.0
 */
public class ActiveMQConsumer {
    //url path
    private static final String ACTRIVE_URL="tcp://192.168.44.135:61616";
    //Queue name
    private static final String QUEUE_NAME="queue01";

    public static void main(String[] args) {
        //1. Create connection factory
        //If the account password has not been modified, the account password is admin by default
        ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
        //If the account password is changed
        //The first parameter is the account, the second is the password, and the third is the requested url
        //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL);
        try {
            //2. Get connection through connection factory
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            //3. Create a session
            //Two parameters: the first is whether the session is transacted, the second is the acknowledgement mode
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //4. The queue name here must match the one used by the producer
            Queue queue = session.createQueue(QUEUE_NAME);
            //5. Create consumer
            MessageConsumer consumer = session.createConsumer(queue);
            //6. Consume messages by listening
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    //If message is not equal to null and belongs to TextMessage type (because the type of message sending is TextMessage, it is judged whether it is this type here)
                    if(null!=message&&message instanceof TextMessage){
                        TextMessage textMessage=(TextMessage)message;
                        try {
                            System.out.println(textMessage.getText());
                            //Get message properties
                            System.out.println("Message attribute--->"+textMessage.getStringProperty("c01"));
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            //7. Keep the console running
            System.in.read();
            //8. Closed resources
            consumer.close();
            session.close();
            connection.close();
        }catch (Exception e){
            e.printStackTrace();
        }
    }

}

2. Non-persistent topic

Producer

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ProjectName: springbootActiveMQ
 * @Package: cn.**.test
 * @Author: huat
 * @Date: 2020/1/4 9:29
 * @Version: 1.0
 */
public class ActiveMQTopicTest {
    //url path
    private static final String ACTRIVE_URL="tcp://192.168.44.135:61616";
    //Topic name
    private static final String TOPIC_NAME = "topic01";
    public static void main(String[] args) {
        //1. Create connection factory
        //If the account password has not been modified, the account password is admin by default
        ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
        //If the account password is changed
        //The first parameter is the account, the second is the password, and the third is the requested url
        //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL);
        try {
            //2. Get connection through connection factory
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            //3. Create a session
            //Two parameters: the first is whether the session is transacted, the second is the acknowledgement mode
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //4. Create a destination (queue or topic); here a topic is created
            Topic topic=session.createTopic(TOPIC_NAME);
            //5. Create the message producer for the topic
            MessageProducer messageProducer = session.createProducer(topic);
            //Non-persistent delivery (without this call the default is PERSISTENT)
            messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            //6. Produce three messages with messageProducer and send them to the MQ topic
            for (int i=0;i<3;i++){
                //7. Create message
                TextMessage textMessage = session.createTextMessage("msg----->" + i);//Create a text message
                //8. Send to mq via messageProducer
                messageProducer.send(textMessage);
            }
            messageProducer.close();
            session.close();
            connection.close();
            System.out.println("Message sent successfully");
        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}

Consumer


import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ProjectName: springbootActiveMQ
 * @Package: cn.**.test
 * @Author: huat
 * @Date: 2020/1/4 9:43
 * @Version: 1.0
 */
public class ActiveMQTopicConsumer {
    //url path
    private static final String ACTRIVE_URL="tcp://192.168.44.135:61616";
    //Topic name
    private static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) {
        //1. Create connection factory
        //If the account password has not been modified, the account password is admin by default
        ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
        //If the account password is changed
        //The first parameter is the account, the second is the password, and the third is the requested url
        //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL);
        try {
            //2. Get connection through connection factory
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            //3. Create a session
            //Two parameters: the first is whether the session is transacted, the second is the acknowledgement mode
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //4. The topic name here must match the one used by the producer
            Topic topic = session.createTopic(TOPIC_NAME);
            //5. Create consumer
            MessageConsumer consumer = session.createConsumer(topic);
            //6. Consume messages by listening
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    //If message is not equal to null and belongs to TextMessage type (because the type of message sending is TextMessage, it is judged whether it is this type here)
                    if(null!=message&&message instanceof TextMessage){
                        TextMessage textMessage=(TextMessage)message;
                        try {
                            System.out.println(textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            //7. Keep the console running
            System.in.read();
            //8. Closed resources
            consumer.close();
            session.close();
            connection.close();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

3. Persistent queue

Producer



import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ProjectName: springbootActiveMQ
 * @Package: cn.**.test
 * @Author: huat
 * @Date: 2020/1/2 17:04
 * @Version: 1.0
 */
public class ActiveMQTest {
    //url path
    private static final String ACTRIVE_URL="tcp://192.168.44.135:61616";
    //Queue name
    private static final String QUEUE_NAME="queue01";
    //Topic name
    private static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) {
        //1. Create connection factory
        //If the account password has not been modified, the account password is admin by default
        ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
        //If the account password is changed
        //The first parameter is the account, the second is the password, and the third is the requested url
        //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL);
        try {
            //2. Get connection through connection factory
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            //3. Create a session
            //Two parameters: the first is whether the session is transacted, the second is the acknowledgement mode
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //4. Create a destination (queue or topic specifically), here is to create a queue
            Queue queue=session.createQueue(QUEUE_NAME);
            //5. Create message producer, queue mode
            MessageProducer messageProducer = session.createProducer(queue);
            //6. Set the delivery mode before sending; it only applies to messages sent after this call
            messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
            //7. Produce three messages with messageProducer and send them to the MQ queue
            for (int i=0;i<3;i++){
                //8. Create a text message
                TextMessage textMessage = session.createTextMessage("msg----->" + i);
                //Message property
                textMessage.setStringProperty("c01","vip");
                //9. Send to MQ via messageProducer
                messageProducer.send(textMessage);
            }
            messageProducer.close();
            session.close();
            connection.close();
            System.out.println("Message sent successfully");
        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}

Consumer


import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ProjectName: springbootActiveMQ
 * @Package: cn.**.test
 * @Author: huat
 * @Date: 2020/1/3 8:47
 * @Version: 1.0
 */
public class ActiveMQConsumer {
    //url path
    private static final String ACTRIVE_URL="tcp://192.168.44.135:61616";
    //Queue name
    private static final String QUEUE_NAME="queue01";

    public static void main(String[] args) {
        //1. Create connection factory
        //If the account password has not been modified, the account password is admin by default
        ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
        //If the account password is changed
        //The first parameter is the account, the second is the password, and the third is the requested url
        //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL);
        try {
            //2. Get connection through connection factory
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            //3. Create a session
            //Two parameters: the first is whether the session is transacted, the second is the acknowledgement mode
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //4. The queue name here must match the one used by the producer
            Queue queue = session.createQueue(QUEUE_NAME);
            //5. Create consumer
            MessageConsumer consumer = session.createConsumer(queue);
            //6. Consume messages by listening
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    //If message is not equal to null and belongs to TextMessage type (because the type of message sending is TextMessage, it is judged whether it is this type here)
                    if(null!=message&&message instanceof TextMessage){
                        TextMessage textMessage=(TextMessage)message;
                        try {
                            System.out.println(textMessage.getText());
                            //Get message properties
                            System.out.println("Message attribute--->"+textMessage.getStringProperty("c01"));
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            //7. Keep the console running
            System.in.read();
            //8. Closed resources
            consumer.close();
            session.close();
            connection.close();
        }catch (Exception e){
            e.printStackTrace();
        }
    }

}

4. Persistent topic

Producer



import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ProjectName: springbootActiveMQ
 * @Package: cn.**.test
 * @Author: huat
 * @Date: 2020/1/4 9:29
 * @Version: 1.0
 */
public class ActiveMQTopicTest {
    //url path
    private static final String ACTRIVE_URL="tcp://192.168.44.135:61616";
    //Topic name
    private static final String TOPIC_NAME = "topic01";
    public static void main(String[] args) {
        //1. Create connection factory
        //If the account password has not been modified, the account password is admin by default
        ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
        //If the account password is changed
        //The first parameter is the account, the second is the password, and the third is the requested url
        //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL);
        try {
            //2. Get connection through connection factory
            Connection connection = activeMQConnectionFactory.createConnection();

            //3. Create a session
            //Two parameters: the first is whether the session is transacted, the second is the acknowledgement mode
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //4. Create a destination (queue or topic); here a topic is created
            Topic topic=session.createTopic(TOPIC_NAME);
            //5. Create the message producer for the topic
            MessageProducer messageProducer = session.createProducer(topic);
            //Persistent delivery
            messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
            connection.start();
            //6. Produce three messages with messageProducer and send them to the MQ topic
            for (int i=0;i<3;i++){
                //7. Create message
                TextMessage textMessage = session.createTextMessage("msg----->" + i);//Create a text message
                //8. Send to mq via messageProducer
                messageProducer.send(textMessage);
            }
            messageProducer.close();
            session.close();
            connection.close();
            System.out.println("Message sent successfully");
        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}

Consumer



import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ProjectName: springbootActiveMQ
 * @Package: cn.bdqn.test
 * @Author: huat
 * @Date: 2020/1/4 9:43
 * @Version: 1.0
 */
public class ActiveMQTopicConsumer {
    //url path
    private static final String ACTRIVE_URL="tcp://192.168.44.135:61616";
    //Topic name
    private static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) {
        //1. Create connection factory
        //If the account password has not been modified, the account password is admin by default
        ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
        //If the account password is changed
        //The first parameter is the account, the second is the password, and the third is the requested url
        //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL);
        try {
            //2. Get connection through connection factory
            Connection connection = activeMQConnectionFactory.createConnection();
            //Client ID that identifies this subscriber's connection to the broker
            connection.setClientID("test01");

            //3. Create a session
            //Two parameters: the first is whether the session is transacted, the second is the acknowledgement mode
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //4. The topic name here must match the one used by the producer
            Topic topic = session.createTopic(TOPIC_NAME);
            //5. Create a durable subscriber. The first parameter is the topic, the second is the subscription name
            TopicSubscriber topicSubscriber=session.createDurableSubscriber(topic,"remakr");
            //6. Start the connection
            connection.start();
            //Receive the first message, waiting indefinitely
            Message message = topicSubscriber.receive();
            while (null!=message){
                TextMessage textMessage=(TextMessage)message;
                System.out.println("******------>"+textMessage.getText());
                //Wait up to five seconds for the next message; a null result ends the loop
                message = topicSubscriber.receive(5000L);
            }
            session.close();
            connection.close();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
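A receive loop of this kind ends only when a timed receive returns null, so the result of every call has to be assigned back to the loop variable. The sketch below shows the same pattern without a broker. It is an illustration under assumptions: DrainLoop and drain are made-up names, and a BlockingQueue stands in for the subscriber, with poll(timeout) playing the role of receive(timeout).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy model only: a BlockingQueue stands in for the subscriber; poll(timeout)
// returns null when the wait expires, much like a timed receive.
class DrainLoop {
    static List<String> drain(BlockingQueue<String> inbox, long timeoutMillis) {
        List<String> received = new ArrayList<>();
        try {
            String message = inbox.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            while (message != null) {
                received.add(message);
                // Reassign on every pass; otherwise the loop never sees the null.
                message = inbox.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag and stop draining
        }
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<String> inbox = new LinkedBlockingQueue<>();
        inbox.add("msg0");
        inbox.add("msg1");
        System.out.println(drain(inbox, 100)); // prints [msg0, msg1]
    }
}
```

If the reassignment is left out, the loop keeps processing the first message forever, because the variable it tests never changes.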

Notes on topic persistence

1. If the consumer has registered and then goes offline, and the producer sends messages while it is offline, the consumer receives those messages when it reconnects.

2. If the producer sends messages before the consumer has registered, the consumer does not receive those earlier messages.

Summary:

1. Run the consumer once first. This registers the durable subscription with MQ, much like subscribing to a topic you want to follow.

2. Then run the producer to send messages.

3. Once registered, the consumer receives the messages whether or not it is online when they are sent. If it is offline, it receives everything it missed the next time it connects.
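The rules above can be sketched with a toy model that keeps one backlog per durable subscription. This is only an illustration: ToyTopic, connect and publish are made-up names, "sub01" is a hypothetical subscription name, and every connect is simplified to "collect whatever has piled up". It is not how ActiveMQ stores messages internally.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: ToyTopic is hypothetical and keeps one backlog per durable
// subscription name.
class ToyTopic {
    private final Map<String, List<String>> backlogs = new LinkedHashMap<>();

    // The first connect registers the subscription; later ones collect the backlog.
    List<String> connect(String subscriptionName) {
        List<String> backlog = backlogs.computeIfAbsent(subscriptionName, k -> new ArrayList<>());
        List<String> delivered = new ArrayList<>(backlog);
        backlog.clear();
        return delivered;
    }

    // Every registered subscription gets a copy, whether it is online or not.
    void publish(String body) {
        for (List<String> backlog : backlogs.values()) {
            backlog.add(body);
        }
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        topic.publish("before-registration");       // nobody is registered yet
        System.out.println(topic.connect("sub01")); // prints []
        topic.publish("while-offline");             // subscription exists, consumer is away
        System.out.println(topic.connect("sub01")); // prints [while-offline]
    }
}
```

The message published before registration never reaches the subscriber, while the one published during the offline period is waiting when it reconnects.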


Posted on Tue, 07 Jan 2020 08:15:01 -0800 by capella07