RocketMQ源码阅读 通信组件
RocketMQ 核心基石
前面已经介绍了 RocketMQ
的基本的概念和组件。今天我们开启真正的源码的阅读诗篇, RocketMQ
消息系各个组件 Producer
、Consumer
、Broker
、NameSrv
通通离不开交互,那是使用的什么交互的呢。答案是TCP长链接。
而 RocketMQ
开源代码内部,对通信相关的进行了一次封装,都在 rocketmq-remoting 模块下,这个模块被其他 client
、broker
、namesrv
应用。
直接先说 remoting
的实现是基于 netty
做了封装、启动了服务端和客户端,支持三种消息的发送方式:
- 同步发送
- 单向发送 (不需要关注响应)
- 异步发送
下图为异步通信流程
remoting 包下的核心接口体系
接口 RemotingService
public interface RemotingService {
// 开启
void start();
// 关闭
void shutdown();
// 注册 RPCHook
void registerRPCHook(RPCHook rpcHook);
}
接口 RemotingServer
public interface RemotingServer extends RemotingService {
// 注册请求类型的处理器 【common 模块的 org.apache.rocketmq.common.protocol.RequestCode]
void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
final ExecutorService executor);
// 注册默认的处理器
void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);
// 本地的端口
int localListenPort();
// 根据 requestCode 获取处理器和业务线程池
Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);
// 同步发送
RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
RemotingTimeoutException;
// 异步发送
void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback) throws InterruptedException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
// 单向发送
void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException;
}
实现 NettyRemotingServer
这边选择性的进行摘取记录描述啊
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
// todo 此处代码省略号...
// 构造
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
final ChannelEventListener channelEventListener) {
//调用 NettyRemotingAbstract 的构造初始化相关配置
super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
//开启一个ServerBootstrap
this.serverBootstrap = new ServerBootstrap();
this.nettyServerConfig = nettyServerConfig;
this.channelEventListener = channelEventListener;
//默认的 React
int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
//根据系统环境或指的配置选择使用 Epoll 还是 Nio 模式
if (useEpoll()) {
//构建 EventLoopGroup 只有1个线程
this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
}
});
//构建 eventLoopGroupSelector
this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
} else {
// todo 此处代码省略号...
}
//夹在SSL上下文
loadSslContext();
}
//todo 此处代码省略号...
// 核心的 启动方法
@Override
public void start() {
//构建 Worker线程,根据配置
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
// 准备共享式的Hanlders 包含了SSL 、编解码、连接管理、业务handler
prepareSharableHandlers();
//开启一个 Netty Server
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
// 开启一个 Netty Server
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
// 开启扫描 ResponseTable 的检查线程
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}
//todo 此处代码省略号...
// 这个是核心代码
@ChannelHandler.Sharable
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
// 调用消息处理的方法 这个方法 在 NettyRemotingAbstract 类内上
processMessageReceived(ctx, msg);
}
}
//todo 此处代码省略号...
}
public abstract class NettyRemotingAbstract {
// 根据消息的类型进行处理,是请求还是响应的CMD
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND:
//处理请求指令
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
//处理响应指令
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
}
好了,到现在我们大体的 remoting server 端的大体的入口和核心处理方法差不多了,后面补充一下 RocketMQ Netty Reactor 多线程的设计
简单概括: 1-N-M1-M2
模型
线程数 | 线程名 | 线程具体说明 | 代码默认值 |
---|---|---|---|
1 | NettyBoss_%d | Reactor主线程 | 1 |
N | NettyServerEPOLLSelector_%d_%d | Reactor线程池 | 3 |
M1 | NettyServerCodecThread_%d | Worker线程池 | 8 |
M2 | RemotingExecutorThread_%d | 业务processor 处理线程池 | 8 |
- [Reactor主线程]
1
, 一个Reactor主线程
(eventLoopGroupBoss
) 负责监听TCP
网络链接请求,建立好链接,创建SocketChannel
、并注册到selector
上。 - [Reactor线程池]
N=3
,RocketMQ
根据OS或者配置
选择NIO
还是Epoll
,然后监听真正的网络数据。后面拿到数据后,丢给Reactor线程池
(eventLoopGroupSelector
), - [Worker线程池]
M1=8
, 在执行业务逻辑之前的SSL验证
、编解码
、空闲检查
、网络连接管理
等等,这些工作交给了defaultEventExecutorGroup
- [业务
processor
处理线程池]M2=8
, 处理业务操作的放在了业务线程池来执行,根据RemotingCommand
的业务码 code
在processorTable
缓存中获取到对应的processor
, 然后封装成Task任务
,提交给业务processor
处理线程池来执行 (eg:sendMessageExecutor
,消息发送为例)
备注: 特别要说明 业务processor
处理线程池 不一定是8
,具体看代码:
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer{
//todo .....
//注册Processor,一般 executor != null
@Override
public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
ExecutorService executorThis = executor;
if (null == executor) {
executorThis = this.publicExecutor;
}
Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
this.processorTable.put(requestCode, pair);
}
//注册默认的Processor,一般 executor != null
@Override
public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) {
this.defaultRequestProcessor = new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);
}
//todo .....
}
这边结合理解差不多了。
RemotingCommand
上面看见了在处理消息的时候一个很核心的类 下面来看一下这个类 org.apache.rocketmq.remoting.protocol.RemotingCommand
它其实是 rocketmq的指令协议
Header字段 | 类型 | Request说明 | Response说明 |
---|---|---|---|
code | int | 请求操作码,应答方根据不同的请求码进行不同的业务处理 | 应答响应码。0表示成功,非0则表示各种错误 |
language | LanguageCode | 请求方实现的语言 | 应答方实现的语言 |
version | int | 请求方程序的版本 | 应答方程序的版本 |
opaque | int | 相当于requestId,在同一个连接上的不同请求标识码,与响应消息中的相对应 | 应答不做修改直接返回 |
flag | int | 区分是普通RPC还是onewayRPC得标志 | 区分是普通RPC还是onewayRPC得标志 |
remark | String | 传输自定义文本信息 | 传输自定义文本信息 |
extFields | HashMap<String, String> | 请求自定义扩展信息 | 响应自定义扩展信息 |
协议编码后的样子
4个部分:
- 消息长度: 总长度, 4 个字节存储, int 类型
- 序列话类型&消息头长度: int 类型, 第一个字节表示序列化类型, 后面三个字节表示消息头长度
- 消息头数据: 经过序列化后的消息头
- 消息主体数据: 消息主体的二进制字节数据
public class RemotingCommand {
public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type";
public static final String SERIALIZE_TYPE_ENV = "ROCKETMQ_SERIALIZE_TYPE";
public static final String REMOTING_VERSION_KEY = "rocketmq.remoting.version";
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private static final int RPC_TYPE = 0; // 0, REQUEST_COMMAND
private static final int RPC_ONEWAY = 1; // 0, RPC
private static final Map<Class<? extends CommandCustomHeader>, Field[]> CLASS_HASH_MAP =
new HashMap<Class<? extends CommandCustomHeader>, Field[]>();
private static final Map<Class, String> CANONICAL_NAME_CACHE = new HashMap<Class, String>();
// 1, Oneway
// 1, RESPONSE_COMMAND
private static final Map<Field, Boolean> NULLABLE_FIELD_CACHE = new HashMap<Field, Boolean>();
private static final String STRING_CANONICAL_NAME = String.class.getCanonicalName();
private static final String DOUBLE_CANONICAL_NAME_1 = Double.class.getCanonicalName();
private static final String DOUBLE_CANONICAL_NAME_2 = double.class.getCanonicalName();
private static final String INTEGER_CANONICAL_NAME_1 = Integer.class.getCanonicalName();
private static final String INTEGER_CANONICAL_NAME_2 = int.class.getCanonicalName();
private static final String LONG_CANONICAL_NAME_1 = Long.class.getCanonicalName();
private static final String LONG_CANONICAL_NAME_2 = long.class.getCanonicalName();
private static final String BOOLEAN_CANONICAL_NAME_1 = Boolean.class.getCanonicalName();
private static final String BOOLEAN_CANONICAL_NAME_2 = boolean.class.getCanonicalName();
private static volatile int configVersion = -1;
private static AtomicInteger requestId = new AtomicInteger(0);
private static SerializeType serializeTypeConfigInThisServer = SerializeType.JSON;
static {
final String protocol = System.getProperty(SERIALIZE_TYPE_PROPERTY, System.getenv(SERIALIZE_TYPE_ENV));
if (!isBlank(protocol)) {
try {
serializeTypeConfigInThisServer = SerializeType.valueOf(protocol);
} catch (IllegalArgumentException e) {
throw new RuntimeException("parser specified protocol error. protocol=" + protocol, e);
}
}
}
// 请求码操作
private int code;
// 语言类型
private LanguageCode language = LanguageCode.JAVA;
// 版本
private int version = 0;
// requestId,标记请求响应是一个映射的
private int opaque = requestId.getAndIncrement();
// 区分是普通RPC还是onewayRPC得标志
private int flag = 0;
// 传输自定义文本信息
private String remark;
// 请求自定义扩展信息
private HashMap<String, String> extFields;
private transient CommandCustomHeader customHeader;
private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
private transient byte[] body;
protected RemotingCommand() {
}
//创建 requestCommand
public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
RemotingCommand cmd = new RemotingCommand();
cmd.setCode(code);
cmd.customHeader = customHeader;
setCmdVersion(cmd);
return cmd;
}
//设置程序版本
private static void setCmdVersion(RemotingCommand cmd) {
if (configVersion >= 0) {
cmd.setVersion(configVersion);
} else {
String v = System.getProperty(REMOTING_VERSION_KEY);
if (v != null) {
int value = Integer.parseInt(v);
cmd.setVersion(value);
configVersion = value;
}
}
}
public static RemotingCommand createResponseCommand(Class<? extends CommandCustomHeader> classHeader) {
return createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, "not set any response code", classHeader);
}
public static RemotingCommand createResponseCommand(int code, String remark,
Class<? extends CommandCustomHeader> classHeader) {
RemotingCommand cmd = new RemotingCommand();
cmd.markResponseType();
cmd.setCode(code);
cmd.setRemark(remark);
setCmdVersion(cmd);
if (classHeader != null) {
try {
CommandCustomHeader objectHeader = classHeader.newInstance();
cmd.customHeader = objectHeader;
} catch (InstantiationException e) {
return null;
} catch (IllegalAccessException e) {
return null;
}
}
return cmd;
}
public static RemotingCommand createResponseCommand(int code, String remark) {
return createResponseCommand(code, remark, null);
}
//解码
public static RemotingCommand decode(final byte[] array) {
ByteBuffer byteBuffer = ByteBuffer.wrap(array);
return decode(byteBuffer);
}
//解码
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
int length = byteBuffer.limit();
int oriHeaderLen = byteBuffer.getInt();
int headerLength = getHeaderLength(oriHeaderLen);
byte[] headerData = new byte[headerLength];
byteBuffer.get(headerData);
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength > 0) {
bodyData = new byte[bodyLength];
byteBuffer.get(bodyData);
}
cmd.body = bodyData;
return cmd;
}
public static int getHeaderLength(int length) {
return length & 0xFFFFFF;
}
//消息头解码
private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
switch (type) {
case JSON:
RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
resultJson.setSerializeTypeCurrentRPC(type);
return resultJson;
case ROCKETMQ:
RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
resultRMQ.setSerializeTypeCurrentRPC(type);
return resultRMQ;
default:
break;
}
return null;
}
public static SerializeType getProtocolType(int source) {
return SerializeType.valueOf((byte) ((source >> 24) & 0xFF));
}
public static int createNewRequestId() {
return requestId.getAndIncrement();
}
public static SerializeType getSerializeTypeConfigInThisServer() {
return serializeTypeConfigInThisServer;
}
private static boolean isBlank(String str) {
int strLen;
if (str == null || (strLen = str.length()) == 0) {
return true;
}
for (int i = 0; i < strLen; i++) {
if (!Character.isWhitespace(str.charAt(i))) {
return false;
}
}
return true;
}
//设置协议类型
public static byte[] markProtocolType(int source, SerializeType type) {
byte[] result = new byte[4];
result[0] = type.getCode();
result[1] = (byte) ((source >> 16) & 0xFF);
result[2] = (byte) ((source >> 8) & 0xFF);
result[3] = (byte) (source & 0xFF);
return result;
}
public void markResponseType() {
int bits = 1 << RPC_TYPE;
this.flag |= bits;
}
public CommandCustomHeader readCustomHeader() {
return customHeader;
}
public void writeCustomHeader(CommandCustomHeader customHeader) {
this.customHeader = customHeader;
}
public CommandCustomHeader decodeCommandCustomHeader(
Class<? extends CommandCustomHeader> classHeader) throws RemotingCommandException {
CommandCustomHeader objectHeader;
try {
objectHeader = classHeader.newInstance();
} catch (InstantiationException e) {
return null;
} catch (IllegalAccessException e) {
return null;
}
//解码定制的扩展信息字段
if (this.extFields != null) {
Field[] fields = getClazzFields(classHeader);
for (Field field : fields) {
if (!Modifier.isStatic(field.getModifiers())) {
String fieldName = field.getName();
if (!fieldName.startsWith("this")) {
try {
String value = this.extFields.get(fieldName);
if (null == value) {
if (!isFieldNullable(field)) {
throw new RemotingCommandException("the custom field <" + fieldName + "> is null");
}
continue;
}
field.setAccessible(true);
String type = getCanonicalName(field.getType());
Object valueParsed;
if (type.equals(STRING_CANONICAL_NAME)) {
valueParsed = value;
} else if (type.equals(INTEGER_CANONICAL_NAME_1) || type.equals(INTEGER_CANONICAL_NAME_2)) {
valueParsed = Integer.parseInt(value);
} else if (type.equals(LONG_CANONICAL_NAME_1) || type.equals(LONG_CANONICAL_NAME_2)) {
valueParsed = Long.parseLong(value);
} else if (type.equals(BOOLEAN_CANONICAL_NAME_1) || type.equals(BOOLEAN_CANONICAL_NAME_2)) {
valueParsed = Boolean.parseBoolean(value);
} else if (type.equals(DOUBLE_CANONICAL_NAME_1) || type.equals(DOUBLE_CANONICAL_NAME_2)) {
valueParsed = Double.parseDouble(value);
} else {
throw new RemotingCommandException("the custom field <" + fieldName + "> type is not supported");
}
field.set(objectHeader, valueParsed);
} catch (Throwable e) {
log.error("Failed field [{}] decoding", fieldName, e);
}
}
}
}
objectHeader.checkFields();
}
return objectHeader;
}
private Field[] getClazzFields(Class<? extends CommandCustomHeader> classHeader) {
Field[] field = CLASS_HASH_MAP.get(classHeader);
if (field == null) {
field = classHeader.getDeclaredFields();
synchronized (CLASS_HASH_MAP) {
CLASS_HASH_MAP.put(classHeader, field);
}
}
return field;
}
private boolean isFieldNullable(Field field) {
if (!NULLABLE_FIELD_CACHE.containsKey(field)) {
Annotation annotation = field.getAnnotation(CFNotNull.class);
synchronized (NULLABLE_FIELD_CACHE) {
NULLABLE_FIELD_CACHE.put(field, annotation == null);
}
}
return NULLABLE_FIELD_CACHE.get(field);
}
private String getCanonicalName(Class clazz) {
String name = CANONICAL_NAME_CACHE.get(clazz);
if (name == null) {
name = clazz.getCanonicalName();
synchronized (CANONICAL_NAME_CACHE) {
CANONICAL_NAME_CACHE.put(clazz, name);
}
}
return name;
}
// 编码
public ByteBuffer encode() {
// 1> header length size
int length = 4;
// 2> header data length
byte[] headerData = this.headerEncode();
length += headerData.length;
// 3> body data length
if (this.body != null) {
length += body.length;
}
ByteBuffer result = ByteBuffer.allocate(4 + length);
// length
result.putInt(length);
// header length
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// header data
result.put(headerData);
// body data;
if (this.body != null) {
result.put(this.body);
}
result.flip();
return result;
}
private byte[] headerEncode() {
this.makeCustomHeaderToNet();
if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
return RocketMQSerializable.rocketMQProtocolEncode(this);
} else {
return RemotingSerializable.encode(this);
}
}
public void makeCustomHeaderToNet() {
if (this.customHeader != null) {
Field[] fields = getClazzFields(customHeader.getClass());
if (null == this.extFields) {
this.extFields = new HashMap<String, String>();
}
for (Field field : fields) {
if (!Modifier.isStatic(field.getModifiers())) {
String name = field.getName();
if (!name.startsWith("this")) {
Object value = null;
try {
field.setAccessible(true);
value = field.get(this.customHeader);
} catch (Exception e) {
log.error("Failed to access field [{}]", name, e);
}
if (value != null) {
this.extFields.put(name, value.toString());
}
}
}
}
}
}
public ByteBuffer encodeHeader() {
return encodeHeader(this.body != null ? this.body.length : 0);
}
public ByteBuffer encodeHeader(final int bodyLength) {
// 1> header length size
int length = 4;
// 2> header data length
byte[] headerData;
headerData = this.headerEncode();
length += headerData.length;
// 3> body data length
length += bodyLength;
ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);
// length
result.putInt(length);
// header length
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// header data
result.put(headerData);
result.flip();
return result;
}
public void markOnewayRPC() {
int bits = 1 << RPC_ONEWAY;
this.flag |= bits;
}
@JSONField(serialize = false)
public boolean isOnewayRPC() {
int bits = 1 << RPC_ONEWAY;
return (this.flag & bits) == bits;
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
@JSONField(serialize = false)
public RemotingCommandType getType() {
if (this.isResponseType()) {
return RemotingCommandType.RESPONSE_COMMAND;
}
return RemotingCommandType.REQUEST_COMMAND;
}
@JSONField(serialize = false)
public boolean isResponseType() {
int bits = 1 << RPC_TYPE;
return (this.flag & bits) == bits;
}
public LanguageCode getLanguage() {
return language;
}
public void setLanguage(LanguageCode language) {
this.language = language;
}
public int getVersion() {
return version;
}
public void setVersion(int version) {
this.version = version;
}
public int getOpaque() {
return opaque;
}
public void setOpaque(int opaque) {
this.opaque = opaque;
}
public int getFlag() {
return flag;
}
public void setFlag(int flag) {
this.flag = flag;
}
public String getRemark() {
return remark;
}
public void setRemark(String remark) {
this.remark = remark;
}
public byte[] getBody() {
return body;
}
public void setBody(byte[] body) {
this.body = body;
}
public HashMap<String, String> getExtFields() {
return extFields;
}
public void setExtFields(HashMap<String, String> extFields) {
this.extFields = extFields;
}
public void addExtField(String key, String value) {
if (null == extFields) {
extFields = new HashMap<String, String>();
}
extFields.put(key, value);
}
@Override
public String toString() {
return "RemotingCommand [code=" + code + ", language=" + language + ", version=" + version + ", opaque=" + opaque + ", flag(B)="
+ Integer.toBinaryString(flag) + ", remark=" + remark + ", extFields=" + extFields + ", serializeTypeCurrentRPC="
+ serializeTypeCurrentRPC + "]";
}
public SerializeType getSerializeTypeCurrentRPC() {
return serializeTypeCurrentRPC;
}
public void setSerializeTypeCurrentRPC(SerializeType serializeTypeCurrentRPC) {
this.serializeTypeCurrentRPC = serializeTypeCurrentRPC;
}
}