日本免费全黄少妇一区二区三区-高清无码一区二区三区四区-欧美中文字幕日韩在线观看-国产福利诱惑在线网站-国产中文字幕一区在线-亚洲欧美精品日韩一区-久久国产精品国产精品国产-国产精久久久久久一区二区三区-欧美亚洲国产精品久久久久

rocketmq源碼解析 rocketmq源碼部署( 三 )

2.1.2.1
ConsumeMessageOrderlyService#lockMQPeriodically:定時向broker發送批量鎖住當前正在消費的隊列集合的消息 。
2.1.2.1.1 RebalanceImpl#lockAll:鎖住所有正在消費的隊列 。
// ConsumeMessageOrderlyService#lockMQPeriodically if (!this.stopped) { this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll(); } // RebalanceImpl#lockAll HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();// 根據brokerName從processQueueTable獲取正在消費的隊列集合 ...... Set<MessageQueue> lockOKMQSet = this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);// 向Broker發送鎖住消息隊列的指令 for (MessageQueue mq : lockOKMQSet) { ProcessQueue processQueue = this.processQueueTable.get(mq); if (processQueue != null) { if (!processQueue.isLocked()) { log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq); } processQueue.setLocked(true); processQueue.setLastLockTimestamp(System.currentTimeMillis()); } } ......2.1.3 MQClientInstance#start:啟動MQClientInstance 。過程較複雜,放到大標題四中分析 。
// DefaultMQPushConsumerImpl#start mQClientFactory.start();四.源碼分析 – Consumer接收順序消息(二)
1 MQClientInstance#start:啟動客戶端實例MQClientInstance 。
// MQClientInstance#start synchronized (this) { switch (this.serviceState) { case CREATE_JUST: ...... // Start pull service 啟動拉取消息服務 this.pullMessageService.start(); // Start rebalance service 啟動消費端負載均衡服務 this.rebalanceService.start(); ......1.1 PullMessageService#run:啟動拉取消息服務 。實際調用的是DefaultMQPushConsumerImpl的pullMessage方法 。
// PullMessageService#run public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); } // PullMessageService#pullMessage private void pullMessage(final PullRequest pullRequest) { final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); if (consumer != null) { DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer; impl.pullMessage(pullRequest);// 調用DefaultMQPushConsumerImpl的pullMessage } else { log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest); } }1.1.1.1 DefaultMQPushConsumerImpl#pullMessage:拉取消息 。提交到
ConsumeMessageOrderlyService的線程池consumeExecutor中執行 。
// DefaultMQPushConsumerImpl#pullMessage ...... PullCallback pullCallback = new PullCallback() { @Override public void onSuccess(PullResult pullResult) { switch (pullResult.getPullStatus()) { case FOUND: long prevRequestOffset = pullRequest.getNextOffset(); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); long pullRT = System.currentTimeMillis() - beginTimestamp; ...... DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); ......1.1.1.1.1.1.1 ConsumeRequest#run:處理消息消費的線程 。
// ConsumeMessageOrderlyService.ConsumeRequest#run List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize); ...... long beginTimestamp = System.currentTimeMillis(); ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; boolean hasException = false; try { this.processQueue.getLockConsume().lock(); if (this.processQueue.isDropped()) { log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",this.messageQueue); break; } status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);// 實際消費消息的地方,回調消息監聽器的consumeMessage方法 } catch (Throwable e) { log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", RemotingHelper.exceptionSimpleDesc(e), ConsumeMessageOrderlyService.this.consumerGroup, msgs,messageQueue); hasException = true; } finally { this.processQueue.getLockConsume().unlock(); } ......

推薦閱讀