在PullConsumer中,有关消息的拉取RocketMQ提供了很多API,但总的来说分为两种,同步消息拉取和异步消息拉取
同步消息拉取
以同步方式拉取消息都是通过DefaultMQPullConsumerImpl的pullSyncImpl方法:
1 private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block, 2 long timeout) 3 throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 4 this.makeSureStateOK(); 5 6 if (null == mq) { 7 throw new MQClientException("mq is null", null); 8 } 9 10 if (offset < 0) { 11 throw new MQClientException("offset < 0", null); 12 } 13 14 if (maxNums <= 0) { 15 throw new MQClientException("maxNums <= 0", null); 16 } 17 18 this.subscriptionAutomatically(mq.getTopic()); 19 20 int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false); 21 22 long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout; 23 24 boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType()); 25 PullResult pullResult = this.pullAPIWrapper.pullKernelImpl( 26 mq, 27 subscriptionData.getSubString(), 28 subscriptionData.getExpressionType(), 29 isTagType ? 0L : subscriptionData.getSubVersion(), 30 offset, 31 maxNums, 32 sysFlag, 33 0, 34 this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), 35 timeoutMillis, 36 CommunicationMode.SYNC, 37 null 38 ); 39 this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData); 40 if (!this.consumeMessageHookList.isEmpty()) { 41 ConsumeMessageContext consumeMessageContext = null; 42 consumeMessageContext = new ConsumeMessageContext(); 43 consumeMessageContext.setConsumerGroup(this.groupName()); 44 consumeMessageContext.setMq(mq); 45 consumeMessageContext.setMsgList(pullResult.getMsgFoundList()); 46 consumeMessageContext.setSuccess(false); 47 this.executeHookBefore(consumeMessageContext); 48 consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString()); 49 consumeMessageContext.setSuccess(true); 50 this.executeHookAfter(consumeMessageContext); 51 } 52 return pullResult; 53 }
首先通过subscriptionAutomatically方法检查Topic是否订阅
1 public void subscriptionAutomatically(final String topic) { 2 if (!this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) { 3 try { 4 SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), 5 topic, SubscriptionData.SUB_ALL); 6 this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData); 7 } catch (Exception ignore) { 8 } 9 } 10 }
若是没有就新建一条订阅数据保存在rebalanceImpl的subscriptionInner中
之后调用pullKernelImpl方法:
1 public PullResult pullKernelImpl( 2 final MessageQueue mq, 3 final String subExpression, 4 final String expressionType, 5 final long subVersion, 6 final long offset, 7 final int maxNums, 8 final int sysFlag, 9 final long commitOffset, 10 final long brokerSuspendMaxTimeMillis, 11 final long timeoutMillis, 12 final CommunicationMode communicationMode, 13 final PullCallback pullCallback 14 ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 15 FindBrokerResult findBrokerResult = 16 this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), 17 this.recalculatePullFromWhichNode(mq), false); 18 if (null == findBrokerResult) { 19 this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); 20 findBrokerResult = 21 this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), 22 this.recalculatePullFromWhichNode(mq), false); 23 } 24 25 if (findBrokerResult != null) { 26 { 27 // check version 28 if (!ExpressionType.isTagType(expressionType) 29 && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) { 30 throw new MQClientException("The broker[" + mq.getBrokerName() + ", " 31 + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null); 32 } 33 } 34 int sysFlagInner = sysFlag; 35 36 if (findBrokerResult.isSlave()) { 37 sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner); 38 } 39 40 PullMessageRequestHeader requestHeader = new PullMessageRequestHeader(); 41 requestHeader.setConsumerGroup(this.consumerGroup); 42 requestHeader.setTopic(mq.getTopic()); 43 requestHeader.setQueueId(mq.getQueueId()); 44 requestHeader.setQueueOffset(offset); 45 requestHeader.setMaxMsgNums(maxNums); 46 requestHeader.setSysFlag(sysFlagInner); 47 requestHeader.setCommitOffset(commitOffset); 48 requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis); 49 requestHeader.setSubscription(subExpression); 50 requestHeader.setSubVersion(subVersion); 51 requestHeader.setExpressionType(expressionType); 52 53 String brokerAddr = findBrokerResult.getBrokerAddr(); 54 if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) { 55 brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr); 56 } 57 58 PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage( 59 brokerAddr, 60 requestHeader, 61 timeoutMillis, 62 communicationMode, 63 pullCallback); 64 65 return pullResult; 66 } 67 68 throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); 69 }
首先通过findBrokerAddressInSubscribe方法查找关于消息队列的Broker信息
这里的recalculatePullFromWhichNode方法:
1 public long recalculatePullFromWhichNode(final MessageQueue mq) { 2 if (this.isConnectBrokerByUser()) { 3 return this.defaultBrokerId; 4 } 5 6 AtomicLong suggest = this.pullFromWhichNodeTable.get(mq); 7 if (suggest != null) { 8 return suggest.get(); 9 } 10 11 return MixAll.MASTER_ID; 12 }
根据消息队列,在pullFromWhichNodeTable查找其对应的Broker的ID
pullFromWhichNodeTable记录了消息对了和BrokerID的映射
1 private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable = 2 new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
(master的BrokerID为0,slave的BrokerID大于0)
findBrokerAddressInSubscribe方法:
1 public FindBrokerResult findBrokerAddressInSubscribe( 2 final String brokerName, 3 final long brokerId, 4 final boolean onlyThisBroker 5 ) { 6 String brokerAddr = null; 7 boolean slave = false; 8 boolean found = false; 9 10 HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName); 11 if (map != null && !map.isEmpty()) { 12 brokerAddr = map.get(brokerId); 13 slave = brokerId != MixAll.MASTER_ID; 14 found = brokerAddr != null; 15 16 if (!found && !onlyThisBroker) { 17 Entry<Long, String> entry = map.entrySet().iterator().next(); 18 brokerAddr = entry.getValue(); 19 slave = entry.getKey() != MixAll.MASTER_ID; 20 found = true; 21 } 22 } 23 24 if (found) { 25 return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr)); 26 } 27 28 return null; 29 }
这里就根据brokerAddrTable表查找该BrokerID对应的Broker的地址信息,以及是否是slave
封装为FindBrokerResult返回
若是没有找到Broker的路由信息,则通过updateTopicRouteInfoFromNameServer方法向NameServer请求更新,更新完成后再调用findBrokerAddressInSubscribe方法查找
之后会根据相应的信息封装请求消息头PullMessageRequestHeader
然后调用pullMessage方法:
1 public PullResult pullMessage( 2 final String addr, 3 final PullMessageRequestHeader requestHeader, 4 final long timeoutMillis, 5 final CommunicationMode communicationMode, 6 final PullCallback pullCallback 7 ) throws RemotingException, MQBrokerException, InterruptedException { 8 RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader); 9 10 switch (communicationMode) { 11 case ONEWAY: 12 assert false; 13 return null; 14 case ASYNC: 15 this.pullMessageAsync(addr, request, timeoutMillis, pullCallback); 16 return null; 17 case SYNC: 18 return this.pullMessageSync(addr, request, timeoutMillis); 19 default: 20 assert false; 21 break; 22 } 23 24 return null; 25 }
这里就可以看出我前面说的两种类型,同步拉取和异步拉取
pullMessageSync方法:
1 private PullResult pullMessageSync( 2 final String addr, 3 final RemotingCommand request, 4 final long timeoutMillis 5 ) throws RemotingException, InterruptedException, MQBrokerException { 6 RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); 7 assert response != null; 8 return this.processPullResponse(response); 9 }
这里其实就是通过invokeSync方法,由Netty进行同步发送,将请求发送给Broker
关于消息的发送详见:
在收到响应后由processPullResponse方法处理
processPullResponse方法:
1 private PullResult processPullResponse( 2 final RemotingCommand response) throws MQBrokerException, RemotingCommandException { 3 PullStatus pullStatus = PullStatus.NO_NEW_MSG; 4 switch (response.getCode()) { 5 case ResponseCode.SUCCESS: 6 pullStatus = PullStatus.FOUND; 7 break; 8 case ResponseCode.PULL_NOT_FOUND: 9 pullStatus = PullStatus.NO_NEW_MSG; 10 break; 11 case ResponseCode.PULL_RETRY_IMMEDIATELY: 12 pullStatus = PullStatus.NO_MATCHED_MSG; 13 break; 14 case ResponseCode.PULL_OFFSET_MOVED: 15 pullStatus = PullStatus.OFFSET_ILLEGAL; 16 break; 17 18 default: 19 throw new MQBrokerException(response.getCode(), response.getRemark()); 20 } 21 22 PullMessageResponseHeader responseHeader = 23 (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class); 24 25 return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(), 26 responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody()); 27 }
根据响应的状态,设置PullStatus状态
然后通过decodeCommandCustomHeader方法,将响应中的信息解码
最后由PullResultExt封装消息信息
1 public class PullResultExt extends PullResult { 2 private final long suggestWhichBrokerId; 3 private byte[] messageBinary; 4 5 public PullResultExt(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset, 6 List<MessageExt> msgFoundList, final long suggestWhichBrokerId, final byte[] messageBinary) { 7 super(pullStatus, nextBeginOffset, minOffset, maxOffset, msgFoundList); 8 this.suggestWhichBrokerId = suggestWhichBrokerId; 9 this.messageBinary = messageBinary; 10 } 11 ...... 12 } 13 14 public class PullResult { 15 private final PullStatus pullStatus; 16 private final long nextBeginOffset; 17 private final long minOffset; 18 private final long maxOffset; 19 private List<MessageExt> msgFoundList; 20 21 public PullResult(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset, 22 List<MessageExt> msgFoundList) { 23 super(); 24 this.pullStatus = pullStatus; 25 this.nextBeginOffset = nextBeginOffset; 26 this.minOffset = minOffset; 27 this.maxOffset = maxOffset; 28 this.msgFoundList = msgFoundList; 29 } 30 ...... 31 }
拉取到的消息可能是多条,具体内容在PullResult 中的msgFoundList保存,MessageExt是Message的超类
回到pullSyncImpl方法,在拉取到消息后,调用processPullResult方法:
1 public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult, 2 final SubscriptionData subscriptionData) { 3 PullResultExt pullResultExt = (PullResultExt) pullResult; 4 5 this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId()); 6 if (PullStatus.FOUND == pullResult.getPullStatus()) { 7 ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary()); 8 List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer); 9 10 List<MessageExt> msgListFilterAgain = msgList; 11 if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) { 12 msgListFilterAgain = new ArrayList<MessageExt>(msgList.size()); 13 for (MessageExt msg : msgList) { 14 if (msg.getTags() != null) { 15 if (subscriptionData.getTagsSet().contains(msg.getTags())) { 16 msgListFilterAgain.add(msg); 17 } 18 } 19 } 20 } 21 22 if (this.hasHook()) { 23 FilterMessageContext filterMessageContext = new FilterMessageContext(); 24 filterMessageContext.setUnitMode(unitMode); 25 filterMessageContext.setMsgList(msgListFilterAgain); 26 this.executeHook(filterMessageContext); 27 } 28 29 for (MessageExt msg : msgListFilterAgain) { 30 String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); 31 if (traFlag != null && Boolean.parseBoolean(traFlag)) { 32 msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX)); 33 } 34 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET, 35 Long.toString(pullResult.getMinOffset())); 36 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
参与评论
手机查看
返回顶部