PullConsumer Message Pull Source Analysis in RocketMQ

RocketMQ provides many API s for pulling messages in PullConsumer, but in general there are two kinds: synchronous and asynchronous message pulling.


Synchronized message pull-out
The pullSyncImpl method of DefaultMQPullConsumerImpl is used to pull and cancel messages synchronously:

 1 private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block,
 2     long timeout)
 3     throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
 4     this.makeSureStateOK();
 5 
 6     if (null == mq) {
 7         throw new MQClientException("mq is null", null);
 8     }
 9 
10     if (offset < 0) {
11         throw new MQClientException("offset < 0", null);
12     }
13 
14     if (maxNums <= 0) {
15         throw new MQClientException("maxNums <= 0", null);
16     }
17 
18     this.subscriptionAutomatically(mq.getTopic());
19 
20     int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
21 
22     long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
23 
24     boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());
25     PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
26         mq,
27         subscriptionData.getSubString(),
28         subscriptionData.getExpressionType(),
29         isTagType ? 0L : subscriptionData.getSubVersion(),
30         offset,
31         maxNums,
32         sysFlag,
33         0,
34         this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
35         timeoutMillis,
36         CommunicationMode.SYNC,
37         null
38     );
39     this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
40     if (!this.consumeMessageHookList.isEmpty()) {
41         ConsumeMessageContext consumeMessageContext = null;
42         consumeMessageContext = new ConsumeMessageContext();
43         consumeMessageContext.setConsumerGroup(this.groupName());
44         consumeMessageContext.setMq(mq);
45         consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
46         consumeMessageContext.setSuccess(false);
47         this.executeHookBefore(consumeMessageContext);
48         consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
49         consumeMessageContext.setSuccess(true);
50         this.executeHookAfter(consumeMessageContext);
51     }
52     return pullResult;
53 }

First check whether Topic subscribes by subscriptionAutomatically method

 

 1 public void subscriptionAutomatically(final String topic) {
 2     if (!this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) {
 3         try {
 4             SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
 5                 topic, SubscriptionData.SUB_ALL);
 6             this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData);
 7         } catch (Exception ignore) {
 8         }
 9     }
10 }

If not, create a new subscription data and save it in subscription Inner of rebalanceImpl


The pullKernelImpl method is then called:

 1 public PullResult pullKernelImpl(
 2     final MessageQueue mq,
 3     final String subExpression,
 4     final String expressionType,
 5     final long subVersion,
 6     final long offset,
 7     final int maxNums,
 8     final int sysFlag,
 9     final long commitOffset,
10     final long brokerSuspendMaxTimeMillis,
11     final long timeoutMillis,
12     final CommunicationMode communicationMode,
13     final PullCallback pullCallback
14 ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
15     FindBrokerResult findBrokerResult =
16         this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
17             this.recalculatePullFromWhichNode(mq), false);
18     if (null == findBrokerResult) {
19         this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
20         findBrokerResult =
21             this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
22                 this.recalculatePullFromWhichNode(mq), false);
23     }
24 
25     if (findBrokerResult != null) {
26         {
27             // check version
28             if (!ExpressionType.isTagType(expressionType)
29                 && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
30                 throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
31                     + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
32             }
33         }
34         int sysFlagInner = sysFlag;
35 
36         if (findBrokerResult.isSlave()) {
37             sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
38         }
39 
40         PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
41         requestHeader.setConsumerGroup(this.consumerGroup);
42         requestHeader.setTopic(mq.getTopic());
43         requestHeader.setQueueId(mq.getQueueId());
44         requestHeader.setQueueOffset(offset);
45         requestHeader.setMaxMsgNums(maxNums);
46         requestHeader.setSysFlag(sysFlagInner);
47         requestHeader.setCommitOffset(commitOffset);
48         requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
49         requestHeader.setSubscription(subExpression);
50         requestHeader.setSubVersion(subVersion);
51         requestHeader.setExpressionType(expressionType);
52 
53         String brokerAddr = findBrokerResult.getBrokerAddr();
54         if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
55             brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
56         }
57 
58         PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
59             brokerAddr,
60             requestHeader,
61             timeoutMillis,
62             communicationMode,
63             pullCallback);
64 
65         return pullResult;
66     }
67 
68     throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
69 }

First, find Broker information about message queues by findBrokerAddressInSubscribe method


Here's the recalculatePullFromWhichNode method:

 1 public long recalculatePullFromWhichNode(final MessageQueue mq) {
 2     if (this.isConnectBrokerByUser()) {
 3         return this.defaultBrokerId;
 4     }
 5 
 6     AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
 7     if (suggest != null) {
 8         return suggest.get();
 9     }
10 
11     return MixAll.MASTER_ID;
12 }

According to the message queue, find the ID of the corresponding Broker in pullFromWhichNodeTable
pullFromWhichNodeTable records the mapping of message pairs to BrokerID

1 private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
2         new ConcurrentHashMap<MessageQueue, AtomicLong>(32);

(master's BrokerID is 0, slave's BrokerID is greater than 0)

 

findBrokerAddressInSubscribe method:

 1 public FindBrokerResult findBrokerAddressInSubscribe(
 2     final String brokerName,
 3     final long brokerId,
 4     final boolean onlyThisBroker
 5 ) {
 6     String brokerAddr = null;
 7     boolean slave = false;
 8     boolean found = false;
 9 
10     HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
11     if (map != null && !map.isEmpty()) {
12         brokerAddr = map.get(brokerId);
13         slave = brokerId != MixAll.MASTER_ID;
14         found = brokerAddr != null;
15 
16         if (!found && !onlyThisBroker) {
17             Entry<Long, String> entry = map.entrySet().iterator().next();
18             brokerAddr = entry.getValue();
19             slave = entry.getKey() != MixAll.MASTER_ID;
20             found = true;
21         }
22     }
23 
24     if (found) {
25         return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
26     }
27 
28     return null;
29 }

Here, the Broker AddrTable table is used to find the address information of the Broker corresponding to the Broker ID and whether it is slave or not.
Encapsulated as FindBrokerResult Return


If the Broker's routing information is not found, the update is requested from NameServer through the updateTopicRouteInfoFromNameServer method, and the findBrokerAddressInSubscribe method is called after the update is completed.


The request header PullMessageRequest Header is then encapsulated according to the corresponding information.

Then call the pullMessage method:

 1 public PullResult pullMessage(
 2     final String addr,
 3     final PullMessageRequestHeader requestHeader,
 4     final long timeoutMillis,
 5     final CommunicationMode communicationMode,
 6     final PullCallback pullCallback
 7 ) throws RemotingException, MQBrokerException, InterruptedException {
 8     RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
 9 
10     switch (communicationMode) {
11         case ONEWAY:
12             assert false;
13             return null;
14         case ASYNC:
15             this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
16             return null;
17         case SYNC:
18             return this.pullMessageSync(addr, request, timeoutMillis);
19         default:
20             assert false;
21             break;
22     }
23 
24     return null;
25 }

Here you can see the two types I mentioned earlier, synchronous pull and asynchronous pull.


pullMessageSync method:

1 private PullResult pullMessageSync(
2     final String addr,
3     final RemotingCommand request,
4     final long timeoutMillis
5 ) throws RemotingException, InterruptedException, MQBrokerException {
6     RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
7     assert response != null;
8     return this.processPullResponse(response);
9 }

In fact, this is through invokeSync method, sent synchronously by Netty, and sent the request to Broker.
For more information on the sending of the message, see ____________

[Source code analysis of Producer message in RocketMQ]

 

Processing by the processPullResponse method after receiving the response
ProcePullResponse method:

 1 private PullResult processPullResponse(
 2     final RemotingCommand response) throws MQBrokerException, RemotingCommandException {
 3     PullStatus pullStatus = PullStatus.NO_NEW_MSG;
 4     switch (response.getCode()) {
 5         case ResponseCode.SUCCESS:
 6             pullStatus = PullStatus.FOUND;
 7             break;
 8         case ResponseCode.PULL_NOT_FOUND:
 9             pullStatus = PullStatus.NO_NEW_MSG;
10             break;
11         case ResponseCode.PULL_RETRY_IMMEDIATELY:
12             pullStatus = PullStatus.NO_MATCHED_MSG;
13             break;
14         case ResponseCode.PULL_OFFSET_MOVED:
15             pullStatus = PullStatus.OFFSET_ILLEGAL;
16             break;
17 
18         default:
19             throw new MQBrokerException(response.getCode(), response.getRemark());
20     }
21 
22     PullMessageResponseHeader responseHeader =
23         (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
24 
25     return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),
26         responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());
27 }

Depending on the state of the response, set the PullStatus state

Then the information in the response is decoded by decodeCommand CustomHeader method.
Finally, message information is encapsulated by PullResultExt

 1 public class PullResultExt extends PullResult {
 2     private final long suggestWhichBrokerId;
 3     private byte[] messageBinary;
 4 
 5     public PullResultExt(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset,
 6         List<MessageExt> msgFoundList, final long suggestWhichBrokerId, final byte[] messageBinary) {
 7         super(pullStatus, nextBeginOffset, minOffset, maxOffset, msgFoundList);
 8         this.suggestWhichBrokerId = suggestWhichBrokerId;
 9         this.messageBinary = messageBinary;
10     }
11     ......
12 }
13 
14 public class PullResult {
15     private final PullStatus pullStatus;
16     private final long nextBeginOffset;
17     private final long minOffset;
18     private final long maxOffset;
19     private List<MessageExt> msgFoundList;
20     
21     public PullResult(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset,
22         List<MessageExt> msgFoundList) {
23         super();
24         this.pullStatus = pullStatus;
25         this.nextBeginOffset = nextBeginOffset;
26         this.minOffset = minOffset;
27         this.maxOffset = maxOffset;
28         this.msgFoundList = msgFoundList;
29     }
30     ......
31 }

The message retrieved may be multiple. The specific content is saved in the msgFoundList in PullResult. MessageExt is a superclass of Message.

 

Back to the pullSyncImpl method, after pulling the message, the processPullResult method is called:

 1 public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
 2     final SubscriptionData subscriptionData) {
 3     PullResultExt pullResultExt = (PullResultExt) pullResult;
 4 
 5     this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
 6     if (PullStatus.FOUND == pullResult.getPullStatus()) {
 7         ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
 8         List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
 9 
10         List<MessageExt> msgListFilterAgain = msgList;
11         if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
12             msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
13             for (MessageExt msg : msgList) {
14                 if (msg.getTags() != null) {
15                     if (subscriptionData.getTagsSet().contains(msg.getTags())) {
16                         msgListFilterAgain.add(msg);
17                     }
18                 }
19             }
20         }
21 
22         if (this.hasHook()) {
23             FilterMessageContext filterMessageContext = new FilterMessageContext();
24             filterMessageContext.setUnitMode(unitMode);
25             filterMessageContext.setMsgList(msgListFilterAgain);
26             this.executeHook(filterMessageContext);
27         }
28 
29         for (MessageExt msg : msgListFilterAgain) {
30             String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
31             if (traFlag != null && Boolean.parseBoolean(traFlag)) {
32                 msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
33             }
34             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
35                 Long.toString(pullResult.getMinOffset()));
36             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
37                 Long.toString(pullResult.getMaxOffset()));
38         }
39 
40         pullResultExt.setMsgFoundList(msgListFilterAgain);
41     }
42 
43     pullResultExt.setMessageBinary(null);
44 
45     return pullResult;
46 }

First, call the updatePullFromWhichNode method:

1 public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {
2    AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
3     if (null == suggest) {
4         this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));
5     } else {
6         suggest.set(brokerId);
7     }
8 }

The mapping of message queues and BrokerID recorded in pullFromWhichNodeTable is updated to the recommended ID sent by Broker.
Combining with the previous blog, if we adopt cluster mode, we can achieve load balancing on the consumer side.


In the case of PullStatus.FOUND, the decodes method of MessageDecoder is invoked to decode message data in CommitLog format into truly readable messages.

Tag is then judged, Tag is set, and Tag message records are added.

After that, when the filterMessage Hook hook is set, the filter Message method of the filterMessage Hook hook is executed through the executeHook method:

 1 public void executeHook(final FilterMessageContext context) {
 2     if (!this.filterMessageHookList.isEmpty()) {
 3         for (FilterMessageHook hook : this.filterMessageHookList) {
 4             try {
 5                 hook.filterMessage(context);
 6             } catch (Throwable e) {
 7                 log.error("execute hook error. hookName={}", hook.hookName());
 8             }
 9         }
10     }
11 }

Then set the properties of the message


When processPullResult is completed, if the ConsumeMessageHook hook is set, the executeHookBefore and executeHookAfter methods are called to execute the consumeMessageBefore and consumeMessageAfter methods in the hook, respectively:

 1 public void executeHookBefore(final ConsumeMessageContext context) {
 2     if (!this.consumeMessageHookList.isEmpty()) {
 3         for (ConsumeMessageHook hook : this.consumeMessageHookList) {
 4             try {
 5                 hook.consumeMessageBefore(context);
 6             } catch (Throwable ignored) {
 7             }
 8         }
 9     }
10 }
11 
12 public void executeHookAfter(final ConsumeMessageContext context) {
13     if (!this.consumeMessageHookList.isEmpty()) {
14         for (ConsumeMessageHook hook : this.consumeMessageHookList) {
15             try {
16                 hook.consumeMessageAfter(context);
17             } catch (Throwable ignored) {
18             }
19         }
20     }
21 }

This concludes the synchronous pull-out of PullConsumer messages

 


Asynchronous message fetching

Asynchronous pull API s are implemented by pullAsyncImpl method:

 1 private void pullAsyncImpl(
 2     final MessageQueue mq,
 3     final SubscriptionData subscriptionData,
 4     final long offset,
 5     final int maxNums,
 6     final PullCallback pullCallback,
 7     final boolean block,
 8     final long timeout) throws MQClientException, RemotingException, InterruptedException {
 9     this.makeSureStateOK();
10 
11     if (null == mq) {
12         throw new MQClientException("mq is null", null);
13     }
14 
15     if (offset < 0) {
16         throw new MQClientException("offset < 0", null);
17     }
18 
19     if (maxNums <= 0) {
20         throw new MQClientException("maxNums <= 0", null);
21     }
22 
23     if (null == pullCallback) {
24         throw new MQClientException("pullCallback is null", null);
25     }
26 
27     this.subscriptionAutomatically(mq.getTopic());
28 
29     try {
30         int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
31 
32         long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
33 
34         boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());
35         this.pullAPIWrapper.pullKernelImpl(
36             mq,
37             subscriptionData.getSubString(),
38             subscriptionData.getExpressionType(),
39             isTagType ? 0L : subscriptionData.getSubVersion(),
40             offset,
41             maxNums,
42             sysFlag,
43             0,
44             this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
45             timeoutMillis,
46             CommunicationMode.ASYNC,
47             new PullCallback() {
48 
49                 @Override
50                 public void onSuccess(PullResult pullResult) {
51                     pullCallback
52                         .onSuccess(DefaultMQPullConsumerImpl.this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData));
53                 }
54 
55                 @Override
56                 public void onException(Throwable e) {
57                     pullCallback.onException(e);
58                 }
59             });
60     } catch (MQBrokerException e) {
61         throw new MQClientException("pullAsync unknow exception", e);
62     }
63 }

Compared with synchronization, the parameter has one more PullCallback, which is used to handle callbacks after asynchronous pull-out.


The process is basically a synchronous pull-out similar, except that when the pullKernelImpl method is called, a PullCallback is created.
In onSuccess and onException, the corresponding method of pullCallback is actually invoked, thus completing the asynchronous callback.

In the parameters of onSuccess callback, similar to synchronization, the results are further processed by the processPullResult method.


Later pullKernelImpl methods are the same as synchronization

Only the pullMessageAsync method is finally called:

 1 private void pullMessageAsync(
 2     final String addr,
 3     final RemotingCommand request,
 4     final long timeoutMillis,
 5     final PullCallback pullCallback
 6 ) throws RemotingException, InterruptedException {
 7     this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
 8         @Override
 9         public void operationComplete(ResponseFuture responseFuture) {
10             RemotingCommand response = responseFuture.getResponseCommand();
11             if (response != null) {
12                 try {
13                     PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
14                     assert pullResult != null;
15                     pullCallback.onSuccess(pullResult);
16                 } catch (Exception e) {
17                     pullCallback.onException(e);
18                 }
19             } else {
20                 if (!responseFuture.isSendRequestOK()) {
21                     pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
22                 } else if (responseFuture.isTimeout()) {
23                     pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
24                         responseFuture.getCause()));
25                 } else {
26                     pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
27                 }
28             }
29         }
30     });
31 }

This is actually done asynchronously via Netty
See:

[Source code analysis of Producer message in RocketMQ]

 


Since it is asynchronous, a callback InvokeCallback is set up here.
When the request is sent and the response is received, the operation Complete method of InvokeCallback is executed.

In the operationComplete method, as in synchronization, the processPullResponse method is executed to process the response
Then the onSuccess method of pullCallback, which is the callback interface just created, is called to execute the callback interface passed in by the user.


This is where the asynchronous pulling of messages ends.

Tags: Java Netty

Posted on Tue, 13 Aug 2019 05:35:01 -0700 by ainoy31