高性能队列Disruptor
背景
Disruptor
是 外汇交易公司LMAX
开发的高性能队列、研发是为了解决内存队列延迟问题。
Disruptor
一般用于线程间的消息传递。
Disruptor GitHub 地址
Disruptor
介绍
理解 Disruptor
最好的方式,选择一个最接近熟悉的样本进行比较。在这个前提下,可以选择 Java
中的 BlockingQueue
.
和队列相似,Disruptor
也是在同一个进程中不同的线程之间进行传递数据的(例如消息或者事件),同时 Disruptor
提供了一些将关键功能和队列分开的特性:
- 向消费者发送多播事件
- 消息者依赖关系图
- 预先为事件分配内存
- 可选的(无锁)
Disruptor
核心概念
在我们理解Disruptor
如何工作之前,了解下核心概念
- Ring Buffer
环形数组设计,为了避免垃圾回收,采用的数组结构,从3.0开始,环形缓冲区主要存储和更新在
Disruptor
中移动的数据(事件) - Sequence
Disruptor
每个消费者(EventProcessor
)维护一个Sequence
,并发的大多数代码都依赖Sequence
值的改动,所以Sequence
支持AtomicLong
的大部分也行, 唯一不同的是Sequence
包含额外的功能来阻止Sequence
和其他值之间的伪共享(false sharing
) - Sequencer
Disruptor
核心逻辑, 两个实现: 单生产者和多生产者。他们实现了生产者与消费者之间的快速传递的并发算法。 - Sequence Barrier
由
Sequencer
生成,包含此Sequencer
发布的Sequence
指针以及依赖的其他消费者的Sequence
。包含了消费者检查是否有可用的事件的代码。 - Wait Strategy
消费者等待事件的策略,这个事件由生产者放入,决定了消费者怎么等待生产者将事件放入
Disruptor
- Event 生产者与消费者传递的事件,完全由用户定义
- EventProcessor
处理事件的主要循环(
main event loop
),包含了一个Sequeuece
. 有一个具体的实现类BatchEventProcessor
- EventHandler 用户实现的接口,代表一个消费者。处理事件。
- Producer 生产者、先获得占位,然后提交事件。
事件广播(
Multicast Events
)事件广播是
Disruptor
与Queue
最大的区别,当你有多个消费者监听一个Disruptor
, 所有的事件将会发布到这个所有的消费者。Disruptor
这一特性被用来需要对同一数据进行多个并行操作的情况。 如在LMAX系统中有三个操作可以同时进行:日志(将数据持久到日志文件中),复制(将数据发送到其他的机器上,以确保存在数据远程副本),业务逻辑处理。 也可以使用WokrerPool
来并行处理不同的事件。如上图。可以看到有3个事件处理程序正在侦听
Disrupto
r(JournalConsumer
,ReplicationConsumer
和ApplicationConsumer
), 这些事件处理程序中的每个将接收Disruptor
中所有可用的消息(按相同顺序)。这允许这些消费者中的每一个并行工作。消费者依赖关系图(
Consumer Dependency Graph
)为了支持实现业务并行处理流程,
Disruptor
提供了多个消费者之间的协作功能。回到上面的例子中,我们可以将journalling
和replication
消费完成他们的业务,后再继续执行业务逻辑流程。 我们称呼这个功能为gating
,gating
发生在两种场景下:- 确保
Producer
不能运行超过Consumer
,可以通多调用RingBuffer.addGatingConsumers()
来增加相关的消费者来完成 - 之前所说的场景,通过必须先完成的
Consumer
的Sequence
的SequenceBarrier
来实现。
- 确保
事件预分配(
Event Preallocation
)Disruptor
的一个目标就是在低延时环境下,减少或异常内存的占用。(在JAVA环境下,需要较少GC停顿的次数)(C/C++环境下,大量的内存分配也是一个问题)可选择的无锁(
Optionally Lock-free
)无锁的
Disruptor
的低延迟的无锁的特性实现细节是都是基于 内存屏障 和 CAS 操作实现的,只有一个场景BlockingWaitStrategy
中使用的Lock
是为了使用Lock
里面的Condition
, 方便消费者线程被Park
时候等待新的事件来触发。许多低延迟系统使用自旋(busy-wait
)来避免使用Condition
造成的抖动 然而,太多的busy-wait
会导致性能下降,特别在CPU资源受限的情况下。
Disruptor
几个核心的设计
Sequence
设计
Sequence 真正计数是 value
采用缓冲行防止 false sharing
。在value
的前后有7个 long
型的填充值,做CPU cache line
填充防止伪共享。
RingBuffer
设计
RingBuffer
是一个环(首尾相接),可以用作不同的上下文(线程)间传递数据的 Buffer
环形设计,每个元素都有个坐标,取得元素通过取mod操作。
是数组设计、非链表。
一般是2^N
次方,这样 sequence & (array length - 1 ) = array index。哈希Map也是这种位运算做的。
RingBuffer
特点
- 数组实现、快速访问
- 元素是覆盖式的,不主动清除
- 神奇的缓存行(缓存是由缓存行组成的,通常64个字节、一个JAVA long 类型 8 字节)
消费者依赖设计
缓存内存加载过程
public long p1, p2, p3, p4, p5, p6, p7; // cache line padding
private volatile long cursor = INITIAL_CURSOR_VALUE;
public long p8, p9, p10, p11, p12, p13, p14; // cache line padding
Disruptor
使用Demo
TransactionOrder
public class TransactionOrder {
private String id;
private double price;
}
TransactionHandler
public class TransactionHandler implements EventHandler<TransactionOrder>, WorkHandler<TransactionOrder> {
public void onEvent(TransactionOrder transactionOrder, long sequence, boolean endOfBatch) throws Exception {
this.onEvent(transactionOrder);
}
public void onEvent(TransactionOrder transactionOrder) throws Exception {
//具体的消费逻辑
transactionOrder.setId(UUID.randomUUID().toString());
}
}
Demo1
public class Demo1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
int BUFFER_SIZE = 1024;
int THREAD_NUM = 4;
//createSingleProducer 创建单生产者的 RingBuffer
final RingBuffer<TransactionOrder> ringBuffer =
RingBuffer.createSingleProducer(new EventFactory<TransactionOrder>() {
public TransactionOrder newInstance() {
return new TransactionOrder();
}
}, BUFFER_SIZE, new YieldingWaitStrategy());
//创建线程池
ExecutorService service = Executors.newFixedThreadPool(THREAD_NUM);
//创建 SequenceBarrier
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
//创建消息处理器
BatchEventProcessor<TransactionOrder> eventProcessor =
new BatchEventProcessor<TransactionOrder>(ringBuffer, sequenceBarrier, new TransactionHandler());
//这一部分是让 RingBuffer根据消费者状态进行gating, 只有一个消费者的话可以省略
ringBuffer.addGatingSequences(eventProcessor.getSequence());
//把消息处理器提交到线程池
service.submit(eventProcessor);
Future<?> future = service.submit(new Callable<Void>() {
public Void call() throws Exception {
long seq;
for (int i = 0; i<10000; i++) {
seq = ringBuffer.next(); //ringbuffer 的一个可用区块
ringBuffer.get(seq).setPrice(Math.random() *9999); // 给这个区块放入数据
ringBuffer.publish(seq); //发布数据使得 consumer 可以获取该数据
}
return null;
}
});
future.get(); //等待生产者结束
eventProcessor.halt(); //通知事件
service.shutdown(); //终止线程
}
}
Demo2
public class Demo2 {
public static void main(String[] args) {
int BUFFER_SIZE = 1024;
int THREAD_NUM = 4;
EventFactory<TransactionOrder> eventFactory = new EventFactory<TransactionOrder>() {
public TransactionOrder newInstance() {
return new TransactionOrder();
}
};
RingBuffer<TransactionOrder> ringBuffer = RingBuffer.createSingleProducer(eventFactory, BUFFER_SIZE);
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
ExecutorService service = Executors.newFixedThreadPool(THREAD_NUM);
WorkHandler<TransactionOrder> workHandler = new TransactionHandler();
WorkerPool<TransactionOrder> workerPool =
new WorkerPool<TransactionOrder>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(),
workHandler);
//序列协调者
ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
workerPool.start(service);
for (int i=0; i<8; i++) {
long seq = ringBuffer.next();
ringBuffer.get(seq).setPrice(Math.random() * 9999);
ringBuffer.publish(seq);
}
workerPool.halt();
service.shutdown();
}
}
Demo3
public class Demo3 {
public static void main(String[] args) throws InterruptedException {
long start = System.currentTimeMillis();
int BUFFER_SIZE = 1024;
int THREAD_NUM = 4;
ExecutorService service = Executors.newFixedThreadPool(THREAD_NUM);
Disruptor<TransactionOrder> disruptor = new Disruptor<TransactionOrder>(new EventFactory<TransactionOrder>() {
public TransactionOrder newInstance() {
return new TransactionOrder();
}
}, BUFFER_SIZE, service, ProducerType.SINGLE, new BusySpinWaitStrategy());
/**
* 菱形操作
*/
//使用 disruptor 创建消费组 C1 与 C2
EventHandlerGroup<TransactionOrder> eventHandlerGroup =
disruptor.handleEventsWith(new TransactionHandler(), new TransactionVasConsumer());
//C3
TransactionJmsNotifyHandler jmsNotifyHandler = new TransactionJmsNotifyHandler();
//声明在 C1 和 C2 完事后 执行 JMS消息发送操作(C3)
eventHandlerGroup.then(jmsNotifyHandler);
/**
* 顺序执行
*/
// disruptor.handleEventsWith(new TransactionHandler())
// .then(new TransactionVasConsumer())
// .then(new TransactionJmsNotifyHandler());
/**
* 六边形操作
*/
// TransactionHandler h1 = new TransactionHandler();
// TransactionHandler h2 = new TransactionHandler();
// TransactionHandler h3 = new TransactionHandler();
// TransactionHandler h4 = new TransactionHandler();
// TransactionHandler h5 = new TransactionHandler();
// TransactionHandler h6 = new TransactionHandler();
// disruptor.handleEventsWith(h1, h2);
// disruptor.after(h1).handleEventsWith(h4);
// disruptor.after(h2).handleEventsWith(h5);
// disruptor.after(h4, h5).handleEventsWith(h3);
//启动
disruptor.start();
CountDownLatch latch = new CountDownLatch(1);
//生产者准备
service.submit(new TransactionPubllisher(disruptor, latch));
latch.await(); //等待生产者完事
disruptor.shutdown();
service.shutdown();
System.out.println("总耗时:"+ (System.currentTimeMillis() - start));
}
}