日本免费全黄少妇一区二区三区-高清无码一区二区三区四区-欧美中文字幕日韩在线观看-国产福利诱惑在线网站-国产中文字幕一区在线-亚洲欧美精品日韩一区-久久国产精品国产精品国产-国产精久久久久久一区二区三区-欧美亚洲国产精品久久久久

rocketmq源碼解析 rocketmq源碼部署

本文主要分析RocketMQ中如何保證消息有序的 。
RocketMQ的版本為:4.2.0 release 。
一.時(shí)序圖
還是老規(guī)矩,先把分析過程的時(shí)序圖擺出來:
1.Producer發(fā)送順序消息

rocketmq源碼解析 rocketmq源碼部署


2.Consumer接收順序消息(一)

rocketmq源碼解析 rocketmq源碼部署


3.Consumer接收順序消息(二)

rocketmq源碼解析 rocketmq源碼部署


二.源碼分析 – Producer發(fā)送順序消息
1 DefaultMQProducer#send:發(fā)送消息,入?yún)⒅杏凶远x的消息隊(duì)列選擇器 。
// DefaultMQProducer#send public SendResult send(Message msg, MessageQueueSelector selector, Object arg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(msg, selector, arg); }1.1 DefaultMQProducerImpl#makeSureStateOK:確保Producer的狀態(tài)是運(yùn)行狀態(tài)-ServiceState.RUNNING 。
// DefaultMQProducerImpl#makeSureStateOK private void makeSureStateOK() throws MQClientException { if (this.serviceState != ServiceState.RUNNING) { throw new MQClientException("The producer service state not OK, "+ this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); } }1.2 DefaultMQProducerImpl#tryToFindTopicPublishInfo:根據(jù)Topic獲取發(fā)布Topic用到的路由信息 。
// DefaultMQProducerImpl#tryToFindTopicPublishInfo private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);// 為空則從 NameServer更新獲取,false,不傳入 defaultMQProducer topicPublishInfo = this.topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {// 有了路由信息而且狀態(tài)OK,則返回 return topicPublishInfo; } else { this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } }1.3 調(diào)用自定義消息隊(duì)列選擇器的select方法 。
// DefaultMQProducerImpl#sendSelectImpl MessageQueue mq = null; try { mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg); } catch (Throwable e) { throw new MQClientException("select message queue throwed exception.", e); } // Producer#main SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId);【rocketmq源碼解析 rocketmq源碼部署】1.4 DefaultMQProducerImpl#sendKernelImpl:發(fā)送消息的核心實(shí)現(xiàn)方法 。
// DefaultMQProducerImpl#sendKernelImpl ...... switch (communicationMode) { case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeSync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout - costTimeSync, communicationMode, context, this); break; ......1.4.1 MQClientAPIImpl#sendMessage:發(fā)送消息 。
// MQClientAPIImpl#sendMessage ...... switch (communicationMode) {// 根據(jù)發(fā)送消息的模式(同步/異步)選擇不同的方式,默認(rèn)是同步 case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTimeSync) { throw new RemotingTooMuchRequestException("sendMessage call timeout"); } return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request); ......

推薦閱讀