Erlo

RocketMQ中PullConsumer的消息拉取源码分析

2019-08-13 17:02:33 发布   381 浏览  
页面报错/反馈
收藏 点赞

在PullConsumer中,有关消息的拉取RocketMQ提供了很多API,但总的来说分为两种,同步消息拉取和异步消息拉取


同步消息拉取
以同步方式拉取消息都是通过DefaultMQPullConsumerImpl的pullSyncImpl方法:

 /**
  * Pulls messages from a single queue synchronously and returns the processed result.
  *
  * Steps: validate the arguments, make sure a subscription entry exists for the queue's
  * topic, issue a SYNC pull through pullAPIWrapper.pullKernelImpl, post-process the result
  * with processPullResult, and run the consume-message hooks if any are registered.
  *
  * @param mq               queue to pull from; must not be null
  * @param subscriptionData supplies the sub-expression, expression type and sub version sent with the pull
  * @param offset           queue offset to start pulling from; must be >= 0
  * @param maxNums          maximum number of messages to request; must be > 0
  * @param block            passed to PullSysFlag.buildSysFlag; when true the consumer's
  *                         "timeout when suspend" setting is used instead of {@code timeout}
  * @param timeout          request timeout (milliseconds) used when {@code block} is false
  * @return the PullResult returned by pullKernelImpl, after processPullResult has been applied to it
  * @throws MQClientException if mq is null, offset is negative, or maxNums is not positive
  */
 1 private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block,
 2     long timeout)
 3     throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
 4     this.makeSureStateOK();
 5 
 6     if (null == mq) {
 7         throw new MQClientException("mq is null", null);
 8     }
 9 
10     if (offset < 0) {
11         throw new MQClientException("offset < 0", null);
12     }
13 
14     if (maxNums <= 0) {
15         throw new MQClientException("maxNums <= 0", null);
16     }
17 
       // Adds a subscription entry for this topic if the rebalance table has none yet.
18     this.subscriptionAutomatically(mq.getTopic());
19 
       // NOTE(review): argument order is presumably (commitOffset, suspend, subscription, classFilter)
       // - confirm against PullSysFlag; only the second flag follows the caller's `block`.
20     int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
21 
       // A blocking pull ignores the caller's timeout and uses the consumer-level suspend timeout.
22     long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
23 
24     boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());
25     PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
26         mq,
27         subscriptionData.getSubString(),
28         subscriptionData.getExpressionType(),
           // Tag-type subscriptions send 0 as the sub version; other types send the real version.
29         isTagType ? 0L : subscriptionData.getSubVersion(),
30         offset,
31         maxNums,
32         sysFlag,
           // commitOffset argument: always 0 on this path.
33         0,
34         this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
35         timeoutMillis,
36         CommunicationMode.SYNC,
           // No callback is needed for a synchronous pull.
37         null
38     );
39     this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
       // Hooks run back-to-back here: nothing is consumed between "before" and "after",
       // and the context is marked CONSUME_SUCCESS unconditionally.
40     if (!this.consumeMessageHookList.isEmpty()) {
41         ConsumeMessageContext consumeMessageContext = null;
42         consumeMessageContext = new ConsumeMessageContext();
43         consumeMessageContext.setConsumerGroup(this.groupName());
44         consumeMessageContext.setMq(mq);
45         consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
46         consumeMessageContext.setSuccess(false);
47         this.executeHookBefore(consumeMessageContext);
48         consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
49         consumeMessageContext.setSuccess(true);
50         this.executeHookAfter(consumeMessageContext);
51     }
52     return pullResult;
53 }

首先通过subscriptionAutomatically方法检查Topic是否订阅

 

 /**
  * Ensures the rebalance subscription table contains an entry for {@code topic}.
  *
  * If the topic is not present, subscription data is built for this consumer's group with
  * SubscriptionData.SUB_ALL as the sub-expression and stored with putIfAbsent, so an entry
  * added concurrently by another thread is not overwritten.
  *
  * @param topic topic that is about to be pulled from
  */
 1 public void subscriptionAutomatically(final String topic) {
 2     if (!this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) {
 3         try {
 4             SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
 5                 topic, SubscriptionData.SUB_ALL);
 6             this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData);
 7         } catch (Exception ignore) {
               // Any failure while building or storing the subscription is swallowed;
               // the caller proceeds with the pull regardless.
 8         }
 9     }
10 }

若是没有就新建一条订阅数据保存在rebalanceImpl的subscriptionInner中


之后调用pullKernelImpl方法:

 /**
  * Resolves the broker for a queue, builds the pull request header and sends the pull.
  *
  * The broker address is looked up for the broker id suggested by
  * recalculatePullFromWhichNode; if nothing is found the topic route is refreshed from the
  * name server and the lookup is repeated once.
  *
  * @param mq                         queue to pull from
  * @param subExpression              subscription expression placed in the request header
  * @param expressionType             expression type; non-tag types require a new enough broker
  * @param subVersion                 subscription version placed in the request header
  * @param offset                     queue offset to start pulling from
  * @param maxNums                    maximum number of messages to request
  * @param sysFlag                    pull flags; the commit-offset flag is cleared when the broker is a slave
  * @param commitOffset               commit offset placed in the request header
  * @param brokerSuspendMaxTimeMillis suspend timeout placed in the request header
  * @param timeoutMillis              timeout handed to pullMessage
  * @param communicationMode          mode handed to pullMessage
  * @param pullCallback               callback handed to pullMessage
  * @return whatever pullMessage returns for the given communication mode
  * @throws MQClientException if no broker can be found for the queue, or a non-tag expression
  *                           type is used against a broker older than V4_1_0_SNAPSHOT
  */
 1 public PullResult pullKernelImpl(
 2     final MessageQueue mq,
 3     final String subExpression,
 4     final String expressionType,
 5     final long subVersion,
 6     final long offset,
 7     final int maxNums,
 8     final int sysFlag,
 9     final long commitOffset,
10     final long brokerSuspendMaxTimeMillis,
11     final long timeoutMillis,
12     final CommunicationMode communicationMode,
13     final PullCallback pullCallback
14 ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
       // First lookup; onlyThisBroker=false allows falling back to another broker of the same name.
15     FindBrokerResult findBrokerResult =
16         this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
17             this.recalculatePullFromWhichNode(mq), false);
18     if (null == findBrokerResult) {
           // Nothing cached: refresh the topic route from the name server and retry once.
19         this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
20         findBrokerResult =
21             this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
22                 this.recalculatePullFromWhichNode(mq), false);
23     }
24 
25     if (findBrokerResult != null) {
26         {
27             // check version
               // The broker version is compared against the ordinal of V4_1_0_SNAPSHOT.
28             if (!ExpressionType.isTagType(expressionType)
29                 && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
30                 throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
31                     + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
32             }
33         }
34         int sysFlagInner = sysFlag;
35 
           // When pulling from a slave the commit-offset flag is removed from the request.
36         if (findBrokerResult.isSlave()) {
37             sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
38         }
39 
40         PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
41         requestHeader.setConsumerGroup(this.consumerGroup);
42         requestHeader.setTopic(mq.getTopic());
43         requestHeader.setQueueId(mq.getQueueId());
44         requestHeader.setQueueOffset(offset);
45         requestHeader.setMaxMsgNums(maxNums);
46         requestHeader.setSysFlag(sysFlagInner);
47         requestHeader.setCommitOffset(commitOffset);
48         requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
49         requestHeader.setSubscription(subExpression);
50         requestHeader.setSubVersion(subVersion);
51         requestHeader.setExpressionType(expressionType);
52 
53         String brokerAddr = findBrokerResult.getBrokerAddr();
           // With the class-filter flag set, the request goes to a filter server address instead.
54         if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
55             brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
56         }
57 
58         PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
59             brokerAddr,
60             requestHeader,
61             timeoutMillis,
62             communicationMode,
63             pullCallback);
64 
65         return pullResult;
66     }
67 
       // Reached only when both lookups failed.
68     throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
69 }

首先通过findBrokerAddressInSubscribe方法查找关于消息队列的Broker信息


这里的recalculatePullFromWhichNode方法:

 /**
  * Chooses the broker id the next pull for {@code mq} should be sent to.
  *
  * Precedence: the configured default broker id when isConnectBrokerByUser() is true, then
  * the id recorded for this queue in pullFromWhichNodeTable, then MixAll.MASTER_ID.
  *
  * @param mq queue about to be pulled from
  * @return the broker id to use for the address lookup
  */
 1 public long recalculatePullFromWhichNode(final MessageQueue mq) {
 2     if (this.isConnectBrokerByUser()) {
 3         return this.defaultBrokerId;
 4     }
 5 
       // Use the id recorded for this queue, if there is one.
 6     AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
 7     if (suggest != null) {
 8         return suggest.get();
 9     }
10 
       // No record for this queue: fall back to the master id.
11     return MixAll.MASTER_ID;
12 }

根据消息队列,在pullFromWhichNodeTable查找其对应的Broker的ID
pullFromWhichNodeTable记录了消息队列和BrokerID的映射

// Per-queue broker id to pull from next; read by recalculatePullFromWhichNode.
// Created with an initial capacity of 32.
1 private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
2         new ConcurrentHashMap<MessageQueue, AtomicLong>(32);

(master的BrokerID为0,slave的BrokerID大于0)

 

findBrokerAddressInSubscribe方法:

 /**
  * Looks up the address of a broker in brokerAddrTable by broker name and broker id.
  *
  * @param brokerName     name of the broker group to search
  * @param brokerId       preferred broker id within that group
  * @param onlyThisBroker when false and the preferred id has no address, the first entry
  *                       yielded by the map's iterator is used instead
  * @return a FindBrokerResult holding the address, the slave flag and the broker version,
  *         or null when no address could be found
  */
 1 public FindBrokerResult findBrokerAddressInSubscribe(
 2     final String brokerName,
 3     final long brokerId,
 4     final boolean onlyThisBroker
 5 ) {
 6     String brokerAddr = null;
 7     boolean slave = false;
 8     boolean found = false;
 9 
10     HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
11     if (map != null && !map.isEmpty()) {
12         brokerAddr = map.get(brokerId);
           // Any id other than MASTER_ID is treated as a slave.
13         slave = brokerId != MixAll.MASTER_ID;
14         found = brokerAddr != null;
15 
16         if (!found && !onlyThisBroker) {
               // Fallback: take whichever entry the HashMap iterator yields first
               // (HashMap does not define an iteration order).
17             Entry<Long, String> entry = map.entrySet().iterator().next();
18             brokerAddr = entry.getValue();
               // NOTE(review): getKey() is a boxed Long; this is a numeric comparison only if
               // MixAll.MASTER_ID is a primitive long - confirm.
19             slave = entry.getKey() != MixAll.MASTER_ID;
20             found = true;
21         }
22     }
23 
24     if (found) {
25         return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
26     }
27 
28     return null;
29 }

这里就根据brokerAddrTable表查找该BrokerID对应的Broker的地址信息,以及是否是slave;若该BrokerID没有对应的地址且onlyThisBroker为false,则退而取表中迭代到的第一个Broker
最后封装为FindBrokerResult返回,若仍未找到则返回null


若是没有找到Broker的路由信息,则通过updateTopicRouteInfoFromNameServer方法向NameServer请求更新,更新完成后再调用findBrokerAddressInSubscribe方法查找


之后会根据相应的信息封装请求消息头PullMessageRequestHeader

然后调用pullMessage方法:

 /**
  * Builds a PULL_MESSAGE request from the header and dispatches it by communication mode.
  *
  * @param addr              broker (or filter server) address to send the request to
  * @param requestHeader     header used to create the request command
  * @param timeoutMillis     timeout passed to the sync or async sender
  * @param communicationMode SYNC returns the pull result; ASYNC hands the request to
  *                          pullMessageAsync and returns null; ONEWAY is not supported
  * @param pullCallback      callback passed to pullMessageAsync; unused for SYNC
  * @return the result of pullMessageSync for SYNC, otherwise null
  */
 1 public PullResult pullMessage(
 2     final String addr,
 3     final PullMessageRequestHeader requestHeader,
 4     final long timeoutMillis,
 5     final CommunicationMode communicationMode,
 6     final PullCallback pullCallback
 7 ) throws RemotingException, MQBrokerException, InterruptedException {
 8     RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
 9 
10     switch (communicationMode) {
11         case ONEWAY:
               // Unsupported for pulls: fails when assertions are enabled, otherwise returns null.
12             assert false;
13             return null;
14         case ASYNC:
               // Nothing to return here; the callback is handed to pullMessageAsync.
15             this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
16             return null;
17         case SYNC:
18             return this.pullMessageSync(addr, request, timeoutMillis);
19         default:
20             assert false;
21             break;
22     }
23 
24     return null;
25 }

这里就可以看出我前面说的两种类型,同步拉取和异步拉取


pullMessageSync方法:

/**
 * Sends the pull request with remotingClient.invokeSync and converts the response.
 *
 * @param addr          address the request is sent to
 * @param request       the PULL_MESSAGE request command
 * @param timeoutMillis timeout passed to invokeSync
 * @return the PullResult produced by processPullResponse
 */
1 private PullResult pullMessageSync(
2     final String addr,
3     final RemotingCommand request,
4     final long timeoutMillis
5 ) throws RemotingException, InterruptedException, MQBrokerException {
6     RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
      // Only checked when assertions are enabled (-ea).
7     assert response != null;
8     return this.processPullResponse(response);
9 }

这里其实就是通过invokeSync方法,由Netty进行同步发送,将请求发送给Broker
关于消息的发送详见:

【RocketMQ中Producer消息的发送源码分析】

 

在收到响应后由processPullResponse方法处理
processPullResponse方法:

 /**
  * Translates a broker pull response into a PullResultExt.
  *
  * Response code mapping: SUCCESS -> FOUND, PULL_NOT_FOUND -> NO_NEW_MSG,
  * PULL_RETRY_IMMEDIATELY -> NO_MATCHED_MSG, PULL_OFFSET_MOVED -> OFFSET_ILLEGAL.
  *
  * @param response response command received from the broker
  * @return a PullResultExt carrying the status, the offsets from the response header, the
  *         suggested broker id and the undecoded response body; its message list is null
  * @throws MQBrokerException for any response code other than the four listed above
  */
 1 private PullResult processPullResponse(
 2     final RemotingCommand response) throws MQBrokerException, RemotingCommandException {
 3     PullStatus pullStatus = PullStatus.NO_NEW_MSG;
 4     switch (response.getCode()) {
 5         case ResponseCode.SUCCESS:
 6             pullStatus = PullStatus.FOUND;
 7             break;
 8         case ResponseCode.PULL_NOT_FOUND:
 9             pullStatus = PullStatus.NO_NEW_MSG;
10             break;
11         case ResponseCode.PULL_RETRY_IMMEDIATELY:
12             pullStatus = PullStatus.NO_MATCHED_MSG;
13             break;
14         case ResponseCode.PULL_OFFSET_MOVED:
15             pullStatus = PullStatus.OFFSET_ILLEGAL;
16             break;
17 
18         default:
               // Unknown code: surface the broker's code and remark to the caller.
19             throw new MQBrokerException(response.getCode(), response.getRemark());
20     }
21 
22     PullMessageResponseHeader responseHeader =
23         (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
24 
       // The message list argument is null here; the raw body is stored as-is and is
       // decoded later (see processPullResult, which wraps getMessageBinary()).
25     return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),
26         responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());
27 }

根据响应的状态,设置PullStatus状态

然后通过decodeCommandCustomHeader方法,将响应中的信息解码
最后由PullResultExt封装消息信息

 /**
  * PullResult extended with the broker id suggested by the response and the raw,
  * undecoded message bytes. (Listing abridged by the article: "......" marks omitted members.)
  */
 1 public class PullResultExt extends PullResult {
       // Broker id taken from the response header's suggestWhichBrokerId.
 2     private final long suggestWhichBrokerId;
       // Raw response body; not final, unlike suggestWhichBrokerId.
 3     private byte[] messageBinary;
 4 
 5     public PullResultExt(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset,
 6         List<MessageExt> msgFoundList, final long suggestWhichBrokerId, final byte[] messageBinary) {
 7         super(pullStatus, nextBeginOffset, minOffset, maxOffset, msgFoundList);
 8         this.suggestWhichBrokerId = suggestWhichBrokerId;
 9         this.messageBinary = messageBinary;
10     }
11     ......
12 }
13 
 /**
  * Result of one pull: status, offsets and the list of messages found.
  * (Listing abridged by the article: "......" marks omitted members.)
  */
14 public class PullResult {
15     private final PullStatus pullStatus;
16     private final long nextBeginOffset;
17     private final long minOffset;
18     private final long maxOffset;
       // The only non-final field in this listing.
19     private List<MessageExt> msgFoundList;
20     
21     public PullResult(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset,
22         List<MessageExt> msgFoundList) {
23         super();
24         this.pullStatus = pullStatus;
25         this.nextBeginOffset = nextBeginOffset;
26         this.minOffset = minOffset;
27         this.maxOffset = maxOffset;
28         this.msgFoundList = msgFoundList;
29     }
30     ......
31 }

拉取到的消息可能是多条,具体内容在PullResult中的msgFoundList保存,MessageExt是Message的子类

 

回到pullSyncImpl方法,在拉取到消息后,调用processPullResult方法:

 1 public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
 2     final SubscriptionData subscriptionData) {
 3     PullResultExt pullResultExt = (PullResultExt) pullResult;
 4 
 5     this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
 6     if (PullStatus.FOUND == pullResult.getPullStatus()) {
 7         ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
 8         List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
 9 
10         List<MessageExt> msgListFilterAgain = msgList;
11         if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
12             msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
13             for (MessageExt msg : msgList) {
14                 if (msg.getTags() != null) {
15                     if (subscriptionData.getTagsSet().contains(msg.getTags())) {
16                         msgListFilterAgain.add(msg);
17                     }
18                 }
19             }
20         }
21 
22         if (this.hasHook()) {
23             FilterMessageContext filterMessageContext = new FilterMessageContext();
24             filterMessageContext.setUnitMode(unitMode);
25             filterMessageContext.setMsgList(msgListFilterAgain);
26             this.executeHook(filterMessageContext);
27         }
28 
29         for (MessageExt msg : msgListFilterAgain) {
30             String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
31             if (traFlag != null && Boolean.parseBoolean(traFlag)) {
32                 msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
33             }
34             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
35                 Long.toString(pullResult.getMinOffset()));
36             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
登录查看全部

参与评论

评论留言

还没有评论留言,赶紧来抢楼吧~~

手机查看

返回顶部

给这篇文章打个标签吧~

棒极了 糟糕透顶 好文章 PHP JAVA JS 小程序 Python SEO MySql 确认