RocketMQ源码阅读 Consumer
RocketMQ
Consumer
分为两类,
DefaultMQPullConsumer
标记为弃用DefaultMQPushConsumer
我们这次只分析 DefaultMQPushConsumer
当前先看个 PushConsumer
代码
public class PushConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
/**
* 设置订阅的 可以对指定消息进行过滤,例如:"TopicTest","tagl||tag2||tag3",*或null表示topic所有消息
*/
consumer.subscribe("TopicTest", "*");
/**
* CONSUME_FROM_LAST_OFFSET 第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费
* CONSUME_FROM_FIRST_OFFSET 第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费
* CONSUME_FROM_TIMESTAMP 第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//wrong time format 2017_0422_221800
consumer.setConsumeTimestamp("20181109221800");
/**
* 注册并发消费(消费监听)
*/
consumer.registerMessageListener(new MessageListenerConcurrently() {
/**
* ConsumeConcurrentlyStatus.CONSUME_SUCCESS 成功消费
* ConsumeConcurrentlyStatus.RECONSUME_LATER broker会根据设置的messageDelayLevel发起重试,默认16次
* @param msgs
* @param context
* @return
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
OK, 消费者的启动流程
- 创建一个
DefaultMQPushConsumer
, 内部构建了DefaultMQPushConsumerImpl
, 大部分核心流程都在DefaultMQPushConsumerImpl
内
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.namespace = namespace;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
- 其他的都是设置参数和配置
- 核心的一个
consumer.registerMessageListener
方法, 可以看出也是调用的DefaultMQPushConsumerImpl#registerMessageListener
方法
/**
* Register a callback to execute on message arrival for concurrent consuming.
* @param messageListener message handling callback.
*/
@Override
public void registerMessageListener(MessageListenerConcurrently messageListener) {
this.messageListener = messageListener;
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
}
- 后面是
consumer.start()
启动,核心方法DefaultMQPushConsumerImpl#start()
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;
// 检查各个配置项是否合法( consumerGroup, AllocateMessageQueueStrategy, MessageModel)
this.checkConfig();
// 将当前的 defaultMQPushConsumer 中的订阅关心复制到 RebalanceImpl(负载均衡器,决定 Conumer 从哪些 Queue 中消费信息)
this.copySubscription();
// 集群模式的话, 将当前的 defaultMQPushConsumer 实例名称改为 PID
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
// 实例化 MQClientInstance
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
// 初始化 rebalance 变量
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
// 初始化 PullAPIWrapper(长链接, 从 Broker 拉取消息,利用Listener执行消息消费逻辑)
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
// 构建 offsetStore消息进度存储对象
// 看下面的代码两种模式
// Local 存储在本地磁盘上,适用于 BROADCASTING
// Remote 存储在Broker上,适用于 CLUSTERING,
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
//加载消费进度
this.offsetStore.load();
// 看是哪种消费模式
// 顺序消费 ,实例化 ConsumeMessageOrderlyService
// 普通消费 ,实例化 ConsumeMessageConcurrentlyService
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
//启动消息消费服务
this.consumeMessageService.start();
//注册consumer
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
//启动
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
//更新 topicSubcibeInfo 信息
/**
*
* 将从 NameServer 获取的 TopicRouteData 和 topicRouteTable 中的 TopicRouteData 进行对比,
* 先通过 topicRouteDataIsChange 方法判断是否有变化,没有的话再通过 isNeedUpdateTopicRouteInfo 方法进一步对比
* 若 有变化 则更新 brokerAddrTable,
* - 遍历 producerTable 执行 updateTopicPublishInfo(topic, publishInfo),
* - 遍历 consumerTable 执行 updateTopicPublishInfo(topic, subscribeInfo)
*
* 最后将 cloneTopicRouteData 更新到 topicRouteTable
*/
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
// 检查 consumer 配置
this.mQClientFactory.checkClientInBroker();
// 向每个 broker 发送心跳信息
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// 立即触发一次 rebalance(内部 通过 rebalanceService#wakeup 触发唤醒一个 RebalanceService 线程触发)
this.mQClientFactory.rebalanceImmediately();
}
mQClientFactory.start()
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
// 从 配置中找寻 NameServer 地址
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
// 开启 request-response 通道(代码上复用的通信组件 Netty 链接)
this.mQClientAPIImpl.start();
// Start various schedule tasks | 开启 很多的定时任务
/**
* 定时任务有:
* 1. 每隔 2 分钟检测 NameServer 的变化
* 2. 每隔 30 秒从 NameServer 获取 Topic 路由信息变化 和 新的 Topic 路由信息
* 3. 每隔 30 秒清理 下线的 Broker && 向所有的Broker发送心跳信息(包含了订阅关系等)
* 4. 每隔 5 秒 持久化所有的消费进度(广播模式的话,存储到本地,集群的存储到 Broker)
* 5. 每隔 1 分钟 检查线程池大小是否需要调整
*/
this.startScheduledTask();
// Start pull service
// 启动拉取消息的服务
this.pullMessageService.start();
// Start rebalance service
// 启动 Rebalance 负载均衡服务
this.rebalanceService.start();
// Start push service
/**
* 这里又调用了 DefaultMQProducerImpl 的 start 方法, 因为传入的 false, 不会进入循环启动
* 未报错的话是为了先将 DefaultMQProducerImpl 实例内的状态机状态 先变更为 ServiceState.RUNNING
*/
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
//启动完毕, 变更状态
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
好了,整个消费端启动完毕。 可以看出在加入注册消费监听器的时候,被存储缓存起来了。
后面来深入到 PullAPIWrapper
(为了到 broker
拉取信息)
this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
// ........
this.consumeMessageService.start(); //消费消息服务
// .....
// 立即触发一次 rebalance(内部 通过 rebalanceService#wakeup 触发唤醒一个 RebalanceService 线程触发)
this.mQClientFactory.rebalanceImmediately();
现在 深入到 拉取消息的 consumeMessageService.start()
代码 进入我们看 order 有序的消费
// 每隔20秒 做了全局锁
public void start() {
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
ConsumeMessageOrderlyService.this.lockMQPeriodically();
}
}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}
}
真实的是 this.mQClientFactory.rebalanceImmediately();
触发 wakeup
的唤醒 rebalanceService
public void rebalanceImmediately() {
this.rebalanceService.wakeup();
}
public void wakeup() {
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
}
}
回头再看 RebalanceService
的 run
方法, 因为线程调度执行
/**
* 20000 每隔 20 s 做了一次负载均衡
*/
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
后面深入调用的 MQClientInstance#doRebalance
public void doRebalance() {
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
// # 真实的 MQConsumerInner -> DefaultMQPushConsumerImpl.doRebalance()
impl.doRebalance();
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
}
内部调用的 MQConsumerInner#doRebalance()
这是接口,我们看的 是Push 方式,所以最终调用的是 DefaultMQPushConsumerImpl#doRebalance()
public void doRebalance() {
if (!this.pause) {
this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
}
}
层层进入的 是 RebalanceImpl#doRebalance
public void doRebalance(final boolean isOrder) {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
// 广播模式没有负载均衡
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}",
consumerGroup,
topic,
mqSet,
mqSet);
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}
// 集群模式的话
case CLUSTERING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// 根据 topic 和 消费组 得到所有的消费者客户端
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}
if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
//排序 为了后面做取模运算
Collections.sort(mqAll);
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
//负载均衡,计算出本消费者应该去哪个MessageQueue拉取消息。默认的是平均算法
/**
* (mod=0) (mod!=0) 无法都分配
* 4个queue 2个consumer 3个consumer 5个 consumer
* | queue[0] | consumer[0] | consumer[0] | consumer[0] |
* | queue[1] | consumer[1] | consumer[0] | consumer[1] |
* | queue[2] | consumer[1] | consumer[1] | consumer[2] |
* | queue[3] | consumer[1] | consumer[2] | consumer[3] |
*/
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(
"rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}
可以看到,不管集群还是 本地秒模式 都调用了 RebalanceImpl#updateProcessQueueTableInRebalance
方法。深入查看一下代码
RebalanceImpl
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
if (mq.getTopic().equals(topic)) {
if (!mqSet.contains(mq)) {
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
} else if (pq.isPullExpired()) {
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
consumerGroup, mq);
}
break;
default:
break;
}
}
}
}
// 构建 PullRequest 列表
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
// 转发 PullRequest
this.dispatchPullRequest(pullRequestList);
return changed;
}
@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
for (PullRequest pullRequest : pullRequestList) {
this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
}
}
最终发现构建的 PullRequest
添加到PullMessageService
的 pullRequestQueue
了。
PullMessageService#executePullRequestImmediately
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}
后面就是异步调度,从 pullRequestQueue.take()
出请求,进行执行拉取
PullMessageService#run
@Override
public void run() {
log.info(this.getServiceName() + " service started");
//pullMessageService会在获取PullRequest时一直阻塞在pullRequestQueue,直到rebalance将PullRequest放入此阻塞队列
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
后面看 DefaultMQPushConsumerImpl#pullMessage
-> PullAPIWrapper#pullKernelImpl
-> MQClientAPIImpl#pullMessage
-> MQClientAPIImpl#pullMessageAsync
or MQClientAPIImpl#pullMessageSync
-> RemotingClient#invokeAsync
or RemotingClient#invokeSync
RemotingClient#invokeAsync
private void pullMessageAsync(
final String addr,
final RemotingCommand request,
final long timeoutMillis,
final PullCallback pullCallback
) throws RemotingException, InterruptedException {
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
RemotingCommand response = responseFuture.getResponseCommand();
if (response != null) {
try {
PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response, addr);
assert pullResult != null;
# invoke listener success
pullCallback.onSuccess(pullResult);
} catch (Exception e) {
# invoke listener exception
pullCallback.onException(e);
}
} else {
if (!responseFuture.isSendRequestOK()) {
pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
} else if (responseFuture.isTimeout()) {
pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
responseFuture.getCause()));
} else {
pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
}
}
}
});
}
这样完成整个 Consumer 消费的.