RocketMQ源码阅读 Consumer

consumer

RocketMQ 的 Consumer 分为两类:

  • DefaultMQPullConsumer 标记为弃用
  • DefaultMQPushConsumer

我们这次只分析 DefaultMQPushConsumer

先看一段 PushConsumer 的示例代码:

public class PushConsumer {

    /**
     * Starts a push consumer in group {@code CID_JODIE_1} that subscribes to all messages of
     * {@code TopicTest} and prints every batch it receives.
     *
     * @param args unused
     * @throws InterruptedException declared by the example's signature
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // Concurrent (non-ordered) consume listener.
        // Author's note on the return value:
        //   CONSUME_SUCCESS - the batch was consumed successfully
        //   RECONSUME_LATER - the broker retries according to messageDelayLevel (16 times by default)
        MessageListenerConcurrently printingListener = new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                String workerName = Thread.currentThread().getName();
                System.out.printf("%s Receive New Messages: %s %n", workerName, msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        };

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

        // Subscription with an optional tag filter, e.g. "TopicTest", "tag1||tag2||tag3";
        // "*" or null selects every message of the topic.
        consumer.subscribe("TopicTest", "*");

        // Where a brand-new consumer group starts (author's note: later restarts resume from
        // the stored progress in all three cases):
        //   CONSUME_FROM_LAST_OFFSET  - from the end of the queue
        //   CONSUME_FROM_FIRST_OFFSET - from the beginning of the queue
        //   CONSUME_FROM_TIMESTAMP    - from the configured timestamp
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // wrong time format 2017_0422_221800
        consumer.setConsumeTimestamp("20181109221800");

        consumer.registerMessageListener(printingListener);
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

OK, 消费者的启动流程

  • 创建一个 DefaultMQPushConsumer, 内部构建了 DefaultMQPushConsumerImpl, 大部分核心流程都在 DefaultMQPushConsumerImpl
    /**
     * Creates a push consumer and the {@code DefaultMQPushConsumerImpl} that it delegates to.
     *
     * @param namespace namespace stored on this consumer
     * @param consumerGroup consumer group stored on this consumer
     * @param rpcHook hook handed to the implementation object
     * @param allocateMessageQueueStrategy queue allocation strategy stored on this consumer
     */
    public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
        AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
        this.namespace = namespace;
        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
        this.consumerGroup = consumerGroup;
        // Created last, exactly as before, so the implementation receives a fully assigned facade.
        this.defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
    }
  • 其他的都是设置参数和配置
  • 核心的是 consumer.registerMessageListener 方法,可以看出它最终调用的也是 DefaultMQPushConsumerImpl#registerMessageListener 方法
   /**
     * Registers the callback executed on message arrival, for concurrent consuming.
     * <p>
     * The listener is kept on this facade and is also forwarded to
     * {@code defaultMQPushConsumerImpl}.
     *
     * @param messageListener message handling callback.
     */
    @Override
    public void registerMessageListener(MessageListenerConcurrently messageListener) {
        this.messageListener = messageListener;
        this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
    }
  • 后面是 consumer.start() 启动,核心方法 DefaultMQPushConsumerImpl#start()
/**
 * Starts this push consumer.
 * <p>
 * The full start-up sequence only runs from the {@code CREATE_JUST} state. The state is set to
 * {@code START_FAILED} before any work is done, so an exception anywhere in the sequence leaves
 * the consumer marked as failed; it becomes {@code RUNNING} only after the client factory has
 * started. After the switch, the subscription info is refreshed, the client is checked against
 * the broker, a heartbeat is sent and a rebalance is triggered.
 *
 * @throws MQClientException if the state is {@code RUNNING}, {@code START_FAILED} or
 *         {@code SHUTDOWN_ALREADY}, or if the consumer group cannot be registered with the
 *         client factory
 */
public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                    this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                this.serviceState = ServiceState.START_FAILED;
                // Validate the configuration (author's note: consumerGroup,
                // AllocateMessageQueueStrategy, MessageModel, ...).
                this.checkConfig();
                // Copy this consumer's subscriptions into RebalanceImpl (the load balancer that
                // decides which queues this consumer reads from).
                this.copySubscription();
                // In clustering mode, switch the instance name to the PID.
                // NOTE(review): presumably only applies while the default instance name is in use - confirm.
                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }
                // Obtain (or create) the MQClientInstance.
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
                // Initialise the rebalance component.
                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

                // Create the PullAPIWrapper (author's note: long-lived connection used to pull
                // messages from the broker; the listener then runs the consume logic).
                this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

                // Build the offset store that keeps the consume progress.
                // A store supplied by the user wins; otherwise one is chosen by message model:
                //   BROADCASTING -> LocalFileOffsetStore    (author's note: kept on local disk)
                //   CLUSTERING   -> RemoteBrokerOffsetStore (author's note: kept on the broker)
                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
                // Load the stored consume progress.
                this.offsetStore.load();

                // Pick the consume service from the listener type:
                //   MessageListenerOrderly      -> ConsumeMessageOrderlyService
                //   MessageListenerConcurrently -> ConsumeMessageConcurrentlyService
                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }
                // Start the message consume service.
                this.consumeMessageService.start();

                // Register this consumer with the client factory; on failure roll the state back
                // to CREATE_JUST, stop the consume service and report the duplicate group.
                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }

                // Start the client factory.
                mQClientFactory.start();
                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
        // Refresh the topic subscribe info.
        //
        // Author's summary of the route-update path behind this call (not shown in this excerpt):
        // the TopicRouteData fetched from the NameServer is compared with the one cached in
        // topicRouteTable, first via topicRouteDataIsChange and, if that reports no change,
        // via isNeedUpdateTopicRouteInfo. When something changed, brokerAddrTable is updated and
        //   - every entry of producerTable gets updateTopicPublishInfo(topic, publishInfo),
        //   - every entry of consumerTable gets its subscribe info for the topic updated.
        // Finally the cloned TopicRouteData is stored in topicRouteTable.
        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
        // Check this client's consumer configuration against the broker.
        this.mQClientFactory.checkClientInBroker();
        // Send a heartbeat to every broker.
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        // Trigger a rebalance right away (internally rebalanceService#wakeup wakes the
        // RebalanceService thread).
        this.mQClientFactory.rebalanceImmediately();
    }

mQClientFactory.start()

/**
 * Starts the client factory shared by producers and consumers.
 * <p>
 * Under the instance lock and only from the {@code CREATE_JUST} state: resolves the name server
 * address if none is configured, starts the remoting channel, the scheduled tasks, the pull
 * service, the rebalance service and the internal producer, then marks the factory
 * {@code RUNNING}. The state is set to {@code START_FAILED} up front so that a failure part-way
 * through is remembered.
 *
 * @throws MQClientException if a previous start attempt left the state at {@code START_FAILED}
 */
public void start() throws MQClientException {

        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    // (no NameServer address in the client config -> fetch one)
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    // (author's note: this reuses the Netty-based remoting component)
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    //
                    // Author's list of the tasks (intervals are set inside startScheduledTask,
                    // which is not shown here - verify against the client configuration):
                    //   1. every 2 minutes: look for NameServer address changes
                    //   2. every 30 seconds: refresh topic route info from the NameServer
                    //   3. every 30 seconds: clean up offline brokers and send a heartbeat
                    //      (including subscriptions) to all brokers
                    //   4. every 5 seconds: persist all consume offsets (locally in
                    //      broadcasting mode, on the broker in clustering mode)
                    //   5. every minute: check whether the thread pool size needs adjusting
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    //
                    // This calls DefaultMQProducerImpl#start with false. Author's note: the flag
                    // keeps the producer from starting this factory again (no recursive start);
                    // when no error occurs the call just moves the producer's own state to
                    // ServiceState.RUNNING.
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    // Start-up finished: update the state.
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

好了,整个消费端启动完毕。可以看出,注册消息监听器时,监听器只是被保存了起来。接下来深入看 PullAPIWrapper(负责到 Broker 拉取消息)以及相关的几个调用:

// Build the pull wrapper and attach the registered filter-message hooks.
this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
// ........
this.consumeMessageService.start(); // message consume service
// .....
// Trigger a rebalance right away (internally rebalanceService#wakeup wakes the RebalanceService thread).
this.mQClientFactory.rebalanceImmediately();

现在深入到消息消费服务 consumeMessageService.start() 的代码,这里以顺序消费(ConsumeMessageOrderlyService)为例:

    /**
     * Starts the periodic queue-lock task of the orderly consume service.
     * <p>
     * Nothing is scheduled unless the consumer runs in {@code CLUSTERING} mode. Otherwise
     * {@code lockMQPeriodically()} runs after one second and then at a fixed rate of
     * {@code ProcessQueue.REBALANCE_LOCK_INTERVAL} milliseconds (author's note: every 20 seconds).
     */
    public void start() {
        boolean clustering = MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel());
        if (!clustering) {
            return;
        }

        Runnable lockTask = new Runnable() {
            @Override
            public void run() {
                ConsumeMessageOrderlyService.this.lockMQPeriodically();
            }
        };
        this.scheduledExecutorService.scheduleAtFixedRate(
            lockTask, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
    }

真正触发拉取流程的是 this.mQClientFactory.rebalanceImmediately():它通过 wakeup 唤醒 RebalanceService 线程

    /**
     * Requests a rebalance without waiting for the next scheduled round by waking up
     * {@code rebalanceService}.
     */
    public void rebalanceImmediately() {
        this.rebalanceService.wakeup();
    }
    /**
     * Wakes the waiting service thread.
     * <p>
     * Only the caller that flips {@code hasNotified} from {@code false} to {@code true} counts
     * down {@code waitPoint}; calls made while the flag is already set do nothing.
     */
    public void wakeup() {
        boolean flippedByThisCall = hasNotified.compareAndSet(false, true);
        if (!flippedByThisCall) {
            return;
        }
        waitPoint.countDown(); // notify
    }

回头再看 RebalanceService#run 方法,它由 RebalanceService 线程调度执行

  /**
     * Rebalance loop of the service thread.
     * <p>
     * Until the service is stopped, each round waits up to {@code waitInterval} (author's note:
     * 20000 ms, i.e. one rebalance every 20 seconds unless woken earlier) and then asks the
     * client factory to rebalance.
     */
    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (true) {
            if (this.isStopped()) {
                break;
            }
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();
        }

        log.info(this.getServiceName() + " service end");
    }

后面深入调用的 MQClientInstance#doRebalance

    /**
     * Asks every registered consumer to rebalance.
     * <p>
     * A failure in one consumer is logged and does not stop the remaining consumers from
     * being processed.
     */
    public void doRebalance() {
        for (MQConsumerInner consumer : this.consumerTable.values()) {
            if (consumer == null) {
                continue;
            }
            try {
                // For a push consumer the MQConsumerInner is a DefaultMQPushConsumerImpl.
                consumer.doRebalance();
            } catch (Throwable e) {
                log.error("doRebalance exception", e);
            }
        }
    }

内部调用的 MQConsumerInner#doRebalance() 是一个接口方法;我们分析的是 Push 方式,所以最终调用的是 DefaultMQPushConsumerImpl#doRebalance()

    /**
     * Delegates to {@code rebalanceImpl}, passing whether this consumer consumes orderly.
     * Does nothing while the consumer is paused.
     */
    public void doRebalance() {
        if (this.pause) {
            return;
        }
        this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
    }

层层进入的 是 RebalanceImpl#doRebalance

    /**
     * Rebalances every subscribed topic, then calls {@code truncateMessageQueueNotMyTopic()}.
     * <p>
     * A failure for one topic does not abort the others; it is logged as a warning unless the
     * topic name starts with {@code MixAll.RETRY_GROUP_TOPIC_PREFIX}.
     *
     * @param isOrder whether the consumer consumes orderly; passed through to the per-topic
     *        rebalance
     */
    public void doRebalance(final boolean isOrder) {
        Map<String, SubscriptionData> subscriptions = this.getSubscriptionInner();
        if (subscriptions != null) {
            for (final String topic : subscriptions.keySet()) {
                try {
                    this.rebalanceByTopic(topic, isOrder);
                } catch (Throwable e) {
                    boolean retryTopic = topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
                    if (!retryTopic) {
                        log.warn("rebalanceByTopic Exception", e);
                    }
                }
            }
        }

        this.truncateMessageQueueNotMyTopic();
    }
    
    /**
     * Recomputes which message queues of {@code topic} this consumer should process and applies
     * the result to the process-queue table.
     * <p>
     * In {@code BROADCASTING} mode every queue of the topic is used as-is. In {@code CLUSTERING}
     * mode the queues are split among the consumers of the group by the configured
     * {@code AllocateMessageQueueStrategy}. When the resulting set differs from the current one,
     * {@code messageQueueChanged} is invoked.
     *
     * @param topic topic to rebalance
     * @param isOrder whether the consumer consumes orderly; passed through to
     *        {@code updateProcessQueueTableInRebalance}
     */
    private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            // Broadcasting: no load balancing, all queues of the topic are taken.
            case BROADCASTING: {
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                if (mqSet != null) {
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, mqSet);
                        log.info("messageQueueChanged {} {} {} {}",
                            consumerGroup,
                            topic,
                            mqSet,
                            mqSet);
                    }
                } else {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
                break;
            }
            // Clustering: queues are shared among the consumers of the group.
            case CLUSTERING: {
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                // Look up the client ids of all consumers of this group for the topic.
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                if (null == mqSet) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                    }
                }

                if (null == cidAll) {
                    log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
                }

                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);

                    // Sort both lists so the allocation works on a fixed ordering
                    // (author's note: required by the modulo-based split below).
                    Collections.sort(mqAll);
                    Collections.sort(cidAll);

                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List<MessageQueue> allocateResult = null;
                    try {
                        // Load balancing: compute which MessageQueues this consumer pulls from.
                        // Author's illustration for the averaging strategy (said to be the
                        // default; the strategy implementation is not shown here - confirm):
                        //
                        //                (mod=0)       (mod!=0)      not every consumer gets one
                        //     4 queues   2 consumers   3 consumers   5 consumers
                        //  | queue[0] | consumer[0] | consumer[0] |  consumer[0] |
                        //  | queue[1] | consumer[0] | consumer[0] |  consumer[1] |
                        //  | queue[2] | consumer[1] | consumer[1] |  consumer[2] |
                        //  | queue[3] | consumer[1] | consumer[2] |  consumer[3] |
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                            e);
                        return;
                    }

                    // A null allocation is treated as an empty one.
                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }

                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        log.info(
                            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                            allocateResultSet.size(), allocateResultSet);
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
    }

可以看到,不管是集群模式还是广播模式,都调用了 RebalanceImpl#updateProcessQueueTableInRebalance 方法。深入查看一下代码

RebalanceImpl

/**
 * Brings {@code processQueueTable} in line with the queues assigned for {@code topic} and
 * dispatches a pull request for every newly added queue.
 *
 * @param topic topic being rebalanced
 * @param mqSet queues this consumer should now own for the topic
 * @param isOrder when {@code true}, a new queue is only added after {@code lock(mq)} succeeds
 * @return {@code true} if at least one queue was removed from or added to the table
 */
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
        final boolean isOrder) {
        boolean changed = false;

        // Pass 1: drop queues of this topic that are no longer assigned, or whose pull has expired.
        Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<MessageQueue, ProcessQueue> next = it.next();
            MessageQueue mq = next.getKey();
            ProcessQueue pq = next.getValue();

            if (mq.getTopic().equals(topic)) {
                if (!mqSet.contains(mq)) {
                    // No longer assigned: mark as dropped, remove once the subclass agrees.
                    pq.setDropped(true);
                    if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                        it.remove();
                        changed = true;
                        log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                    }
                } else if (pq.isPullExpired()) {
                    // Still assigned but the pull has expired: only CONSUME_PASSIVELY consumers
                    // drop the queue here; it is re-added by pass 2 if it was removed.
                    switch (this.consumeType()) {
                        case CONSUME_ACTIVELY:
                            break;
                        case CONSUME_PASSIVELY:
                            pq.setDropped(true);
                            if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                                it.remove();
                                changed = true;
                                log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                                    consumerGroup, mq);
                            }
                            break;
                        default:
                            break;
                    }
                }
            }
        }
        // Pass 2: build a PullRequest for every assigned queue that is not in the table yet.
        List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
        for (MessageQueue mq : mqSet) {
            if (!this.processQueueTable.containsKey(mq)) {
                // Orderly consumption requires the queue lock before the queue is added.
                if (isOrder && !this.lock(mq)) {
                    log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                    continue;
                }

                this.removeDirtyOffset(mq);
                ProcessQueue pq = new ProcessQueue();
                long nextOffset = this.computePullFromWhere(mq);
                // A negative offset means the start position could not be determined; skip the queue.
                if (nextOffset >= 0) {
                    ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                    if (pre != null) {
                        log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                    } else {
                        log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                        PullRequest pullRequest = new PullRequest();
                        pullRequest.setConsumerGroup(consumerGroup);
                        pullRequest.setNextOffset(nextOffset);
                        pullRequest.setMessageQueue(mq);
                        pullRequest.setProcessQueue(pq);
                        pullRequestList.add(pullRequest);
                        changed = true;
                    }
                } else {
                    log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                }
            }
        }
        // Hand the new pull requests over for execution.
        this.dispatchPullRequest(pullRequestList);

        return changed;
    }

    /**
     * Submits each pull request to {@code defaultMQPushConsumerImpl} for immediate execution
     * and logs it.
     *
     * @param pullRequestList pull requests created for newly assigned queues
     */
    @Override
    public void dispatchPullRequest(List<PullRequest> pullRequestList) {
        for (final PullRequest request : pullRequestList) {
            this.defaultMQPushConsumerImpl.executePullRequestImmediately(request);
            log.info("doRebalance, {}, add a new pull request {}", consumerGroup, request);
        }
    }

最终发现构建的 PullRequest 被添加到了 PullMessageService 的 pullRequestQueue 中。

PullMessageService#executePullRequestImmediately

    /**
     * Enqueues {@code pullRequest} on {@code pullRequestQueue}.
     * <p>
     * If the calling thread is interrupted while enqueuing, the error is only logged and the
     * method returns normally.
     * NOTE(review): the interrupt status is not restored here - confirm this is intended.
     *
     * @param pullRequest request to enqueue
     */
    public void executePullRequestImmediately(final PullRequest pullRequest) {
        try {
            this.pullRequestQueue.put(pullRequest);
        } catch (InterruptedException e) {
            log.error("executePullRequestImmediately pullRequestQueue.put", e);
        }
    }

后面就是异步调度:通过 pullRequestQueue.take() 取出请求,然后执行拉取

PullMessageService#run

    /**
     * Pull loop of the service thread.
     * <p>
     * Until the service is stopped, the thread blocks on {@code pullRequestQueue} and runs
     * {@code pullMessage} for each request it takes (author's note: rebalance is what puts
     * requests into this queue).
     */
    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (true) {
            if (this.isStopped()) {
                break;
            }
            try {
                PullRequest next = this.pullRequestQueue.take();
                this.pullMessage(next);
            } catch (InterruptedException ignored) {
                // Nothing to do: the loop condition is re-evaluated on the next iteration.
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }

后面看 DefaultMQPushConsumerImpl#pullMessage -> PullAPIWrapper#pullKernelImpl -> MQClientAPIImpl#pullMessage -> MQClientAPIImpl#pullMessageAsync or MQClientAPIImpl#pullMessageSync -> RemotingClient#invokeAsync or RemotingClient#invokeSync

MQClientAPIImpl#pullMessageAsync(内部调用 RemotingClient#invokeAsync)

    private void pullMessageAsync(
        final String addr,
        final RemotingCommand request,
        final long timeoutMillis,
        final PullCallback pullCallback
    ) throws RemotingException, InterruptedException {
        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
            @Override
            public void operationComplete(ResponseFuture responseFuture) {
                RemotingCommand response = responseFuture.getResponseCommand();
                if (response != null) {
                    try {
                        PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response, addr);
                        assert pullResult != null;
                        # invoke listener success
                        pullCallback.onSuccess(pullResult);
                    } catch (Exception e) {
                        # invoke listener exception
                        pullCallback.onException(e);
                    }
                } else {
                    if (!responseFuture.isSendRequestOK()) {
                        pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
                    } else if (responseFuture.isTimeout()) {
                        pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
                            responseFuture.getCause()));
                    } else {
                        pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
                    }
                }
            }
        });
    }

这样就完成了整个 Consumer 从启动到拉取消息的流程。

comments powered by Disqus