Functional testing of an ActiveMQ high-availability and load-balancing cluster

1. Basic function test

ActiveMQ is a JMS provider that fully supports the JMS 1.1 and J2EE 1.4 specifications. JMS (Java Message Service) is an API for delivering messages between producers, which create messages, and consumers, which receive them. There are two message delivery models:

  • Point-to-point: each message sent by a producer is received by exactly one consumer.
  • Publish / subscribe: a message sent by a producer can be received by multiple consumers.

The JMS message models correspond to ActiveMQ destinations as follows:

| JMS message model    | P2P message model | Pub/Sub message model |
|----------------------|-------------------|-----------------------|
| ActiveMQ destination | Queue             | Topic                 |
| Characteristic       | One to one: a message produced by the producer can be consumed by only one consumer | One to many: a message produced by the producer can be consumed by multiple consumers |
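The difference between the two models can be illustrated with a small in-memory sketch. It does not use the ActiveMQ or JMS API; the class `DeliveryModelSketch` and its methods are hypothetical names, and the round-robin choice for the queue case is an assumption made only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** In-memory illustration only; this is not the ActiveMQ or JMS API. */
class DeliveryModelSketch {

    /** Queue semantics: each message goes to exactly one of the consumers. */
    static List<List<String>> deliverAsQueue(List<String> messages, int consumerCount) {
        List<List<String>> inboxes = emptyInboxes(consumerCount);
        for (int i = 0; i < messages.size(); i++) {
            // Round-robin selection is an assumption of this sketch.
            inboxes.get(i % consumerCount).add(messages.get(i));
        }
        return inboxes;
    }

    /** Topic semantics: every subscriber receives its own copy of each message. */
    static List<List<String>> deliverAsTopic(List<String> messages, int subscriberCount) {
        List<List<String>> inboxes = emptyInboxes(subscriberCount);
        for (String message : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
        return inboxes;
    }

    private static List<List<String>> emptyInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m0", "m1", "m2", "m3");
        System.out.println("queue: " + deliverAsQueue(messages, 2));
        System.out.println("topic: " + deliverAsTopic(messages, 2));
    }
}
```

With two consumers, the queue case splits the four messages between them, while the topic case gives each subscriber all four.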

Next, we will verify the two types of scenarios separately.

1.1 point-to-point mode (Queue)

The point-to-point mode is based on a queue, and messages can be sent and received synchronously or asynchronously. A queue can have multiple senders and multiple receivers, but each message is delivered to exactly one consumer.

1.1.1 import dependency

1. ActiveMQ dependency

<dependency>
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-all</artifactId>
   <version>5.15.12</version>
</dependency>

2. Spring Boot ActiveMQ connection pool dependency
To enable connection pooling with Spring Boot 2.0.x or earlier, the Maven dependency is:

<dependency>
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-pool</artifactId>
</dependency>

To enable connection pooling with Spring Boot 2.1 or later, the Maven dependency is:

<dependency>
   <groupId>org.messaginghub</groupId>
   <artifactId>pooled-jms</artifactId>
</dependency>

1.1.2 producers publish Queue messages

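In a master/slave ActiveMQ cluster, clients can connect only to the master broker; the slave brokers do not accept client connections. The client should therefore connect through the failover protocol and list every broker, so that it can reconnect to whichever broker is currently available. The specific code is as follows.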
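Typing a six-broker failover URL by hand is error-prone (a stray space inside the string is enough to make it invalid), so one option is to assemble it from a host and a list of ports. The following is a minimal sketch in plain Java; `FailoverUrlSketch` and `failoverUrl` are hypothetical helper names, not part of ActiveMQ.

```java
import java.util.ArrayList;
import java.util.List;

class FailoverUrlSketch {

    /** Builds "failover:(tcp://host:port,...)" from one host and a list of ports. */
    static String failoverUrl(String host, int... ports) {
        List<String> uris = new ArrayList<>();
        for (int port : ports) {
            uris.add("tcp://" + host + ":" + port);
        }
        // No whitespace anywhere in the result.
        return "failover:(" + String.join(",", uris) + ")";
    }

    public static void main(String[] args) {
        System.out.println(failoverUrl("127.0.0.1", 61616, 61617, 61618, 61619, 61620, 61621));
    }
}
```

The resulting string can be passed to `new ActiveMQConnectionFactory(url)` as in the code in this article. The failover transport also accepts options appended as a query string (for example `randomize=false`); check the ActiveMQ failover transport reference before relying on any of them.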

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

String url = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)"; // broker addresses
String queueName = "queue-testqq"; // name of the queue to send to
//1. Create the ConnectionFactory and bind the address
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
//2. Create Connection
Connection connection = factory.createConnection();
//3. Start connection
connection.start();
//4. Create a session
//First parameter: whether the session is transacted
//Second parameter: acknowledgement mode (here, messages are acknowledged automatically)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5. Create a queue
Destination destination = session.createQueue(queueName);
//6. Create a producer
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 20; i++) {
    //7. Create message
    TextMessage textMessage = session.createTextMessage("I am Queue Message producer:" + i);
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    //8. Send message
    producer.send(textMessage);
    System.out.println("Send the first set of messages:" + i);
}
connection.close();

After running the code, you can view the corresponding Queue in the ActiveMQ console. At this point the queue holds pending messages that are waiting to be consumed.

1.1.3 consumers receive Queue messages

A consumer can receive messages synchronously by calling MessageConsumer.receive(), or asynchronously by registering a MessageListener through MessageConsumer.setMessageListener().
Here, we consume messages asynchronously. The specific code is as follows:

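import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

String url = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)"; // broker addresses
String queueName = "queue-testqq"; // name of the queue to consume from

//1. Create the ConnectionFactory and bind the address
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
//2. Create Connection
Connection connection = factory.createConnection();
//3. Start connection
connection.start();
//4. Create a session
/* First parameter: whether the session is transacted.
   If true, you must call session.commit() to complete each unit of work.
   If false, do not call session.commit().
 */
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5. Create the destination (a queue)
Destination destination = session.createQueue(queueName);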
//6. Create a consumer
MessageConsumer consumer=session.createConsumer(destination);
//7. Create a listener
consumer.setMessageListener(new MessageListener() {
    public void onMessage(Message arg0) {
        TextMessage textMessage=(TextMessage)arg0;
        try {
            System.out.println("Receive message:"+textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
});

After the code runs, the consumer console prints each received message.

The ActiveMQ console shows the corresponding Queue information. At this point, the messages have been consumed.

1.1.4 multi consumer model

Start two or more consumers at the same time, run the producer again, and observe the output of each consumer console.

The observation shows that each message is received and consumed by only one consumer and is never consumed twice. It also shows that with multiple consumers the messages are divided evenly among them, which is the load balancing strategy.

[Screenshots: messages sent by the producer; messages received by consumer 1; messages received by consumer 2]
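The "each message is consumed exactly once" property can be illustrated without a broker, using competing consumer threads on a shared `java.util.concurrent` queue. This is only a sketch: `CompetingConsumersSketch` is a hypothetical class, and unlike the even split observed above (which comes from the broker's dispatching), the split between threads here is not guaranteed to be even.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class CompetingConsumersSketch {

    /** Runs consumerCount threads against one shared queue and returns what each one received. */
    static List<List<String>> run(int messageCount, int consumerCount) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < messageCount; i++) {
            queue.add("message-" + i);
        }

        List<List<String>> received = new ArrayList<>();
        List<Thread> workers = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            List<String> inbox = new ArrayList<>(); // written by one thread only
            received.add(inbox);
            workers.add(new Thread(() -> {
                String message;
                // poll() removes the head atomically, so two threads never get the same element.
                while ((message = queue.poll()) != null) {
                    inbox.add(message);
                }
            }));
        }
        for (Thread worker : workers) {
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join(); // after join, the worker's inbox is safely visible here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for consumers", e);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        List<List<String>> received = run(20, 2);
        for (int i = 0; i < received.size(); i++) {
            System.out.println("consumer " + (i + 1) + " received " + received.get(i).size() + " messages");
        }
    }
}
```

However the messages are split, the per-consumer counts always add up to the number of messages sent, and no message appears in two inboxes.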

1.2 publish / subscribe mode (Topic)

The Publish/Subscribe messaging domain uses a topic as its Destination. Publishers send messages to the topic, and subscribers register to receive messages from it. Any message sent to the topic is automatically delivered to all current subscribers. The receiving modes (synchronous and asynchronous) are the same as in the P2P domain.

1.2.1 producers publish Topic messages

Pub/Sub mode and P2P mode are almost identical at the code level; the only difference is that a Topic is created instead of a Queue. The code with which the producer publishes Topic messages is as follows:

String url = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)"; // broker addresses (the default port is 61616)
String topicName = "topic-testqq"; // name of the topic to publish to

//1. Create the ConnectionFactory and bind the address
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
//2. Create Connection
Connection connection = factory.createConnection();
//3. Start connection
connection.start();
//4. Create session (parameter 1: whether the session is transacted, parameter 2: acknowledgement mode)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5. Create the destination (a topic)
Destination destination = session.createTopic(topicName);
//Topic topic = session.createTopic(topicName);
//6. Create a producer
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 15; i++) {
    //7. Create message
    TextMessage textMessage = session.createTextMessage("I am topic Type message producer:" + i);
    //8. Send message
    producer.send(textMessage);
    System.out.println("Send message:" + i);
}
connection.close();

Do not start the producer yet. In Pub/Sub mode the subscriber must be started first; with an ordinary (non-durable) subscription, messages published before the subscriber starts are never delivered to it. It is like a newspaper subscription: if you subscribe today, you do not receive the issues printed before today.
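The newspaper analogy can be made concrete with a tiny in-memory model of a non-durable topic. This is an illustration only, not the JMS API; `TopicTimingSketch`, `subscribe` and `publish` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory illustration of non-durable topic delivery; this is not the JMS API. */
class TopicTimingSketch {
    private final List<List<String>> subscribers = new ArrayList<>();

    /** Registers a subscriber and returns the list that collects its messages. */
    List<String> subscribe() {
        List<String> inbox = new ArrayList<>();
        subscribers.add(inbox);
        return inbox;
    }

    /** Delivers to the subscribers registered right now; nothing is stored for later ones. */
    void publish(String message) {
        for (List<String> inbox : subscribers) {
            inbox.add(message);
        }
    }

    public static void main(String[] args) {
        TopicTimingSketch topic = new TopicTimingSketch();
        List<String> early = topic.subscribe();
        topic.publish("issue-1");
        List<String> late = topic.subscribe(); // subscribes after issue-1 was published
        topic.publish("issue-2");
        System.out.println("early subscriber: " + early); // [issue-1, issue-2]
        System.out.println("late subscriber:  " + late);  // [issue-2]
    }
}
```

The late subscriber never sees `issue-1`, which is why the consumer in the next section has to be running before the producer is started.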

1.2.2 consumers receive Topic messages

Like P2P mode, Pub/Sub mode has two message receiving modes: synchronous receiving and asynchronous receiving. Here, the Topic message is consumed asynchronously. The specific code is as follows:

String url = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)"; // broker addresses (the default port is 61616)
String topicName = "topic-testqq"; // name of the topic to subscribe to

//1. Create the ConnectionFactory and bind the address
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
//2. Create Connection
Connection connection = factory.createConnection();
//3. Start connection
connection.start();
//4. Create a session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5. Create the destination (a topic)
Destination destination = session.createTopic(topicName);
//6. Create a consumer
MessageConsumer consumer = session.createConsumer(destination);
//7. Receive messages, either synchronously or asynchronously
/* Synchronous receive: receive(long timeout) blocks the calling thread until the next
   message arrives; if the timeout (in milliseconds) expires first, it returns null.
TextMessage message = (TextMessage) consumer.receive(1000);
if (message != null) {
    System.out.println("Receive Topic message synchronously: " + message.getText());
}
*/
//Asynchronous receive: register a listener
consumer.setMessageListener(new MessageListener() {
    public void onMessage(Message arg0) {
        TextMessage textMessage = (TextMessage) arg0;
        try {
            System.out.println("Asynchronous reception Topic Message:" + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
});

Run the code, and the console prints the consumed Topic messages.

The ActiveMQ console also shows the corresponding Topic's publishing and subscription information.

1.2.3 multi consumer model

Start two or more consumers at the same time, run the producer again, and observe the output of each consumer console.

[Screenshots: console output of the two running consumers]

It can be found that:

  • In topic publish / subscribe mode, a message can be consumed by multiple consumers.
  • In topic publish / subscribe mode, consumption is immediate: consumers must be online when the producer publishes a message in order to receive it.

2. High availability test

2.1 test scenario 1

2.1.1 test case

1. The producer connects to the cluster and sends 50 messages, sleeping for 1 second before each send
2. Observe which node the producer is sending messages to, and stop that node
3. Check the producer's log to see whether all messages are still sent successfully

2.1.2 test code

1. Producers publish messages

String url = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)";
String queueName = "queue-testqq"; // name of the queue to send to

//1. Create the ConnectionFactory and bind the address
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
//2. Create Connection
Connection connection = factory.createConnection();
//3. Start connection
connection.start();
//4. Create a session
//First parameter: whether the session is transacted
//Second parameter: acknowledgement mode (here, messages are acknowledged automatically)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5. Create the destination (a queue)
Destination destination = session.createQueue(queueName);
//Queue queue = session.createQueue("test-Queue");
//6. Create a producer
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 50; i++) {
    //7. Create message
    TextMessage textMessage = session.createTextMessage("I am the first group of message producers:" + i);
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    //8. Send message
    producer.send(textMessage);
    System.out.println("Send the first set of messages:" + i);
}
connection.close();

2.1.3 test process

1. Run the producer to publish messages and observe the producer console

2. Stop mq1(61616) node

3. Continue to observe the producer console
(1) The 61616 node can no longer be reached

(2) The producer reconnects to the 61619 node and continues to send messages

2.2 test scenario 2

2.2.1 test case

1. The producer connects to the cluster and sends 20 messages
2. Observe which node the producer sent the messages to, and stop that node
3. A consumer connects to the cluster to consume the messages; observe the consumption

2.2.2 test code

1. Producers publish messages

String url = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)";
String queueName = "queue-testqq"; // name of the queue to send to

//1. Create the ConnectionFactory and bind the address
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
//2. Create Connection
Connection connection = factory.createConnection();
//3. Start connection
connection.start();
//4. Create a session
//First parameter: whether the session is transacted
//Second parameter: acknowledgement mode (here, messages are acknowledged automatically)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5. Create the destination (a queue)
Destination destination = session.createQueue(queueName);
//Queue queue = session.createQueue("test-Queue");
//6. Create a producer
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 20; i++) {
    //7. Create message
    TextMessage textMessage = session.createTextMessage("I am the first group of message producers:" + i);
    //8. Send message
    producer.send(textMessage);
    System.out.println("Send the first set of messages:" + i);
}
connection.close();

2. Consumers receive messages

String url = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)";
String queueName = "queue-testqq"; // name of the queue to consume from

//1. Create the ConnectionFactory and bind the address
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
//2. Create Connection
Connection connection = factory.createConnection();
//3. Start connection
connection.start();
//4. Create a session
/* First parameter: whether the session is transacted.
   If true, you must call session.commit() to complete each unit of work.
   If false, do not call session.commit().
 */
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5. Create the destination (a queue)
Destination destination = session.createQueue(queueName);
//6. Create a consumer
MessageConsumer consumer = session.createConsumer(destination);
//7. Create a listener
consumer.setMessageListener(new MessageListener() {
    public void onMessage(Message arg0) {
        TextMessage textMessage = (TextMessage) arg0;
        try {
            System.out.println("Receive message:" + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
});

2.2.3 test process

1. Run the producer to publish messages and observe the producer console

2. Stop mq4(61619) node

3. Start the consumer and observe consumption

The consumer connects to the 61618 node and receives the messages successfully.

2.3 test conclusion

The tests above started a message producer against the current cluster and then shut down the node the producer was connected to. The conclusions are as follows:
1. In the high-availability ActiveMQ cluster, if the node the producer is connected to goes down while messages are being produced, the client blocks temporarily while it reconnects, but overall availability is not affected.
2. In the high-availability ActiveMQ cluster, after the node the producer was connected to goes down, consumers can still consume the messages normally.
3. If one node of the current ActiveMQ cluster goes down, ActiveMQ continues to provide service normally and service availability is not affected.
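The reconnection behaviour observed in test scenario 1 (61616 stops, the producer continues on 61619) can be pictured as "walk the configured broker list and use the first one that accepts connections". The sketch below models only that idea in plain Java. It is not how the ActiveMQ failover transport is implemented (the real transport also handles retries and delays), and which brokers are reachable at each moment is an assumption of the example; `FailoverSelectionSketch` and `firstReachable` are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

class FailoverSelectionSketch {

    /** Returns the first candidate, in list order, that is currently reachable. */
    static Optional<String> firstReachable(List<String> candidates, Set<String> reachable) {
        for (String uri : candidates) {
            if (reachable.contains(uri)) {
                return Optional.of(uri);
            }
        }
        return Optional.empty(); // nothing reachable: a real client would keep waiting and retrying
    }

    public static void main(String[] args) {
        List<String> candidates = Arrays.asList(
                "tcp://127.0.0.1:61616", "tcp://127.0.0.1:61617", "tcp://127.0.0.1:61618",
                "tcp://127.0.0.1:61619", "tcp://127.0.0.1:61620", "tcp://127.0.0.1:61621");

        // Assumption: only the two masters accept client connections at the start.
        Set<String> reachable = new HashSet<>(Arrays.asList(
                "tcp://127.0.0.1:61616", "tcp://127.0.0.1:61619"));
        System.out.println("before: " + firstReachable(candidates, reachable).orElse("none"));

        // Stop the node the producer is connected to.
        reachable.remove("tcp://127.0.0.1:61616");
        System.out.println("after:  " + firstReachable(candidates, reachable).orElse("none"));
    }
}
```

While no broker is reachable the lookup returns nothing, which corresponds to the temporary blocking described in conclusion 1.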

3. Load balancing test

In the final architecture, the two master/slave clusters are connected to each other, and each cluster can consume the other's messages. However, if the cluster that a client is connected to goes down, that client still cannot send messages. In other words, ActiveMQ load balancing here only balances message consumption, while high availability is provided by the master/slave mechanism.

3.1 test cases

1. Start two consumers that listen to the same queue, each configured with the addresses of all cluster nodes
2. The producer connects to the cluster and sends 20 consecutive messages to that queue
3. Observe the message consumption logs of the two consumers

3.2 test code

1. Producers publish messages

String url = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)";
String queueName = "queue-testqq"; // name of the queue to send to

//1. Create the ConnectionFactory and bind the address
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
//2. Create Connection
Connection connection = factory.createConnection();
//3. Start connection
connection.start();
//4. Create a session
//First parameter: whether the session is transacted
//Second parameter: acknowledgement mode (here, messages are acknowledged automatically)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5. Create the destination (a queue)
Destination destination = session.createQueue(queueName);
//Queue queue = session.createQueue("test-Queue");
//6. Create a producer
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 20; i++) {
    //7. Create message
    TextMessage textMessage = session.createTextMessage("I am the first group of message producers:" + i);
    //8. Send message
    producer.send(textMessage);
    System.out.println("Send the first set of messages:" + i);
}
connection.close();

2. Consumers receive consumption messages

String url = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618," +
        "tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)";
String queueName = "queue-testqq"; // name of the queue to consume from
//1. Create the ConnectionFactory and bind the address
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
//2. Create Connection
Connection connection = factory.createConnection();
//3. Start connection
connection.start();
//4. Create a session
/* First parameter: whether the session is transacted.
   If true, you must call session.commit() to complete each unit of work.
   If false, do not call session.commit().
 */
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5. Create the destination (a queue)
Destination destination = session.createQueue(queueName);
//6. Create a consumer
MessageConsumer consumer = session.createConsumer(destination);
//7. Create a listener
consumer.setMessageListener(new MessageListener() {
    public void onMessage(Message arg0) {
        TextMessage textMessage = (TextMessage) arg0;
        try {
            /* if (textMessage.getText().contains("10")) {
                System.out.println("======Message exception ========");
                throw new Exception();
            }*/
            System.out.println("Receive message:" + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
});

3.3 test process

  1. Start two consumers at the same time.

  2. Then run the producer and observe the output of each consumer console.

[Screenshots: messages published by the producer; messages received by consumer 1; messages received by consumer 2]

The messages are divided evenly among the consumers, which is the load balancing strategy, and each message is received and consumed by only one consumer.

3.4 test conclusion

The test shows that when multiple consumers consume the same queue, the current cluster balances message consumption across them, which spreads the load over the ActiveMQ cluster and improves its throughput.

This completes the function test, high availability test and load balancing test of the ActiveMQ cluster. The high availability and load balancing functions of the cluster currently work as expected.


Posted on Fri, 22 May 2020 03:07:54 -0700 by naitsir