RocketMQ源码阅读 Producer

在消息发送的时候, 我们先看 Producer 的启动代码

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.start();
        for (int i = 0; i < 128; i++){
            try {
                Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }  
        }
        producer.shutdown();
    }
}

非常简单的代码构建了一个 Producer 下面肯定有疑问:

  • Producer 如何启动的
  • Producer 发送消息流程
  • ProducerNameSever 如何交互
  • Producer 发送消息如何保证顺序
  • Producer 负载均衡如何实现的
  • Producer 延迟消息(发送的时候设置 Message 的延迟级别)

Producer 端 如何启动的

第一步

可以看到启动一个 Producer 的代码, 核心类 DefaultMQProducer ,构造方法中又 New 了一个新的类 DefaultMQProducerImpl, 这个类是给真正的核心的类

    public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
        this.namespace = namespace;
        this.producerGroup = producerGroup;
        defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
    }

后面入口走到 start 方法, 内部启动的 defaultMQProducerImpl.start 方法

    public void start() throws MQClientException {
        //启动 Producer
        this.setProducerGroup(withNamespace(this.producerGroup));
        //真实的启动
        this.defaultMQProducerImpl.start();
        //.... 下面代码 省略 只看核心代码
    }

第二步 this.defaultMQProducerImpl.start();

public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            //第一次 默认 new Producer 是 CreateJust 状态
            case CREATE_JUST:
                // 完全启动后 置成 ServiceState.RUNNING
                this.serviceState = ServiceState.START_FAILED;
                //检查配置     group 是否填写、是不是默认的名字、长度是不是超出限制......
                this.checkConfig();
                //更改 instanceName 为 PID
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }
                /**
                 *   单例模式, 获取 MQClientInstance 对象, 客户端实例
                 *   MQClientManager JVM只有一个 静态常量
                 *   MQClientManager 内部维护着 clientId - MQClientInstance 的缓存 factoryTable
                 */
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                //将 Producer 注册到 Producer Group 中, 注册的 使用 producerTable 维护
                ///**
                // *  private final ConcurrentMap<String,MQProducerInner > producerTable
                // *  = new ConcurrentHashMap<String, MQProducerInner>();
                // */
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
                // 存储路由信息的对象  TopicPublishInfo 是路由信息
                // this.defaultMQProducer.getCreateTopicKey() 获取 message 中的 topic, 但是在此,还没那样 message 进入。看具体的
                // 是个默认的Key, 启动的时候为了测试和demo运行
                //     /**
                //     * Just for testing or demo program
                //     */
                //    private String createTopicKey = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC;
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

                //启动生产
                if (startFactory) {
                    //MQClientInstance 的 start   真正的启动核心类
                    mQClientFactory.start();
                }

                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }

        //启动向所有的 broker 发送心态
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

        // 每隔 3 秒 扫描过期的请求
        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    RequestFutureTable.scanExpiredRequest();
                } catch (Throwable e) {
                    log.error("scan RequestFutureTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }

上述代码看注释

    1. serviceStatenew producer 默认为 CreateJust 状态, 先设置为 START_FAILE
    1. checkConfig 检查配置, 看 group 是否填写, 是不是默认的名称, 长度是否超过限制
    1. 更改 InstanceName 为 PID
    1. 构建 MQClientManager(单例模式)提供获取 MQClientInstance 实例, 内部维护者一个 ClientIdMQClientInstance 的缓存
    1. 注册 ProducerProducerGroup 中 (线程安全的Map)
    1. 存储路由信息的对象(TopicPublishInfo) 这边只是为了测试和demo程序
    1. 后面启动真实的 mQClientFactory.start()
    1. 更改 serviceStateRUNNING
    1. 向所有的 Broker 发送心跳
  • 10.每隔 3 秒 扫描过期的请求

第三步 MQClientInstance 核心启动方法 mQClientFactory.start();

    public void start() throws MQClientException {
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    // 从 配置中找寻 NameServer 地址
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    // 开启 request-response 通道(代码上复用的通信组件 Netty 链接)
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks  | 开启 很多的定时任务
                    /**
                     * 定时任务有:
                     * 1. 每隔 2 分钟检测 NameServer 的变化
                     * 2. 每隔 30 秒从 NameServer 获取 Topic 路由信息变化 和 新的 Topic 路由信息
                     * 3. 每隔 30 秒清理 下线的 Broker
                     * 4. 每隔 5 秒 持久化所有的消费进度
                     * 5. 每隔 1 分钟 检车线程池大小是否需要调整
                     */
                    this.startScheduledTask();
                    // Start pull service
                    // 启动拉取消息的服务
                    this.pullMessageService.start();
                    // Start rebalance service
                    // 启动 Rebalance 负载均衡服务
                    this.rebalanceService.start();
                    // Start push service
                    /**
                     * 这里又调用了 DefaultMQProducerImpl 的 start 方法, 因为传入的 false, 不会进入循环启动
                     * 未报错的话是为了先将 DefaultMQProducerImpl 实例内的状态机状态 先变更为 ServiceState.RUNNING
                     */
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    //启动完毕, 变更状态
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

这边的代码主要是开启多个服务了

    1. 从 配置中找寻 NameServer 地址
    1. 开启 request response 通道
    1. 开启很多的定时任务
    • 每隔 2 分钟检测 NameServer 的变化
    • 每隔 30 秒从 NameServer 获取 Topic 路由信息变化 和 新的 Topic 路由信息
    • 每隔 30 秒清理 下线的 Broker
    • 每隔 5 秒 持久化所有的消费进度
    • 每隔 1 分钟 检查线程池大小是否需要调整
    1. 启动拉取消息的服务
    1. 启动 Rebalance 负载均衡服务
    1. Start push service 调用了 DefaultMQProducerImpl.start
    1. 启动完毕, 变更状态

Producer 发送消息流程

发送逻辑路径 DefaultMQProducer.send -> DefaultMQProducerImpl.send -> DefaultMQProducerImpl.sendDefaultImpl -> DefaultMQProducerImpl.sendKernelImpl

 // 入口
 SendResult sendResult = producer.send(msg);

DefaultMQProducer.send

        //  检查 msg 的 topic 基础信息( topic 名称不为空, topic 名称不包含非法字符, topic 名称长度<127)
        @Override
        public SendResult send(
                Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
                //参数校验
                Validators.checkMessage(msg, this);
                //设置 topic
                msg.setTopic(withNamespace(msg.getTopic()));
                //发送消息
                return this.defaultMQProducerImpl.send(msg);
        }

DefaultMQProducerImpl.sendDefaultImpl

private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        //检查 Producer 状态OK
        this.makeSureStateOK();
        //检查 message 信息
        Validators.checkMessage(msg, this.defaultMQProducer);
        final long invokeID = random.nextLong();
        //记录 开始时间
        long beginTimestampFirst = System.currentTimeMillis();
        //上此的开始时间默认为开始时间
        long beginTimestampPrev = beginTimestampFirst;
        //结束时间默认为开始时间
        long endTimestamp = beginTimestampFirst;
        //获取 Topic 对应的路由信息
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            //根据模式, 同步 还是 异步, 决定发送失败的重试的总次数
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            //循环发送, 成功发送的跳出循环
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                //选择合适的队列  核心的方法( 开启 容错策略的话的核心方法)
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                //队列不为空进行发送
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        //记录上次的开始时间
                        beginTimestampPrev = System.currentTimeMillis();
                        if (times > 0) {
                            //Reset topic with namespace during resend.
                            msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                        }
                        //发送已经耗费的总时间, 如果在规定的时间内, 还没有发送成功, 跳出重试
                        long costTime = beginTimestampPrev - beginTimestampFirst;
                        if (timeout < costTime) {
                            callTimeout = true;
                            break;
                        }
                        //发送
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        endTimestamp = System.currentTimeMillis();
                        //根据发送耗费的时间决定是不是要将该 Broker 加入到故障列表缓存中
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        //根据模式决定做什么动作
                        switch (communicationMode) {
                            case ASYNC:
                                //异步 什么都不做
                                return null;
                            case ONEWAY:
                                //oneway 什么都不做
                                return null;
                            case SYNC:
                                //同步发送 根据发送返回状态决定是否重试
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }
                                //发送成功直接结束方法
                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException e) {
                        //出现异常加入故障列表, 进行重试
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQClientException e) {
                        //出现异常加入故障列表, 进行重试
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQBrokerException e) {
                        //出现异常加入故障列表, 进行重试
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        switch (e.getResponseCode()) {
                            case ResponseCode.TOPIC_NOT_EXIST:
                            case ResponseCode.SERVICE_NOT_AVAILABLE:
                            case ResponseCode.SYSTEM_ERROR:
                            case ResponseCode.NO_PERMISSION:
                            case ResponseCode.NO_BUYER_ID:
                            case ResponseCode.NOT_IN_CURRENT_UNIT:
                                continue;
                            default:
                                if (sendResult != null) {
                                    return sendResult;
                                }

                                throw e;
                        }
                    } catch (InterruptedException e) {
                        //出现异常加入故障列表, 进行重试
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());

                        log.warn("sendKernelImpl exception", e);
                        log.warn(msg.toString());
                        throw e;
                    }
                } else {
                    break;
                }
            }

            if (sendResult != null) {
                return sendResult;
            }

            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                times,
                System.currentTimeMillis() - beginTimestampFirst,
                msg.getTopic(),
                Arrays.toString(brokersSent));

            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

            MQClientException mqClientException = new MQClientException(info, exception);
            if (callTimeout) {
                throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
            }

            if (exception instanceof MQBrokerException) {
                mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
            } else if (exception instanceof RemotingConnectException) {
                mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
            } else if (exception instanceof RemotingTimeoutException) {
                mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
            } else if (exception instanceof MQClientException) {
                mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
            }

            throw mqClientException;
        }
        //.... 下面代码 省略 只看核心代码
    }

DefaultMQProducerImpl.sendKernelImpl 这个方法是核心发送方法

private SendResult sendKernelImpl(final Message msg,
        final MessageQueue mq,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        //找寻broker地址
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        if (null == brokerAddr) {
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

        SendMessageContext context = null;
        if (brokerAddr != null) {
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

            byte[] prevBody = msg.getBody();
            try {
                //for MessageBatch,ID has been set in the generating process
                if (!(msg instanceof MessageBatch)) {
                    MessageClientIDSetter.setUniqID(msg);
                }

                boolean topicWithNamespace = false;
                if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
                    msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                    topicWithNamespace = true;
                }

                int sysFlag = 0;
                boolean msgBodyCompressed = false;
                if (this.tryToCompressMessage(msg)) {
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                    msgBodyCompressed = true;
                }

                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }

                if (hasCheckForbiddenHook()) {
                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                    checkForbiddenContext.setCommunicationMode(communicationMode);
                    checkForbiddenContext.setBrokerAddr(brokerAddr);
                    checkForbiddenContext.setMessage(msg);
                    checkForbiddenContext.setMq(mq);
                    checkForbiddenContext.setUnitMode(this.isUnitMode());
                    this.executeCheckForbiddenHook(checkForbiddenContext);
                }

                if (this.hasSendMessageHook()) {
                    context = new SendMessageContext();
                    context.setProducer(this);
                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    context.setCommunicationMode(communicationMode);
                    context.setBornHost(this.defaultMQProducer.getClientIP());
                    context.setBrokerAddr(brokerAddr);
                    context.setMessage(msg);
                    context.setMq(mq);
                    context.setNamespace(this.defaultMQProducer.getNamespace());
                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    if (isTrans != null && isTrans.equals("true")) {
                        context.setMsgType(MessageType.Trans_Msg_Half);
                    }

                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                        context.setMsgType(MessageType.Delay_Msg);
                    }
                    this.executeSendMessageHookBefore(context);
                }

                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                requestHeader.setTopic(msg.getTopic());
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                requestHeader.setQueueId(mq.getQueueId());
                requestHeader.setSysFlag(sysFlag);
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                requestHeader.setBatch(msg instanceof MessageBatch);
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }

                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }

                SendResult sendResult = null;
                switch (communicationMode) {
                    case ASYNC:
                        Message tmpMessage = msg;
                        boolean messageCloned = false;
                        if (msgBodyCompressed) {
                            //If msg body was compressed, msgbody should be reset using prevBody.
                            //Clone new message using commpressed message body and recover origin massage.
                            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                            msg.setBody(prevBody);
                        }

                        if (topicWithNamespace) {
                            if (!messageCloned) {
                                tmpMessage = MessageAccessor.cloneMessage(msg);
                                messageCloned = true;
                            }
                            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                        }

                        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeAsync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        //真正的核心发送方法 是 rocketMQ 通用的网络层工具在签名介绍过
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            tmpMessage,
                            requestHeader,
                            timeout - costTimeAsync,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);
                        break;
                    case ONEWAY:
                    case SYNC:
                        long costTimeSync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeSync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout - costTimeSync,
                            communicationMode,
                            context,
                            this);
                        break;
                    default:
                        assert false;
                        break;
                }

                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }

                return sendResult;
            } catch (RemotingException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (MQBrokerException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (InterruptedException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } finally {
                msg.setBody(prevBody);
                msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
            }
        }

        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }

MQClientAPIImpl.sendMessage 和 MQClientAPIImpl.sendMessageSync

可以发现 下面的发送 MQClientAPIImpl.sendMessage -> MQClientAPIImpl.sendMessageSync ,最后委托给 netty 的,当然在里面还有 如何实现 AsyncSyncOneway 发送的,想深究的话可以具体深入.


    public SendResult sendMessage(
        final String addr,
        final String brokerName,
        final Message msg,
        final SendMessageRequestHeader requestHeader,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,
        final int retryTimesWhenSendFailed,
        final SendMessageContext context,
        final DefaultMQProducerImpl producer
    ) throws RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        RemotingCommand request = null;
        String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
        boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
        if (isReply) {
            if (sendSmartMsg) {
                SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
                request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
            } else {
                request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
            }
        } else {
            if (sendSmartMsg || msg instanceof MessageBatch) {
                SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
                request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
            } else {
                request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
            }
        }
        request.setBody(msg.getBody());

        switch (communicationMode) {
            case ONEWAY:
                this.remotingClient.invokeOneway(addr, request, timeoutMillis);
                return null;
            case ASYNC:
                final AtomicInteger times = new AtomicInteger();
                long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeAsync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                    retryTimesWhenSendFailed, times, context, producer);
                return null;
            case SYNC:
                long costTimeSync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeSync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
            default:
                assert false;
                break;
        }

        return null;
    }

    private SendResult sendMessageSync(
        final String addr,
        final String brokerName,
        final Message msg,
        final long timeoutMillis,
        final RemotingCommand request
    ) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        assert response != null;
        return this.processSendResponse(brokerName, msg, response,addr);
    }

Producer 和 NameSever 如何交互

  • Producer 启动 和 发送消息的 代码走读的时候,应该留意了 TopicPublishInfo 这个类

DefaultMQProducerImpl.sendDefaultImpl -> DefaultMQProducerImpl.tryToFindTopicPublishInfo -> MQClientInstance.updateTopicRouteInfoFromNameServer -> MQClientAPIImpl.getTopicRouteInfoFromNameServer

看到先从缓存获取路由信息,发现没有的话 再从 NameServer 获取(this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);)

     /**
     * 获取 topic 对应的路由信息
     * @param topic
     * @return
     */
    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        //从缓存中获取路由信息
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        //如果本地缓存的路由信息无 或者 不是将康的
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            //从 NameServer 获取 Topic信息进行更新更新到本地缓存中
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }

        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            //从 NameServer 获取 Topic信息进行更新更新到本地缓存中
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }
//.... 上面代码 省略 只看核心代码
        TopicRouteData topicRouteData;
        if (isDefault && defaultMQProducer != null) {
            topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 1000 * 3);
            if (topicRouteData != null) {
                for (QueueData data : topicRouteData.getQueueDatas()) {
                    int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                    data.setReadQueueNums(queueNums);
                    data.setWriteQueueNums(queueNums);
                }
            }
        } else {
            topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
        }
//.... 下面代码 省略 只看核心代码
    public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
        boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
        GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
        requestHeader.setTopic(topic);
        //构建向 NameServer 发送的消息体
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);
        //开始发送请求并且是同步的方式
        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);assert response != null;
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.TOPIC_NOT_EXIST: {
                if (allowTopicNotExist) {
                    log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
                }

                break;
            }
            case ResponseCode.SUCCESS: {
                byte[] body = response.getBody();
                if (body != null) {
                    return TopicRouteData.decode(body, TopicRouteData.class);
                }
            }
            default:
                break;
        }

        throw new MQClientException(response.getCode(), response.getRemark());
    }

Producer 发送消息如何保证顺序

RocketMQ 发送的消息支持是有序的,消费消息的顺序和发送消息的顺序一致的。当然 Producer 发送的消息要有序,是局部消息有序,要保证发送到同一个Topic下的同一个Queue,这样 Consumer 可以按照 Producer 发送的消息顺序进行消费。

  • 普通顺序消息

正常情况下保证完全的顺序消息, 但是如果发生通讯异常(Broker重启等),由于队列的总数发送变化。哈希取模后定位的队列也会发生变化,产生的消息可能短暂的消息顺序不一致。

  • 严格顺序消息

无论正常异常情况都保证顺序一致, 但是牺牲了分布式的 Failover 特性(Broker集群只要有一台不可用,整个集群都不可用, 服务可用性大大降低) 要保证严格的顺序消息,需要保证 Producer -> MQServer -> Consumer 都是一对一的关系

在 Producer 端保证发送顺序的一致性的是通过保证 顺序消息把同类型的消息发送到同一个 MessageQueue 实现的。核心代码在从 send 方法入手 Producer.send 有个重载的方法 send(Message msg, MessageQueueSelector selector, Object arg), 参数为 MessageQueueSelector selector , 只要实现这个接口,可以通过

    @Override
    public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        msg.setTopic(withNamespace(msg.getTopic()));
        return this.defaultMQProducerImpl.send(msg, selector, arg);
    }
public interface MessageQueueSelector {
    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}

Producer 负载均衡如何实现的

DefaultMQProducerImpl.send -> DefaultMQProducerImpl.sendDefaultImpl —> DefaultMQProducerImpl.selectOneMessageQueue -> MQFaultStrategy.selectOneMessageQueue (这边是 是否开启容错策略,即负载均衡策略在内部) -> TopicPublishInfo.selectOneMessageQueue

下面在选择低延迟的 Broker,根据 faultItem 的可用性和开始时间选择的

 /**
     * 在 rocketMQ 中 一个 Topic 包含多个 MessageQueue, 分散在不同的 Broker。
     *
     * 一个 Producer 发送消息的时候顺序是这样的
     *  - 1. Producer 获取消息路由信息( 当本地缓存无路由信息时候、从 NameSrv 获取路由信息 )
     *  - 2. Producer 发送消息到 Broker
     *  - 3. Broker 存储发送的消息  ( 根据不同 Broker 配置, 可同步或异步存储发送消息 )
     *  - 4. Broker 向 Producer 反馈的发送消息的结果
     *
     *  在这个发送过程中可能存在发送失败
     *  队列的选择和容错策略
     *  - 在不开启容错下, 轮询队列进行发送, 如果失败了, 重试的时候过滤失败的 Broker
     *  - 如果开启了容错, 会通过 RocketMQ 的预测机制来预测一个 Broker 是否可用
     *  - 如果上次失败的 Broker 可用, 那么还继续选择该 Broker 队列
     *  - 如果上述失败, 则随机选择一个 Broker 进行发送
     *  - 在发送消息的时候会记录一下调用的时间与发送是否错误,根据时间去预测 故障的Broker 可用时间
     *
     * @param tpInfo
     * @param lastBrokerName
     * @return
     */
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        // 是否开启容错策略
        if (this.sendLatencyFaultEnable) {
            try {
                //选择一个队列, topicInfo ThreadLocalIndex 都 + 1
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                //与队列的长度取模, 根据最后的 pos 取一个队列
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    //判断获取到的队列的Broker 是否在故障列表中, 非故障的则返回
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                        return mq;
                }
                //代码走到这边, 说明没有非故障的队列 只能从故障列表中获取一个队列
                //如果这个 topic 下的所有的队列都是故障的话, 那么就从故障列表中取出一个 Broker
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                //获取这个 Broker 的可写队列数, 如果该 Broker 没有可写的队列, 则返回 -1
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    // 选择队列, 通过与队列的长度取模确定队列的位置
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    //没有可写的队列,直接从故障列表中移除
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
            //如果故障列表中没有可写的队列, 直接从Topic Publish Info中取一个
            return tpInfo.selectOneMessageQueue();
        }
        // 如果没有开启故障延迟, 直接从 Topic Publish Info 通过取模的方式获取队列,
        // 如果LastBrokerName不为空,则需要过滤掉brokerName=lastBrokerName的队列
        // 没有开启故障延迟的话,走轮询模式
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

核心在下述方法中,TopicPublishInfo.selectOneMessageQueue

    /**
     *  轮询模式
     */
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int index = this.sendWhichQueue.getAndIncrement();
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }

Producer 延迟消息(发送的时候设置Message的延迟级别)

延迟消息

消息发送到 Broker, 不能立刻被 Consumer 消费,只有到了特定的时间点或者等待特定的时间后,才能被消费。

延迟消息两种类型

  • 定时消息 (特定的时间 level
  • 任意的时间精度的延迟消息

RocketMQ 不支持 任意时间度的延迟消息, 因为如果需要支持,在Broker层面需要做大量的消息排序,还需要持久化,那么会产生巨大的性能开销。 所以 RocketMQ 只支持 定时消息。

回头看 Message#setDelayTimeLevel方法

     public void setDelayTimeLevel(int level) {
        this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
    }
comments powered by Disqus