RocketMQ -- message configuration

Reference: https://help.aliyun.com/document_detail/29532.html

Message retry

Producer message retry

Retrying a send on the producer side may result in duplicate messages.

// Send timeout in milliseconds; the default is 3000
producer.setSendMsgTimeout(6000);

// Number of retries after a failed send (asynchronous and synchronous); retries may cause duplicate messages
producer.setRetryTimesWhenSendAsyncFailed(2);
producer.setRetryTimesWhenSendFailed(2);

// Whether to try another broker when the send result is not SEND_OK; the default is false
producer.setRetryAnotherBrokerWhenNotStoreOK(false);

Consumer message retry

Retry of sequential messages
For sequential messages, when the consumer fails to consume a message, RocketMQ automatically retries it over and over (at one-second intervals). While this happens, consumption of the messages behind it is blocked.
Therefore, when using sequential messages, make sure the application monitors and handles consumption failures promptly to avoid blocking.

Retry of unordered messages
For unordered messages (normal, scheduled, delayed, and transactional messages), when the consumer fails to consume a message, you can trigger a retry through the status returned by the listener.
Retry of unordered messages takes effect only in cluster consumption mode. Broadcast mode does not provide failure retry: after a consumption failure, the failed message is not retried and the consumer simply continues with new messages.

Retry count
By default, RocketMQ retries each message at most 16 times. The interval between retries grows with each attempt, from 10 seconds before the first retry to 2 hours before the sixteenth.

If the message still fails after 16 retries, it is no longer delivered.
However many times a message is retried, its Message ID does not change.
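The retry schedule can be made concrete with a small lookup. The sketch below assumes the default intervals listed in the referenced Alibaba Cloud documentation (10 s, 30 s, then 1 to 10 minutes in one-minute steps, 20 min, 30 min, 1 h, 2 h); the class and method names are hypothetical and it does not use the RocketMQ client at all.

```java
import java.time.Duration;

class RetrySchedule {
    // Assumed default intervals, in seconds, before retries 1..16.
    private static final long[] DELAY_SECONDS = {
        10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200
    };

    /** Returns the wait before the given retry attempt (1-based). */
    static Duration delayBeforeRetry(int attempt) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        // Attempts beyond the 16th reuse the last interval (2 hours).
        int index = Math.min(attempt, DELAY_SECONDS.length) - 1;
        return Duration.ofSeconds(DELAY_SECONDS[index]);
    }

    /** Sums the waits from the first failure until the given retry is delivered. */
    static Duration totalUntilRetry(int attempt) {
        Duration total = Duration.ZERO;
        for (int i = 1; i <= attempt; i++) {
            total = total.plus(delayBeforeRetry(i));
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println("Wait before retry 1:  " + delayBeforeRetry(1));
        System.out.println("Wait before retry 16: " + delayBeforeRetry(16));
        System.out.println("Total for 16 retries: " + totalUntilRetry(16));
    }
}
```

Under these assumed intervals, a message that keeps failing exhausts its 16 retries in a little under five hours.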

Configuration
In cluster consumption mode, if a message should be retried after consumption fails, this must be made explicit in the message listener implementation, in any one of the following three ways:

  • Return Action.ReconsumeLater (recommended); in the open-source client this is ConsumeConcurrentlyStatus.RECONSUME_LATER
  • Return null
  • Throw an exception
	consumer.registerMessageListener(new MessageListenerConcurrently() {
			@Override
			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
			                                                ConsumeConcurrentlyContext context) {
				// TODO: process the message (business logic)

				// Option 1 (recommended): return RECONSUME_LATER and the message will be retried
				return ConsumeConcurrentlyStatus.RECONSUME_LATER;
				// Option 2: return null instead; the message will also be retried
				// return null;
				// Option 3: throw an exception instead; the message will also be retried
				// throw new RuntimeException("Consume message exception");
			}
	});

If you do not want a message to be retried even when processing fails, catch the exception and return ConsumeConcurrentlyStatus.CONSUME_SUCCESS.

Customizing the maximum number of retries

		/**
		 * If the maximum number of retries is 16 or fewer, the retry intervals follow the default retry schedule.
		 * If it is greater than 16, every retry beyond the 16th is delivered at a 2-hour interval.
		 * Note: the limit is set on the consumer; Message.putUserProperty rejects reserved keys such as MessageConst.PROPERTY_MAX_RECONSUME_TIMES.
		 */
		consumer.setMaxReconsumeTimes(19);

Getting the retry count of a message

		consumer.registerMessageListener(new MessageListenerConcurrently() {
			@Override
			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
			                                                ConsumeConcurrentlyContext context) {
				// Number of times the first message in the batch has already been retried
				int reconsumeTimes = msgs.get(0).getReconsumeTimes();
				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
			}
		});
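One common use of the retry count is to stop retrying once a message has failed too often. The following is a minimal sketch of that decision, not a RocketMQ API: the Status enum is a stand-in for ConsumeConcurrentlyStatus so the code runs without the client library, and the threshold of 3 is an arbitrary value chosen for illustration.

```java
class RetryDecision {
    // Stand-in for ConsumeConcurrentlyStatus (assumption, so the sketch has no dependencies).
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    // Hypothetical threshold chosen for illustration.
    static final int MAX_RETRIES_BEFORE_GIVING_UP = 3;

    /** Runs the business logic and decides whether the message should be retried. */
    static Status handle(int reconsumeTimes, Runnable businessLogic) {
        try {
            businessLogic.run();
            return Status.CONSUME_SUCCESS;
        } catch (RuntimeException e) {
            if (reconsumeTimes >= MAX_RETRIES_BEFORE_GIVING_UP) {
                // Record the failure somewhere durable, then acknowledge to stop further retries.
                System.err.println("Giving up after " + reconsumeTimes + " retries: " + e.getMessage());
                return Status.CONSUME_SUCCESS;
            }
            return Status.RECONSUME_LATER;
        }
    }

    public static void main(String[] args) {
        Runnable failing = () -> { throw new IllegalStateException("boom"); };
        System.out.println(handle(0, failing)); // still within the limit, so retry
        System.out.println(handle(3, failing)); // limit reached, so acknowledge
    }
}
```

In a real listener, the first argument would come from msgs.get(0).getReconsumeTimes() and the returned value would map onto the corresponding ConsumeConcurrentlyStatus constant.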

Message filtering

MessageSelector.byTag

// Receive only messages tagged tag1 or tag2
consumer.subscribe("topic1", "tag1 || tag2");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) ->
		ConsumeConcurrentlyStatus.CONSUME_SUCCESS);
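A tag expression such as "tag1 || tag2" can be read as a set of accepted tags. The sketch below only mimics that matching rule locally so its effect is easy to see; it is not the client's actual implementation, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

class TagFilterSketch {
    /** Splits an expression such as "tag1 || tag2" into the set of accepted tags. */
    static Set<String> parse(String expression) {
        return Arrays.stream(expression.split("\\|\\|"))
                .map(String::trim)
                .filter(tag -> !tag.isEmpty())
                .collect(Collectors.toSet());
    }

    /** Returns true when a message carrying the given single tag would pass the expression. */
    static boolean matches(String expression, String messageTag) {
        Set<String> accepted = parse(expression);
        // "*" subscribes to every tag.
        return accepted.contains("*") || accepted.contains(messageTag);
    }

    public static void main(String[] args) {
        System.out.println(matches("tag1 || tag2", "tag2"));
        System.out.println(matches("tag1 || tag2", "tag3"));
    }
}
```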

Because each message can carry only one tag, tag filtering may not suit complex scenarios.
In those cases, use bySql filtering as described below.

MessageSelector.bySql

RocketMQ supports the following syntax:

  • Numeric comparison: >, >=, <, <=, BETWEEN, =
  • Character comparison: =, <>, IN, IS NULL, IS NOT NULL; string constants must be enclosed in single quotation marks (')
  • Logical operators: AND, OR, NOT

Example

// Producer side (i is assumed to be the counter of the surrounding send loop)
Message msg = new Message("topic1",
		"tagA",
		("Hello RocketMQ "+i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
msg.putUserProperty("a", String.valueOf(i)); // Set user property "a", which the SQL expression filters on
SendResult sendResult = producer.send(msg);

//#####################################################

// Consumer side: receive only messages whose property a is between 0 and 3 (inclusive)
consumer.subscribe("topic1", MessageSelector.bySql("a between 0 and 3"));
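To see which messages the expression "a between 0 and 3" lets through, the predicate can be mimicked locally. This is a sketch under assumptions, not the broker's filter engine: it treats BETWEEN as inclusive on both ends (as in SQL92) and treats a missing property as a non-match; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

class BetweenFilterSketch {
    /** Mimics "a between low and high": the property must exist and lie in the inclusive range. */
    static boolean matches(Map<String, String> userProperties, long low, long high) {
        String raw = userProperties.get("a");
        if (raw == null) {
            return false; // assumed: a message without the property does not match
        }
        long value = Long.parseLong(raw);
        return value >= low && value <= high;
    }

    /** Simulates sending messages with a = 0..count-1 and returns the values that pass the filter. */
    static List<Integer> accepted(int count) {
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            Map<String, String> props = Collections.singletonMap("a", String.valueOf(i));
            if (matches(props, 0, 3)) {
                result.add(i);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(accepted(10));
    }
}
```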

Enabling the filter on the broker

 ## Add the following to conf/broker.conf:
 enablePropertyFilter=true

Best practices

Consumer idempotency

After receiving a message, a RocketMQ consumer should process it idempotently, keyed on a unique business key.

Why consumer idempotency is necessary
In Internet applications, especially when the network is unstable, RocketMQ messages may be delivered more than once. The duplication falls into the following three situations:

  • Duplicate message when sending:
    A message has been sent to the server and persisted, but a brief network interruption or a client crash prevents the server's response from reaching the client. If the producer concludes that the send failed and sends the message again, the consumer will later receive two messages with the same content and Message ID.
  • Duplicate message during delivery:
    A message has been delivered to the consumer and the business processing has completed, but the network is interrupted while the client is sending its acknowledgement to the server. To guarantee that the message is consumed at least once, the RocketMQ server delivers the already processed message again once the network recovers. The consumer will then receive two messages with the same content and Message ID.
  • Duplicate messages during load balancing:
    When a RocketMQ broker or client restarts, scales out, or scales in, a rebalance is triggered, and the consumer may receive duplicate messages.

Handling suggestions
Because Message IDs may conflict (be duplicated), relying on the Message ID is not recommended for truly safe idempotent processing. The best approach is to use a unique business ID as the basis for idempotency. The unique business ID can be set as the message key:

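// Send a message (a message needs a topic and a body before it can be sent)
Message message = new Message("topic1", "tagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
message.setKeys("ORDERID_100"); // Unique business ID used as the message key
SendResult sendResult = producer.send(message);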

//#############

// Receive messages
consumer.registerMessageListener(new MessageListenerConcurrently() {
	@Override
	public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
	                                                ConsumeConcurrentlyContext context) {
		for (MessageExt ext : msgs) {
			// Business key set by the producer; use it to detect and skip duplicates
			String keys = ext.getKeys();
		}
		return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
	}
});
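What to do with the key once it has been read can be sketched as follows. This is a minimal illustration, assuming an in-memory set of processed keys; a real deployment would more likely rely on a durable store (for example a database unique constraint) so that de-duplication survives restarts and works across consumer instances. The IdempotentHandler class is hypothetical and independent of the RocketMQ client.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class IdempotentHandler {
    // In-memory record of processed business keys (assumption, for illustration only).
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();
    private final AtomicInteger executions = new AtomicInteger();

    /** Runs the business logic only the first time a key is seen; returns true if it ran. */
    boolean handleOnce(String businessKey, Runnable businessLogic) {
        // add() returns false when the key is already present, i.e. a duplicate delivery.
        if (!processedKeys.add(businessKey)) {
            return false;
        }
        try {
            businessLogic.run();
            executions.incrementAndGet();
            return true;
        } catch (RuntimeException e) {
            // Forget the key so that a later retry can process the message again.
            processedKeys.remove(businessKey);
            throw e;
        }
    }

    int executions() {
        return executions.get();
    }

    public static void main(String[] args) {
        IdempotentHandler handler = new IdempotentHandler();
        Runnable chargeOrder = () -> System.out.println("charging order");
        handler.handleOnce("ORDERID_100", chargeOrder);
        handler.handleOnce("ORDERID_100", chargeOrder); // duplicate delivery, skipped
        System.out.println("executions = " + handler.executions());
    }
}
```

Inside the listener, ext.getKeys() would supply the business key, and a skipped duplicate can still be acknowledged with CONSUME_SUCCESS.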

Posted on Fri, 05 Jun 2020 23:13:00 -0700 by kankohi