Getting Started with RocketMQ (Beginner's Edition, with Example Demo)

1. What is RocketMQ

Message Queue RocketMQ is a professional message middleware developed in-house by Alibaba Group. Built on highly available distributed cluster technology, it provides a range of cloud messaging services, including message publishing and subscription, message trace query, timed (delayed) messages, resource statistics, and monitoring and alerting. It is a core product of enterprise Internet architecture. With more than nine years of history, Message Queue RocketMQ gives distributed applications asynchronous decoupling and peak shaving (load leveling), along with features that Internet applications need, such as massive message backlog capacity, high throughput, and reliable retries. It is a core product behind Alibaba's Double 11 (Singles' Day) shopping festival.
It was open-sourced in 2012 and became a top-level Apache project in 2017.

2. Terminology

This section defines the terms specific to Message Queue RocketMQ.

  • Topic
    The message subject: the first-level message type. Messages are classified by Topic.
  • Message
    The carrier of information in a message queue.
  • Message ID
    A globally unique identifier for a message, generated automatically by the Message Queue RocketMQ system.
  • Message Key
    The business identifier of a message, set by the message Producer; it uniquely identifies a piece of business logic.
  • Tag
    The message label: a second-level message type used to further classify messages under a Topic.
  • Producer
    The message producer, also known as the message publisher; it is responsible for producing and sending messages.
  • Producer instance
    An object instance of a Producer. Different Producer instances can run in different processes or on different machines. A Producer instance is thread-safe and can be shared among multiple threads within the same process.
  • Consumer
    The message consumer, also known as the message subscriber; it is responsible for receiving and consuming messages.
  • Consumer instance
    An object instance of a Consumer. Different Consumer instances can run in different processes or on different machines. A thread pool is configured within a Consumer instance to consume messages.
  • Group
    A class of Producers or Consumers that usually produce or consume the same type of message and share the same publishing or subscribing logic.
  • Group ID
    The identifier of a Group.


  • Transactional messages: Provide distributed transaction capabilities similar to X/Open XA to achieve eventual consistency.
  • Timed (delayed) messages: Allow the message producer to schedule a timed (delayed) delivery of a message, up to 40 days ahead.
  • Large messages: Messages of up to 4 MB are supported.
  • Message trace: The full path of a message, from the publisher through the Message Queue RocketMQ server to the subscriber, can be traced clearly, which makes it easier to locate and troubleshoot problems.
  • Broadcast consumption: Every consumer identified by the same Group ID consumes each message once.
  • Ordered messages: Allow message consumers to consume messages in the order in which they were sent.
  • Resetting consumption progress: Consumption progress can be reset to a point in time, so that users can re-consume messages or discard backlogged ones.
  • Dead letter queue: Messages that cannot be consumed successfully are stored in a special dead letter queue for later processing.
  • Message filtering: Consumers can filter messages by message tag (Tag) so that they only receive the message types they want. Filtering is performed on the Message Queue RocketMQ server side.
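
To make tag filtering more concrete, here is a minimal sketch in plain Java of how a subscription expression such as "TAGA || TAGB" selects messages. It is only an illustration: the class TagFilterSketch and its matches method are hypothetical names, not part of the RocketMQ client, and in RocketMQ itself the filtering is done on the server side.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical illustration of tag filtering; not RocketMQ's implementation.
final class TagFilterSketch {

    // Returns true when the message tag is selected by the subscription
    // expression. "*" (or no expression) subscribes to every tag; otherwise
    // the expression is a list of tags separated by "||".
    static boolean matches(String expression, String messageTag) {
        if (expression == null || expression.trim().equals("*")) {
            return true;
        }
        Set<String> wanted = Arrays.stream(expression.split("\\|\\|"))
                .map(String::trim)
                .filter(tag -> !tag.isEmpty())
                .collect(Collectors.toSet());
        return wanted.contains(messageTag);
    }

    public static void main(String[] args) {
        System.out.println(matches("TAGA || TAGB", "TAGA"));
        System.out.println(matches("TAGA || TAGB", "TAGC"));
        System.out.println(matches("*", "TAGC"));
    }
}
```

Because the check depends only on the tag, a consumer that needs richer conditions (for example on message properties) would have to use a different filtering mechanism than the one sketched here.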

3. Conceptual Model of RocketMQ

Producer, Consumer, and Topic are the three most basic concepts in RocketMQ. The Producer produces messages and the Consumer consumes them. Messages are delivered through a Topic, which is the logical address where messages are stored. Specifically, a Producer sends messages to a particular Topic, and a Consumer subscribes to the Topic and either actively pulls messages or passively receives them.
In practice, a Topic breaks down into further concepts. The system deployment architecture is as follows:

(1) NameServer is an almost stateless node. It can be deployed as a cluster, and no information is synchronized between the nodes.
(2) Broker deployment is more complex. Brokers are divided into Masters and Slaves: one Master can have multiple Slaves, but a Slave can belong to only one Master. A Master and its Slaves are linked by sharing the same BrokerName and using different BrokerIds, where 0 denotes the Master and any non-zero value denotes a Slave. Multiple Masters can be deployed. Each Broker establishes long-lived connections to all nodes in the NameServer cluster and periodically registers its Topic information with every NameServer.
(3) A Producer establishes a long-lived connection to one randomly selected node in the NameServer cluster, periodically fetches Topic routing information from it, establishes a long-lived connection to the Master that serves the Topic, and periodically sends heartbeats to that Master. Producers are completely stateless and can be deployed as a cluster.
(4) A Consumer establishes a long-lived connection to one randomly selected node in the NameServer cluster, periodically fetches Topic routing information from it, establishes long-lived connections to the Masters and Slaves that serve the Topic, and periodically sends heartbeats to them. A Consumer can subscribe to messages from either a Master or a Slave; the subscription rules are determined by the Broker configuration.
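
The routing rules in points (1) to (4) can be modeled in a few lines of plain Java. This is a rough sketch under simplifying assumptions: RouteSketch stands in for a single NameServer node, the Broker class, the method names, and the addresses are all made up for illustration, and heartbeats and network connections are left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of topic routing; names and addresses are illustrative.
final class RouteSketch {

    // Brokers with the same brokerName form one Master/Slave group.
    // brokerId 0 marks the Master, any other id marks a Slave.
    static final class Broker {
        final String brokerName;
        final int brokerId;
        final String address;

        Broker(String brokerName, int brokerId, String address) {
            this.brokerName = brokerName;
            this.brokerId = brokerId;
            this.address = address;
        }

        boolean isMaster() {
            return brokerId == 0;
        }
    }

    // topic -> (broker address -> broker), as registered by the brokers
    private final Map<String, Map<String, Broker>> routes = new LinkedHashMap<>();

    // Each broker calls this periodically to register the topics it serves.
    void register(Broker broker, List<String> topics) {
        for (String topic : topics) {
            routes.computeIfAbsent(topic, t -> new LinkedHashMap<>())
                  .put(broker.address, broker);
        }
    }

    // Producers connect only to the Masters that serve the topic.
    List<String> producerTargets(String topic) {
        List<String> result = new ArrayList<>();
        Map<String, Broker> brokers = routes.getOrDefault(topic, Collections.emptyMap());
        for (Broker broker : brokers.values()) {
            if (broker.isMaster()) {
                result.add(broker.address);
            }
        }
        return result;
    }

    // Consumers may connect to both Masters and Slaves.
    List<String> consumerTargets(String topic) {
        Map<String, Broker> brokers = routes.getOrDefault(topic, Collections.emptyMap());
        return new ArrayList<>(brokers.keySet());
    }

    public static void main(String[] args) {
        RouteSketch nameServer = new RouteSketch();
        List<String> topics = Collections.singletonList("TopicTest");
        nameServer.register(new Broker("broker-a", 0, "10.0.0.1:10911"), topics);
        nameServer.register(new Broker("broker-a", 1, "10.0.0.2:10911"), topics);
        System.out.println("producer -> " + nameServer.producerTargets("TopicTest"));
        System.out.println("consumer -> " + nameServer.consumerTargets("TopicTest"));
    }
}
```

Since every broker registers with every NameServer, each NameServer node can answer routing queries on its own, which is why the nodes do not need to synchronize with one another.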

4. Storage characteristics of RocketMQ

Message storage in RocketMQ is built on two structures: the consume queue and the commit log.

4.1 Commit Log

This is where messages are physically stored. Messages from all producers are written here, and the commit log on each broker is shared by all queues on that machine without any distinction.

4.2 Consume Queue

This is a logical queue, corresponding to one of the message queues that make up a Topic. Consumers deal directly with the ConsumeQueue. The ConsumeQueue records consumption positions, each of which maps to a location in the commit log. So even if a ConsumeQueue is damaged, as long as the commit log is intact, no message is lost and the queue can be recovered. You can also re-consume or skip messages by modifying the consumption position.
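
The relationship between the two structures can be sketched with in-memory collections. This is a simplified model, not RocketMQ's storage code: StoreSketch and its methods are hypothetical, list indices stand in for positions in the commit log, and the real store works with files on disk.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the commit log and the consume queues.
final class StoreSketch {

    static final class Entry {
        final String topic;
        final int queueId;
        final String body;

        Entry(String topic, int queueId, String body) {
            this.topic = topic;
            this.queueId = queueId;
            this.body = body;
        }
    }

    // One shared, append-only log for every topic and queue on the broker.
    private final List<Entry> commitLog = new ArrayList<>();

    // "topic@queueId" -> positions in the commit log, in arrival order.
    private final Map<String, List<Integer>> consumeQueues = new HashMap<>();

    // Appends the message to the commit log and indexes it in its queue.
    void append(String topic, int queueId, String body) {
        commitLog.add(new Entry(topic, queueId, body));
        index(topic, queueId, commitLog.size() - 1);
    }

    private void index(String topic, int queueId, int position) {
        consumeQueues.computeIfAbsent(topic + "@" + queueId, k -> new ArrayList<>())
                     .add(position);
    }

    // Reads the message at the given consume offset of one logical queue,
    // or returns null if the offset is out of range.
    String read(String topic, int queueId, int consumeOffset) {
        List<Integer> queue = consumeQueues.get(topic + "@" + queueId);
        if (queue == null || consumeOffset < 0 || consumeOffset >= queue.size()) {
            return null;
        }
        return commitLog.get(queue.get(consumeOffset)).body;
    }

    // Recovers every consume queue by replaying the commit log from the start.
    void rebuildConsumeQueues() {
        consumeQueues.clear();
        for (int position = 0; position < commitLog.size(); position++) {
            Entry entry = commitLog.get(position);
            index(entry.topic, entry.queueId, position);
        }
    }

    public static void main(String[] args) {
        StoreSketch store = new StoreSketch();
        store.append("TopicA", 0, "a0");
        store.append("TopicB", 0, "b0");
        store.append("TopicA", 0, "a1");
        store.rebuildConsumeQueues(); // simulate recovery after losing the index
        System.out.println(store.read("TopicA", 0, 1));
    }
}
```

Re-consuming or skipping messages then amounts to moving the consume offset passed to read backward or forward; the commit log itself is never modified.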

5. Basic Application Demo

Integrating RocketMQ with Spring
POM dependency

        <!-- rocketmq Dependent Packages -->



XML connection configuration

	<!-- Producer -->
	<bean id="defaultMQProducer" class="org.apache.rocketmq.client.producer.DefaultMQProducer"
		init-method="start" destroy-method="shutdown">
		<property name="producerGroup" value="ProducerGroud1" />
		<property name="namesrvAddr" value="" />
		<!-- Number of retries when a send fails; the default is 2. Retries may cause duplicate messages -->
		<property name="retryTimesWhenSendFailed" value="2"></property>
		<!-- Whether to retry on another broker when a message is not stored successfully; the default is false -->
		<property name="retryAnotherBrokerWhenNotStoreOK" value="true"></property>
	</bean>

RocketMQ provides three ways to send messages: synchronous, asynchronous, and one-way.
* Synchronous send: the producer sends a message and waits for the broker's response before sending the next one.

private DefaultMQProducer defaultMQProducer;

    /**
     * Synchronous reliable transmission, used mainly for important notifications, SMS notifications, SMS marketing systems, and so on. The send method uses CommunicationMode.SYNC.
     * Synchronous sending means the producer does not return immediately after sending a message; it waits for the broker's response.
     */
    public String sendMessagesSynchronously() throws Exception {
        for (int i = 0; i < 1; i++) {
            // Create a message instance, specifying the topic, tag, and message body
            Message msg = new Message("Topict", "TagA", ("Hello RocketMQ " + i).getBytes());
            // Send the message to one of the brokers and block until the result comes back
            SendResult sendResult = defaultMQProducer.send(msg);
        }
        return "Synchronous Send Successful";
    }

* Asynchronous send: the producer sends a message and moves on without waiting for the broker's response; the result is delivered later through a callback.

    /**
     * Asynchronous reliable transmission, used mostly in business scenarios that are sensitive to response time. The send method uses CommunicationMode.ASYNC.
     */
    public String sendMessagesAsynchronously() throws Exception {
        for (int i = 0; i < 2; i++) {
            final int index = i;
            // Create a message instance, specifying the topic, tag, key, and message body.
            Message msg = new Message("Topic2", "TagA", "OrderID1", "Hello world".getBytes());
            // Returns immediately; the callback runs once the broker has responded
            defaultMQProducer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(index + "  " + sendResult.getMsgId());
                }

                @Override
                public void onException(Throwable e) {
                    System.out.println(index + "  " + "error");
                }
            });
        }
        return "Asynchronous Send Successful";
    }

* One-way send: the producer sends a message without waiting for the broker's response, and no callback is triggered.

    /**
     * One-way transmission, often used for scenarios that require only moderate reliability, such as log collection.
     */
    public String sendMessagesinOnewayMode() throws Exception {
        for (int i = 0; i < 2; i++) {
            // Create a message instance, specifying the topic, tag, and message body.
            Message msg = new Message("Topic3", "TagA", ("Hello RocketMQ " + i).getBytes());
            // Send the message to one of the brokers without waiting for a result
            defaultMQProducer.sendOneway(msg);
        }
        return "One-way Send Successful";
    }

Next, define the consumer and set its consumer group, NameServer address, and so on. RocketMQ provides two consumption modes, PUSH and PULL, which correspond to the DefaultMQPushConsumer and DefaultMQPullConsumer classes respectively. Most scenarios use PUSH mode. Internally, PUSH mode is still implemented with PULL: the client continuously polls the Broker for messages. In PUSH mode, the Consumer encapsulates this polling process; you register a MessageListener, and when messages arrive the client invokes the listener's consumeMessage() to consume them.
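
The idea that PUSH is a wrapper around PULL can be shown with a small plain-Java sketch. Everything here is hypothetical and simplified: PushOverPullSketch, its Listener interface, and the in-memory queue standing in for the Broker are illustrative names, and the loop runs on the calling thread, whereas a real client would keep polling in the background.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch of a push-style consumer built on top of pulling.
final class PushOverPullSketch {

    // Stand-in for a message listener callback.
    interface Listener {
        void consumeMessage(List<String> messages);
    }

    private final Queue<String> brokerQueue = new ArrayDeque<>(); // stands in for the Broker
    private final Listener listener;
    private final int batchSize;

    PushOverPullSketch(Listener listener, int batchSize) {
        this.listener = listener;
        this.batchSize = batchSize;
    }

    // Simulates a producer delivering a message to the Broker.
    void publish(String message) {
        brokerQueue.add(message);
    }

    // One pull request: fetch up to batchSize messages and, if any arrived,
    // hand them to the listener. Returns the number of messages delivered.
    int pullOnce() {
        List<String> batch = new ArrayList<>();
        while (batch.size() < batchSize && !brokerQueue.isEmpty()) {
            batch.add(brokerQueue.poll());
        }
        if (!batch.isEmpty()) {
            listener.consumeMessage(batch);
        }
        return batch.size();
    }

    // The "push" illusion: keep pulling until nothing is left, so the
    // application only ever sees listener callbacks.
    void pollUntilEmpty() {
        while (pullOnce() > 0) {
            // keep polling
        }
    }

    public static void main(String[] args) {
        PushOverPullSketch consumer = new PushOverPullSketch(
                batch -> System.out.println("received " + batch), 2);
        consumer.publish("m1");
        consumer.publish("m2");
        consumer.publish("m3");
        consumer.pollUntilEmpty();
    }
}
```

From the application's point of view only the listener exists, which is why PUSH mode is the more convenient choice in most scenarios.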

<!-- Configure the listener -->
	<bean id="registerMessageListener" class="com.epoint.rocketmq.spring.Consumer"></bean>
	<!-- Configure the consumer -->
 	<bean id="PushConsumer" class="org.apache.rocketmq.client.consumer.DefaultMQPushConsumer"
		init-method="start" destroy-method="shutdown">
		<property name="consumerGroup" value="CID_1" />
		<property name="namesrvAddr" value="" />
		<property name="messageListener" ref="registerMessageListener" />
		<property name="subscription">
			<map>
				<!-- Topic -->
				<entry key="Topic">
					<!-- Tags to subscribe to. With the expression below, the consumer receives messages tagged TAGA, TAGB, or TAGC. Tags cover most requirements; for complex cases, you can filter messages with an SQL92 expression.
					See the official documentation. -->
					<!-- <value>TAGA || TAGB || TAGC</value> -->
				</entry>
			</map>
		</property>
	</bean>

For the consumption retry mechanism, you can rely on either of the two default retry approaches, which retry a message up to 16 times. If every retry fails, the message enters the dead letter queue.

If you want to intervene manually, you can retry three times and then stop retrying: return success and save the message separately (consumeMessageBatchMaxSize should stay at its default of 1; changing this parameter is not recommended).
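
The manual approach just described can be sketched as follows. This is a standalone model rather than client code: RetryCapSketch, its Status enum, and the parked list are hypothetical stand-ins, and the reconsumeTimes argument plays the role of the redelivery count that the real client exposes through MessageExt.getReconsumeTimes().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch: cap retries at three, then park the message.
final class RetryCapSketch {

    // Mirrors the two outcomes of concurrent consumption.
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    static final int MAX_RETRIES = 3;

    // Messages that were given up on and saved for manual handling.
    final List<String> parked = new ArrayList<>();

    // Business logic; returns true when the message was processed.
    private final Predicate<String> businessLogic;

    RetryCapSketch(Predicate<String> businessLogic) {
        this.businessLogic = businessLogic;
    }

    // reconsumeTimes is how often the message has already been redelivered
    // (0 on the first delivery).
    Status consume(String body, int reconsumeTimes) {
        boolean processed;
        try {
            processed = businessLogic.test(body);
        } catch (RuntimeException e) {
            processed = false;
        }
        if (processed) {
            return Status.CONSUME_SUCCESS;
        }
        if (reconsumeTimes < MAX_RETRIES) {
            return Status.RECONSUME_LATER; // ask for another delivery
        }
        parked.add(body);                  // save separately, stop retrying
        return Status.CONSUME_SUCCESS;
    }

    public static void main(String[] args) {
        RetryCapSketch consumer = new RetryCapSketch(body -> false); // always fails
        int reconsumeTimes = 0;
        while (consumer.consume("order-1", reconsumeTimes) == Status.RECONSUME_LATER) {
            reconsumeTimes++; // the broker would redeliver after a delay
        }
        System.out.println("retries=" + reconsumeTimes + " parked=" + consumer.parked);
    }
}
```

The returned status applies to the whole batch handed to the listener, which is one reason to keep the batch size at 1 when using this pattern: a single failing message would otherwise cause its batch mates to be retried as well.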

Ordered consumption requires each message to be consumed successfully before the next one is processed, so it has no RECONSUME_LATER status. Instead, SUSPEND_CURRENT_QUEUE_A_MOMENT pauses consumption of the rest of the queue while the current message is retried until it succeeds.

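import java.util.List;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer implements MessageListenerConcurrently {
   public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
           ConsumeConcurrentlyContext consumeConcurrentlyContext) {
       // Consume each message in the batch
       for (MessageExt me : list) {
           System.out.print("msg=" + new String(me.getBody()) + "\n");
       }
       return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
   }
}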

6. Summary

Distributed transactions are essentially transactions that span multiple databases under unified control. By degree of control, they fall into three categories: no control, partial control, and full control. No control means that no distributed transaction is introduced. Partial control covers the many variants of two-phase commit, including transactional messages with eventual consistency and the TCC mode. Full control is a complete two-phase commit. Partial control offers good concurrency and performance at the cost of weaker data consistency; full control sacrifices performance to guarantee consistency. Which to choose depends on the business scenario.

7. Follow-up

With the release of RocketMQ 4.3.0, transactional messages have been made available. When it comes to distributed transactions, the term heard most often is the two-phase commit protocol. The next article will explain in detail how to use RocketMQ to implement distributed transactional messages.

Tags: Apache Spring xml

Posted on Wed, 15 Jan 2020 19:13:32 -0800 by chrisdarby