RocketMQ源码阅读 通信组件

RocketMQ 核心基石

前面已经介绍了 RocketMQ 的基本的概念和组件。今天我们开启真正的源码的阅读诗篇, RocketMQ 消息系各个组件 ProducerConsumerBrokerNameSrv 通通离不开交互,那是使用的什么交互的呢。答案是TCP长链接。 而 RocketMQ 开源代码内部,对通信相关的进行了一次封装,都在 rocketmq-remoting 模块下,这个模块被其他 clientbrokernamesrv 应用。

直接先说 remoting 的实现是基于 netty 做了封装、启动了服务端和客户端,支持三种消息的发送方式:

  • 同步发送
  • 单向发送 (不需要关注响应)
  • 异步发送

下图为异步通信流程 rocketmq_remoting

remoting 包下的核心接口体系

remoting uml

接口 RemotingService

public interface RemotingService {
    // 开启
    void start();
    // 关闭
    void shutdown();
    // 注册 RPCHook
    void registerRPCHook(RPCHook rpcHook);
}

接口 RemotingServer

public interface RemotingServer extends RemotingService {

    // 注册请求类型的处理器 【common 模块的 org.apache.rocketmq.common.protocol.RequestCode]
    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
        final ExecutorService executor);
    // 注册默认的处理器
    void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);

    // 本地的端口
    int localListenPort();

    // 根据 requestCode 获取处理器和业务线程池
    Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);

    // 同步发送
    RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
        final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
        RemotingTimeoutException;
    // 异步发送
    void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback) throws InterruptedException,
        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
    // 单向发送
    void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException;

}

实现 NettyRemotingServer

这边选择性的进行摘取记录描述啊

public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
    // todo 此处代码省略号...
    // 构造
    public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
                               final ChannelEventListener channelEventListener) {
        //调用 NettyRemotingAbstract 的构造初始化相关配置
        super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
        //开启一个ServerBootstrap
        this.serverBootstrap = new ServerBootstrap();
        this.nettyServerConfig = nettyServerConfig;
        this.channelEventListener = channelEventListener;
        
        //默认的 React
        int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
        if (publicThreadNums <= 0) {
            publicThreadNums = 4;
        }

        this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
            }
        });

        //根据系统环境或指的配置选择使用 Epoll 还是 Nio 模式
        if (useEpoll()) {
            //构建 EventLoopGroup 只有1个线程
            this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
                }
            });
            //构建 eventLoopGroupSelector
            this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        } else {
            // todo 此处代码省略号...
        }
        //夹在SSL上下文
        loadSslContext();
    }
    
    //todo 此处代码省略号...

    // 核心的 启动方法
    @Override
    public void start() {
        //构建 Worker线程,根据配置
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
                nettyServerConfig.getServerWorkerThreads(),
                new ThreadFactory() {

                    private AtomicInteger threadIndex = new AtomicInteger(0);

                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                    }
                });
        // 准备共享式的Hanlders 包含了SSL 、编解码、连接管理、业务handler
        prepareSharableHandlers();

        //开启一个 Netty Server
        ServerBootstrap childHandler =
                this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                        .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        .option(ChannelOption.SO_REUSEADDR, true)
                        .option(ChannelOption.SO_KEEPALIVE, false)
                        .childOption(ChannelOption.TCP_NODELAY, true)
                        .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                        .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                        .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline()
                                        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                                        .addLast(defaultEventExecutorGroup,
                                                encoder,
                                                new NettyDecoder(),
                                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                                connectionManageHandler,
                                                serverHandler
                                        );
                            }
                        });

        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        try {
            // 开启一个 Netty Server
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }

        // 开启扫描 ResponseTable 的检查线程
        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }
    
    //todo 此处代码省略号...

    // 这个是核心代码
    @ChannelHandler.Sharable
    class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            // 调用消息处理的方法  这个方法 在 NettyRemotingAbstract 类内上
            processMessageReceived(ctx, msg);
        }
    }
    //todo 此处代码省略号...
}

public abstract class NettyRemotingAbstract {
    
    // 根据消息的类型进行处理,是请求还是响应的CMD
    public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    //处理请求指令
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    //处理响应指令
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }
}

好了,到现在我们大体的 remoting server 端的大体的入口和核心处理方法差不多了,后面补充一下 RocketMQ Netty Reactor 多线程的设计

rocketmq_reactor_thread

简单概括: 1-N-M1-M2 模型

线程数线程名线程具体说明代码默认值
1NettyBoss_%dReactor主线程1
NNettyServerEPOLLSelector_%d_%dReactor线程池3
M1NettyServerCodecThread_%dWorker线程池8
M2RemotingExecutorThread_%d业务processor处理线程池8
  • [Reactor主线程] 1, 一个 Reactor主线程(eventLoopGroupBoss) 负责监听 TCP网络链接请求,建立好链接,创建SocketChannel、并注册到selector上。
  • [Reactor线程池] N=3, RocketMQ 根据OS或者配置选择NIO还是Epoll,然后监听真正的网络数据。后面拿到数据后,丢给 Reactor线程池 (eventLoopGroupSelector),
  • [Worker线程池] M1=8, 在执行业务逻辑之前的 SSL验证编解码空闲检查网络连接管理等等,这些工作交给了 defaultEventExecutorGroup
  • [业务processor处理线程池] M2=8, 处理业务操作的放在了业务线程池来执行,根据 RemotingCommand业务码 codeprocessorTable 缓存中获取到对应的 processor, 然后封装成Task任务,提交给业务processor处理线程池来执行 (eg: sendMessageExecutor,消息发送为例)

备注: 特别要说明 业务processor处理线程池 不一定是8,具体看代码:

public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer{
    //todo .....
    //注册Processor,一般 executor != null
    @Override
    public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
      ExecutorService executorThis = executor;
      if (null == executor) {
        executorThis = this.publicExecutor;
      }

      Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
      this.processorTable.put(requestCode, pair);
    }
    
    //注册默认的Processor,一般 executor != null
    @Override
    public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) {
      this.defaultRequestProcessor = new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);
    }
    //todo .....
}

这边结合理解差不多了。

RemotingCommand

上面看见了在处理消息的时候一个很核心的类 下面来看一下这个类 org.apache.rocketmq.remoting.protocol.RemotingCommand

它其实是 rocketmq的指令协议

Header字段类型Request说明Response说明
codeint请求操作码,应答方根据不同的请求码进行不同的业务处理应答响应码。0表示成功,非0则表示各种错误
languageLanguageCode请求方实现的语言应答方实现的语言
versionint请求方程序的版本应答方程序的版本
opaqueint相当于requestId,在同一个连接上的不同请求标识码,与响应消息中的相对应应答不做修改直接返回
flagint区分是普通RPC还是onewayRPC得标志区分是普通RPC还是onewayRPC得标志
remarkString传输自定义文本信息传输自定义文本信息
extFieldsHashMap<String, String>请求自定义扩展信息响应自定义扩展信息

协议编码后的样子 rocketmq_protocol_length

4个部分:

  • 消息长度: 总长度, 4 个字节存储, int 类型
  • 序列话类型&消息头长度: int 类型, 第一个字节表示序列化类型, 后面三个字节表示消息头长度
  • 消息头数据: 经过序列化后的消息头
  • 消息主体数据: 消息主体的二进制字节数据
public class RemotingCommand {
  public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type";
  public static final String SERIALIZE_TYPE_ENV = "ROCKETMQ_SERIALIZE_TYPE";
  public static final String REMOTING_VERSION_KEY = "rocketmq.remoting.version";
  private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
  private static final int RPC_TYPE = 0; // 0, REQUEST_COMMAND
  private static final int RPC_ONEWAY = 1; // 0, RPC
  private static final Map<Class<? extends CommandCustomHeader>, Field[]> CLASS_HASH_MAP =
          new HashMap<Class<? extends CommandCustomHeader>, Field[]>();
  private static final Map<Class, String> CANONICAL_NAME_CACHE = new HashMap<Class, String>();
  // 1, Oneway
  // 1, RESPONSE_COMMAND
  private static final Map<Field, Boolean> NULLABLE_FIELD_CACHE = new HashMap<Field, Boolean>();
  private static final String STRING_CANONICAL_NAME = String.class.getCanonicalName();
  private static final String DOUBLE_CANONICAL_NAME_1 = Double.class.getCanonicalName();
  private static final String DOUBLE_CANONICAL_NAME_2 = double.class.getCanonicalName();
  private static final String INTEGER_CANONICAL_NAME_1 = Integer.class.getCanonicalName();
  private static final String INTEGER_CANONICAL_NAME_2 = int.class.getCanonicalName();
  private static final String LONG_CANONICAL_NAME_1 = Long.class.getCanonicalName();
  private static final String LONG_CANONICAL_NAME_2 = long.class.getCanonicalName();
  private static final String BOOLEAN_CANONICAL_NAME_1 = Boolean.class.getCanonicalName();
  private static final String BOOLEAN_CANONICAL_NAME_2 = boolean.class.getCanonicalName();
  private static volatile int configVersion = -1;
  private static AtomicInteger requestId = new AtomicInteger(0);

  private static SerializeType serializeTypeConfigInThisServer = SerializeType.JSON;

  static {
    final String protocol = System.getProperty(SERIALIZE_TYPE_PROPERTY, System.getenv(SERIALIZE_TYPE_ENV));
    if (!isBlank(protocol)) {
      try {
        serializeTypeConfigInThisServer = SerializeType.valueOf(protocol);
      } catch (IllegalArgumentException e) {
        throw new RuntimeException("parser specified protocol error. protocol=" + protocol, e);
      }
    }
  }

  // 请求码操作
  private int code;
  // 语言类型
  private LanguageCode language = LanguageCode.JAVA;
  // 版本
  private int version = 0;
  // requestId,标记请求响应是一个映射的
  private int opaque = requestId.getAndIncrement();
  // 区分是普通RPC还是onewayRPC得标志
  private int flag = 0;
  // 传输自定义文本信息
  private String remark;
  // 请求自定义扩展信息
  private HashMap<String, String> extFields;
  private transient CommandCustomHeader customHeader;

  private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;

  private transient byte[] body;

  protected RemotingCommand() {
  }

  //创建 requestCommand
  public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
    RemotingCommand cmd = new RemotingCommand();
    cmd.setCode(code);
    cmd.customHeader = customHeader;
    setCmdVersion(cmd);
    return cmd;
  }

  //设置程序版本
  private static void setCmdVersion(RemotingCommand cmd) {
    if (configVersion >= 0) {
      cmd.setVersion(configVersion);
    } else {
      String v = System.getProperty(REMOTING_VERSION_KEY);
      if (v != null) {
        int value = Integer.parseInt(v);
        cmd.setVersion(value);
        configVersion = value;
      }
    }
  }

  public static RemotingCommand createResponseCommand(Class<? extends CommandCustomHeader> classHeader) {
    return createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, "not set any response code", classHeader);
  }

  public static RemotingCommand createResponseCommand(int code, String remark,
                                                      Class<? extends CommandCustomHeader> classHeader) {
    RemotingCommand cmd = new RemotingCommand();
    cmd.markResponseType();
    cmd.setCode(code);
    cmd.setRemark(remark);
    setCmdVersion(cmd);

    if (classHeader != null) {
      try {
        CommandCustomHeader objectHeader = classHeader.newInstance();
        cmd.customHeader = objectHeader;
      } catch (InstantiationException e) {
        return null;
      } catch (IllegalAccessException e) {
        return null;
      }
    }

    return cmd;
  }

  public static RemotingCommand createResponseCommand(int code, String remark) {
    return createResponseCommand(code, remark, null);
  }

  //解码
  public static RemotingCommand decode(final byte[] array) {
    ByteBuffer byteBuffer = ByteBuffer.wrap(array);
    return decode(byteBuffer);
  }

  //解码
  public static RemotingCommand decode(final ByteBuffer byteBuffer) {
    int length = byteBuffer.limit();
    int oriHeaderLen = byteBuffer.getInt();
    int headerLength = getHeaderLength(oriHeaderLen);

    byte[] headerData = new byte[headerLength];
    byteBuffer.get(headerData);

    RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

    int bodyLength = length - 4 - headerLength;
    byte[] bodyData = null;
    if (bodyLength > 0) {
      bodyData = new byte[bodyLength];
      byteBuffer.get(bodyData);
    }
    cmd.body = bodyData;

    return cmd;
  }

  public static int getHeaderLength(int length) {
    return length & 0xFFFFFF;
  }

  //消息头解码
  private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
    switch (type) {
      case JSON:
        RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
        resultJson.setSerializeTypeCurrentRPC(type);
        return resultJson;
      case ROCKETMQ:
        RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
        resultRMQ.setSerializeTypeCurrentRPC(type);
        return resultRMQ;
      default:
        break;
    }

    return null;
  }

  public static SerializeType getProtocolType(int source) {
    return SerializeType.valueOf((byte) ((source >> 24) & 0xFF));
  }

  public static int createNewRequestId() {
    return requestId.getAndIncrement();
  }

  public static SerializeType getSerializeTypeConfigInThisServer() {
    return serializeTypeConfigInThisServer;
  }

  private static boolean isBlank(String str) {
    int strLen;
    if (str == null || (strLen = str.length()) == 0) {
      return true;
    }
    for (int i = 0; i < strLen; i++) {
      if (!Character.isWhitespace(str.charAt(i))) {
        return false;
      }
    }
    return true;
  }

  //设置协议类型
  public static byte[] markProtocolType(int source, SerializeType type) {
    byte[] result = new byte[4];

    result[0] = type.getCode();
    result[1] = (byte) ((source >> 16) & 0xFF);
    result[2] = (byte) ((source >> 8) & 0xFF);
    result[3] = (byte) (source & 0xFF);
    return result;
  }

  public void markResponseType() {
    int bits = 1 << RPC_TYPE;
    this.flag |= bits;
  }

  public CommandCustomHeader readCustomHeader() {
    return customHeader;
  }

  public void writeCustomHeader(CommandCustomHeader customHeader) {
    this.customHeader = customHeader;
  }

  public CommandCustomHeader decodeCommandCustomHeader(
          Class<? extends CommandCustomHeader> classHeader) throws RemotingCommandException {
    CommandCustomHeader objectHeader;
    try {
      objectHeader = classHeader.newInstance();
    } catch (InstantiationException e) {
      return null;
    } catch (IllegalAccessException e) {
      return null;
    }

    //解码定制的扩展信息字段
    if (this.extFields != null) {

      Field[] fields = getClazzFields(classHeader);
      for (Field field : fields) {
        if (!Modifier.isStatic(field.getModifiers())) {
          String fieldName = field.getName();
          if (!fieldName.startsWith("this")) {
            try {
              String value = this.extFields.get(fieldName);
              if (null == value) {
                if (!isFieldNullable(field)) {
                  throw new RemotingCommandException("the custom field <" + fieldName + "> is null");
                }
                continue;
              }

              field.setAccessible(true);
              String type = getCanonicalName(field.getType());
              Object valueParsed;

              if (type.equals(STRING_CANONICAL_NAME)) {
                valueParsed = value;
              } else if (type.equals(INTEGER_CANONICAL_NAME_1) || type.equals(INTEGER_CANONICAL_NAME_2)) {
                valueParsed = Integer.parseInt(value);
              } else if (type.equals(LONG_CANONICAL_NAME_1) || type.equals(LONG_CANONICAL_NAME_2)) {
                valueParsed = Long.parseLong(value);
              } else if (type.equals(BOOLEAN_CANONICAL_NAME_1) || type.equals(BOOLEAN_CANONICAL_NAME_2)) {
                valueParsed = Boolean.parseBoolean(value);
              } else if (type.equals(DOUBLE_CANONICAL_NAME_1) || type.equals(DOUBLE_CANONICAL_NAME_2)) {
                valueParsed = Double.parseDouble(value);
              } else {
                throw new RemotingCommandException("the custom field <" + fieldName + "> type is not supported");
              }

              field.set(objectHeader, valueParsed);

            } catch (Throwable e) {
              log.error("Failed field [{}] decoding", fieldName, e);
            }
          }
        }
      }

      objectHeader.checkFields();
    }

    return objectHeader;
  }

  private Field[] getClazzFields(Class<? extends CommandCustomHeader> classHeader) {
    Field[] field = CLASS_HASH_MAP.get(classHeader);

    if (field == null) {
      field = classHeader.getDeclaredFields();
      synchronized (CLASS_HASH_MAP) {
        CLASS_HASH_MAP.put(classHeader, field);
      }
    }
    return field;
  }

  private boolean isFieldNullable(Field field) {
    if (!NULLABLE_FIELD_CACHE.containsKey(field)) {
      Annotation annotation = field.getAnnotation(CFNotNull.class);
      synchronized (NULLABLE_FIELD_CACHE) {
        NULLABLE_FIELD_CACHE.put(field, annotation == null);
      }
    }
    return NULLABLE_FIELD_CACHE.get(field);
  }

  private String getCanonicalName(Class clazz) {
    String name = CANONICAL_NAME_CACHE.get(clazz);

    if (name == null) {
      name = clazz.getCanonicalName();
      synchronized (CANONICAL_NAME_CACHE) {
        CANONICAL_NAME_CACHE.put(clazz, name);
      }
    }
    return name;
  }

  // 编码
  public ByteBuffer encode() {
    // 1> header length size
    int length = 4;

    // 2> header data length
    byte[] headerData = this.headerEncode();
    length += headerData.length;

    // 3> body data length
    if (this.body != null) {
      length += body.length;
    }

    ByteBuffer result = ByteBuffer.allocate(4 + length);

    // length
    result.putInt(length);

    // header length
    result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

    // header data
    result.put(headerData);

    // body data;
    if (this.body != null) {
      result.put(this.body);
    }

    result.flip();

    return result;
  }

  private byte[] headerEncode() {
    this.makeCustomHeaderToNet();
    if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
      return RocketMQSerializable.rocketMQProtocolEncode(this);
    } else {
      return RemotingSerializable.encode(this);
    }
  }

  public void makeCustomHeaderToNet() {
    if (this.customHeader != null) {
      Field[] fields = getClazzFields(customHeader.getClass());
      if (null == this.extFields) {
        this.extFields = new HashMap<String, String>();
      }

      for (Field field : fields) {
        if (!Modifier.isStatic(field.getModifiers())) {
          String name = field.getName();
          if (!name.startsWith("this")) {
            Object value = null;
            try {
              field.setAccessible(true);
              value = field.get(this.customHeader);
            } catch (Exception e) {
              log.error("Failed to access field [{}]", name, e);
            }

            if (value != null) {
              this.extFields.put(name, value.toString());
            }
          }
        }
      }
    }
  }

  public ByteBuffer encodeHeader() {
    return encodeHeader(this.body != null ? this.body.length : 0);
  }

  public ByteBuffer encodeHeader(final int bodyLength) {
    // 1> header length size
    int length = 4;

    // 2> header data length
    byte[] headerData;
    headerData = this.headerEncode();

    length += headerData.length;

    // 3> body data length
    length += bodyLength;

    ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);

    // length
    result.putInt(length);

    // header length
    result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

    // header data
    result.put(headerData);

    result.flip();

    return result;
  }

  public void markOnewayRPC() {
    int bits = 1 << RPC_ONEWAY;
    this.flag |= bits;
  }

  @JSONField(serialize = false)
  public boolean isOnewayRPC() {
    int bits = 1 << RPC_ONEWAY;
    return (this.flag & bits) == bits;
  }

  public int getCode() {
    return code;
  }

  public void setCode(int code) {
    this.code = code;
  }

  @JSONField(serialize = false)
  public RemotingCommandType getType() {
    if (this.isResponseType()) {
      return RemotingCommandType.RESPONSE_COMMAND;
    }

    return RemotingCommandType.REQUEST_COMMAND;
  }

  @JSONField(serialize = false)
  public boolean isResponseType() {
    int bits = 1 << RPC_TYPE;
    return (this.flag & bits) == bits;
  }

  public LanguageCode getLanguage() {
    return language;
  }

  public void setLanguage(LanguageCode language) {
    this.language = language;
  }

  public int getVersion() {
    return version;
  }

  public void setVersion(int version) {
    this.version = version;
  }

  public int getOpaque() {
    return opaque;
  }

  public void setOpaque(int opaque) {
    this.opaque = opaque;
  }

  public int getFlag() {
    return flag;
  }

  public void setFlag(int flag) {
    this.flag = flag;
  }

  public String getRemark() {
    return remark;
  }

  public void setRemark(String remark) {
    this.remark = remark;
  }

  public byte[] getBody() {
    return body;
  }

  public void setBody(byte[] body) {
    this.body = body;
  }

  public HashMap<String, String> getExtFields() {
    return extFields;
  }

  public void setExtFields(HashMap<String, String> extFields) {
    this.extFields = extFields;
  }

  public void addExtField(String key, String value) {
    if (null == extFields) {
      extFields = new HashMap<String, String>();
    }
    extFields.put(key, value);
  }

  @Override
  public String toString() {
    return "RemotingCommand [code=" + code + ", language=" + language + ", version=" + version + ", opaque=" + opaque + ", flag(B)="
            + Integer.toBinaryString(flag) + ", remark=" + remark + ", extFields=" + extFields + ", serializeTypeCurrentRPC="
            + serializeTypeCurrentRPC + "]";
  }

  public SerializeType getSerializeTypeCurrentRPC() {
    return serializeTypeCurrentRPC;
  }

  public void setSerializeTypeCurrentRPC(SerializeType serializeTypeCurrentRPC) {
    this.serializeTypeCurrentRPC = serializeTypeCurrentRPC;
  }
}

感谢 !

comments powered by Disqus