日本免费全黄少妇一区二区三区-高清无码一区二区三区四区-欧美中文字幕日韩在线观看-国产福利诱惑在线网站-国产中文字幕一区在线-亚洲欧美精品日韩一区-久久国产精品国产精品国产-国产精久久久久久一区二区三区-欧美亚洲国产精品久久久久

rocketmq源碼解析 rocketmq源碼部署( 四 )

1.2 RebalanceService#run:啟動(dòng)消息端負(fù)載均衡服務(wù) 。
// RebalanceService#run public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { this.waitForRunning(waitInterval); this.mqClientFactory.doRebalance(); } log.info(this.getServiceName() + " service end"); } // MQClientInstance#doRebalance public void doRebalance() { for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { MQConsumerInner impl = entry.getValue(); if (impl != null) { try { impl.doRebalance(); } catch (Throwable e) { log.error("doRebalance exception", e); } } } } // DefaultMQPushConsumerImpl#doRebalance public void doRebalance() { if (!this.pause) { this.rebalanceImpl.doRebalance(this.isConsumeOrderly()); } }1.2.1.1.1 RebalanceImpl#doRebalance:負(fù)載均衡服務(wù)類處理 。
// RebalanceImpl#doRebalance public void doRebalance(final boolean isOrder) { Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); if (subTable != null) { for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) { final String topic = entry.getKey(); try { this.rebalanceByTopic(topic, isOrder); } catch (Throwable e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("rebalanceByTopic Exception", e); } } } } this.truncateMessageQueueNotMyTopic(); } // RebalanceImpl#rebalanceByTopic switch (messageModel) { case BROADCASTING: { Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); if (mqSet != null) { boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);// 根據(jù)Toipc去除queue if (changed) { this.messageQueueChanged(topic, mqSet, mqSet); log.info("messageQueueChanged {} {} {} {}", consumerGroup, topic, mqSet, mqSet); } } else { ...... // RebalanceImpl#updateProcessQueueTableInRebalance this.dispatchPullRequest(pullRequestList);// RebalancePushImpl分發(fā)消息1.2.1.1.1.1.1.1 RebalancePushImpl#dispatchPullRequest:RebalancePushImpl分發(fā) 。
// RebalancePushImpl#dispatchPullRequest public void dispatchPullRequest(List<PullRequest> pullRequestList) { for (PullRequest pullRequest : pullRequestList) { this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest); log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest); } }五.總結(jié)
相比Producer的發(fā)送流程,Consumer的接收流程稍微復(fù)雜一點(diǎn) 。通過上面的源碼分析,可以知道RocketMQ是怎樣保證消息的有序的:
1.通過ReblanceImp的lockAll方法,每隔一段時(shí)間定時(shí)鎖住當(dāng)前消費(fèi)端正在消費(fèi)的隊(duì)列 。設(shè)置本地隊(duì)列ProcessQueue的locked屬性為true 。保證broker中的每個(gè)消息隊(duì)列只對應(yīng)一個(gè)消費(fèi)端;
2.另外,消費(fèi)端也是通過鎖,保證每個(gè)ProcessQueue只有一個(gè)線程消費(fèi) 。

推薦閱讀