日本免费全黄少妇一区二区三区-高清无码一区二区三区四区-欧美中文字幕日韩在线观看-国产福利诱惑在线网站-国产中文字幕一区在线-亚洲欧美精品日韩一区-久久国产精品国产精品国产-国产精久久久久久一区二区三区-欧美亚洲国产精品久久久久

rocketmq源碼解析 rocketmq源碼部署( 二 )

1.4.1.1 MQClientAPIImpl#sendMessageSync:發送同步消息 。
// MQClientAPIImpl#sendMessageSync private SendResult sendMessageSync( final String addr, final String brokerName, final Message msg, final long timeoutMillis, final RemotingCommand request ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); assert response != null; return this.processSendResponse(brokerName, msg, response); }1.4.1.1.1 NettyRemotingClient#invokeSync:構造RemotingCommand,調用的方式是同步 。
// NettyRemotingClient#invokeSyncRemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime); if (this.rpcHook != null) { this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response); } return response;三.源碼分析 – Consumer接收順序消息(一)
1 DefaultMQPushConsumer#registerMessageListener:把Consumer傳入的消息監聽器加入到messageListener中 。
// DefaultMQPushConsumer#registerMessageListener public void registerMessageListener(MessageListenerOrderly messageListener) { this.messageListener = messageListener; this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); }1.1 DefaultMQPushConsumerImpl#registerMessageListener:把Consumer傳入的消息監聽器加入到messageListenerInner中 。
// DefaultMQPushConsumerImpl#registerMessageListener public void registerMessageListener(MessageListener messageListener) { this.messageListenerInner = messageListener; }2 DefaultMQPushConsumer#start:啟動Consumer 。
// DefaultMQPushConsumer#start public void start() throws MQClientException { this.defaultMQPushConsumerImpl.start(); }2.1 DefaultMQPushConsumerImpl#start:啟動ConsumerImpl 。
// DefaultMQPushConsumerImpl#start switch (this.serviceState) { case CREATE_JUST:// 剛剛創建 ...... if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {// 有序消息服務 this.consumeOrderly = true; this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {// 并發無序消息服務 this.consumeOrderly = false; this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); } ...... this.consumeMessageService.start();// 啟動消息服務 ...... mQClientFactory.start();// 啟動MQClientInstance ......2.1.1 new
ConsumeMessageOrderlyService():構造順序消息服務 。
// ConsumeMessageOrderlyService#ConsumeMessageOrderlyService public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerOrderly messageListener) { this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl; this.messageListener = messageListener; this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer(); this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup(); this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>(); this.consumeExecutor = new ThreadPoolExecutor(// 主消息消費線程池,正常執行收到的ConsumeRequest 。多線程 this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 1000 * 60, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl("ConsumeMessageThread_")); this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_")); }2.1.2
ConsumeMessageOrderlyService#start:啟動消息隊列客戶端實例 。
// DefaultMQPushConsumerImpl#start this.consumeMessageService.start(); // ConsumeMessageOrderlyService#start public void start() { if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { ConsumeMessageOrderlyService.this.lockMQPeriodically();// 定時向broker發送批量鎖住當前正在消費的隊列集合的消息 } }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS); } }

推薦閱讀