高性能队列Disruptor

背景

Disruptor 是 外汇交易公司LMAX开发的高性能队列、研发是为了解决内存队列延迟问题。 Disruptor 一般用于线程间的消息传递。 Disruptor GitHub 地址

Disruptor 介绍

理解 Disruptor 最好的方式,选择一个最接近熟悉的样本进行比较。在这个前提下,可以选择 Java 中的 BlockingQueue. 和队列相似,Disruptor 也是在同一个进程中不同的线程之间进行传递数据的(例如消息或者事件),同时 Disruptor 提供了一些将关键功能和队列分开的特性:

  • 向消费者发送多播事件
  • 消息者依赖关系图
  • 预先为事件分配内存
  • 可选的(无锁)

Disruptor 核心概念

在我们理解Disruptor如何工作之前,了解下核心概念

  • Ring Buffer 环形数组设计,为了避免垃圾回收,采用的数组结构,从3.0开始,环形缓冲区主要存储和更新在Disruptor中移动的数据(事件)
  • Sequence Disruptor 每个消费者(EventProcessor)维护一个 Sequence,并发的大多数代码都依赖 Sequence 值的改动,所以 Sequence 支持 AtomicLong 的大部分也行, 唯一不同的是 Sequence 包含额外的功能来阻止Sequence和其他值之间的伪共享(false sharing)
  • Sequencer
    Disruptor 核心逻辑, 两个实现: 单生产者和多生产者。他们实现了生产者与消费者之间的快速传递的并发算法。
  • Sequence BarrierSequencer 生成,包含此 Sequencer 发布的 Sequence 指针以及依赖的其他消费者的 Sequence。包含了消费者检查是否有可用的事件的代码。
  • Wait Strategy 消费者等待事件的策略,这个事件由生产者放入,决定了消费者怎么等待生产者将事件放入 Disruptor
  • Event 生产者与消费者传递的事件,完全由用户定义
  • EventProcessor 处理事件的主要循环(main event loop),包含了一个 Sequeuece. 有一个具体的实现类 BatchEventProcessor
  • EventHandler 用户实现的接口,代表一个消费者。处理事件。
  • Producer 生产者、先获得占位,然后提交事件。

Disruptor设计组件图

  • 事件广播(Multicast Events)

    事件广播是 DisruptorQueue 最大的区别,当你有多个消费者监听一个 Disruptor, 所有的事件将会发布到这个所有的消费者。 Disruptor 这一特性被用来需要对同一数据进行多个并行操作的情况。 如在LMAX系统中有三个操作可以同时进行:日志(将数据持久到日志文件中),复制(将数据发送到其他的机器上,以确保存在数据远程副本),业务逻辑处理。 也可以使用WokrerPool来并行处理不同的事件。

    如上图。可以看到有3个事件处理程序正在侦听Disruptor(JournalConsumerReplicationConsumerApplicationConsumer), 这些事件处理程序中的每个将接收Disruptor中所有可用的消息(按相同顺序)。这允许这些消费者中的每一个并行工作。

  • 消费者依赖关系图(Consumer Dependency Graph)

    为了支持实现业务并行处理流程,Disruptor 提供了多个消费者之间的协作功能。回到上面的例子中,我们可以将 journallingreplication 消费完成他们的业务,后再继续执行业务逻辑流程。 我们称呼这个功能为 gating , gating 发生在两种场景下:

    • 确保 Producer 不能运行超过 Consumer ,可以通多调用 RingBuffer.addGatingConsumers() 来增加相关的消费者来完成
    • 之前所说的场景,通过必须先完成的ConsumerSequenceSequenceBarrier来实现。
  • 事件预分配(Event Preallocation)

    Disruptor 的一个目标就是在低延时环境下,减少或异常内存的占用。(在JAVA环境下,需要较少GC停顿的次数)(C/C++环境下,大量的内存分配也是一个问题)

  • 可选择的无锁(Optionally Lock-free)

    无锁的 Disruptor 的低延迟的无锁的特性实现细节是都是基于 内存屏障 和 CAS 操作实现的,只有一个场景 BlockingWaitStrategy 中使用的 Lock 是为了使用 Lock 里面的 Condition, 方便消费者线程被 Park 时候等待新的事件来触发。许多低延迟系统使用自旋(busy-wait)来避免使用 Condition造成的抖动 然而,太多的 busy-wait 会导致性能下降,特别在CPU资源受限的情况下。

Disruptor 几个核心的设计

Sequence 设计

Sequence 设计

Sequence 真正计数是 value 采用缓冲行防止 false sharing。在value的前后有7个 long 型的填充值,做CPU cache line填充防止伪共享。

RingBuffer 设计

RingBuffer 设计

RingBuffer 是一个环(首尾相接),可以用作不同的上下文(线程)间传递数据的 Buffer环形设计,每个元素都有个坐标,取得元素通过取mod操作。 是数组设计、非链表。

一般是2^N次方,这样 sequence & (array length - 1 ) = array index。哈希Map也是这种位运算做的。

RingBuffer 特点

  • 数组实现、快速访问
  • 元素是覆盖式的,不主动清除
  • 神奇的缓存行(缓存是由缓存行组成的,通常64个字节、一个JAVA long 类型 8 字节)

消费者依赖设计

缓存内存加载过程

缓存加载过程 缓存加载过程 缓存加载过程

神奇的解决方式—– 缓存行填充

public long p1, p2, p3, p4, p5, p6, p7; // cache line padding
private volatile long cursor = INITIAL_CURSOR_VALUE;
public long p8, p9, p10, p11, p12, p13, p14; // cache line padding 

Disruptor 使用Demo

TransactionOrder

public class TransactionOrder {
       private String id;
       private double price;
}

TransactionHandler

public class TransactionHandler implements EventHandler<TransactionOrder>, WorkHandler<TransactionOrder> {
    public void onEvent(TransactionOrder transactionOrder,  long sequence, boolean endOfBatch) throws Exception {
        this.onEvent(transactionOrder);
    }

    public void onEvent(TransactionOrder transactionOrder) throws Exception {
        //具体的消费逻辑
        transactionOrder.setId(UUID.randomUUID().toString());
    }
}

Demo1

public class Demo1 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        int BUFFER_SIZE = 1024;
        int THREAD_NUM = 4;

        //createSingleProducer 创建单生产者的 RingBuffer
        final RingBuffer<TransactionOrder> ringBuffer =
                RingBuffer.createSingleProducer(new EventFactory<TransactionOrder>() {
                     public TransactionOrder newInstance() {
                        return new TransactionOrder();
                    }
                }, BUFFER_SIZE, new YieldingWaitStrategy());

        //创建线程池
        ExecutorService service = Executors.newFixedThreadPool(THREAD_NUM);

        //创建 SequenceBarrier
        SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();

        //创建消息处理器
        BatchEventProcessor<TransactionOrder> eventProcessor =
                new BatchEventProcessor<TransactionOrder>(ringBuffer, sequenceBarrier, new TransactionHandler());

        //这一部分是让 RingBuffer根据消费者状态进行gating, 只有一个消费者的话可以省略
        ringBuffer.addGatingSequences(eventProcessor.getSequence());

        //把消息处理器提交到线程池
        service.submit(eventProcessor);

        Future<?> future = service.submit(new Callable<Void>() {
            public Void call() throws Exception {
                long seq;
                for (int i = 0; i<10000; i++) {
                    seq = ringBuffer.next(); //ringbuffer 的一个可用区块
                    ringBuffer.get(seq).setPrice(Math.random() *9999); // 给这个区块放入数据
                    ringBuffer.publish(seq); //发布数据使得 consumer 可以获取该数据
                }
                return null;
            }
        });
        future.get(); //等待生产者结束

        eventProcessor.halt(); //通知事件

        service.shutdown(); //终止线程

    }
}

Demo2

public class Demo2 {
    public static void main(String[] args) {
        int BUFFER_SIZE = 1024;
        int THREAD_NUM = 4;

        EventFactory<TransactionOrder> eventFactory = new EventFactory<TransactionOrder>() {
            public TransactionOrder newInstance() {
                return new TransactionOrder();
            }
        };

        RingBuffer<TransactionOrder> ringBuffer = RingBuffer.createSingleProducer(eventFactory, BUFFER_SIZE);

        SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();

        ExecutorService service = Executors.newFixedThreadPool(THREAD_NUM);

        WorkHandler<TransactionOrder> workHandler = new TransactionHandler();

        WorkerPool<TransactionOrder> workerPool =
                new WorkerPool<TransactionOrder>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(),
                        workHandler);
        //序列协调者
        ringBuffer.addGatingSequences(workerPool.getWorkerSequences());

        workerPool.start(service);

        for (int i=0; i<8; i++) {
            long seq = ringBuffer.next();
            ringBuffer.get(seq).setPrice(Math.random() * 9999);
            ringBuffer.publish(seq);
        }

        workerPool.halt();
        service.shutdown();
    }
}

Demo3

public class Demo3 {
    public static void main(String[] args) throws InterruptedException {

        long start = System.currentTimeMillis();
        int BUFFER_SIZE = 1024;
        int THREAD_NUM = 4;

        ExecutorService service = Executors.newFixedThreadPool(THREAD_NUM);


        Disruptor<TransactionOrder> disruptor = new Disruptor<TransactionOrder>(new EventFactory<TransactionOrder>() {
            public TransactionOrder newInstance() {
                return new TransactionOrder();
            }
        }, BUFFER_SIZE, service, ProducerType.SINGLE, new BusySpinWaitStrategy());

        /**
         * 菱形操作
         */
        //使用 disruptor 创建消费组 C1 与 C2
        EventHandlerGroup<TransactionOrder> eventHandlerGroup =
                disruptor.handleEventsWith(new TransactionHandler(), new TransactionVasConsumer());
        //C3
        TransactionJmsNotifyHandler jmsNotifyHandler = new TransactionJmsNotifyHandler();
        //声明在 C1 和 C2 完事后 执行 JMS消息发送操作(C3)
        eventHandlerGroup.then(jmsNotifyHandler);

        /**
         * 顺序执行
         */
//        disruptor.handleEventsWith(new TransactionHandler())
//                .then(new TransactionVasConsumer())
//                .then(new TransactionJmsNotifyHandler());
        /**
         * 六边形操作
         */
//        TransactionHandler h1 = new TransactionHandler();
//        TransactionHandler h2 = new TransactionHandler();
//        TransactionHandler h3 = new TransactionHandler();
//        TransactionHandler h4 = new TransactionHandler();
//        TransactionHandler h5 = new TransactionHandler();
//        TransactionHandler h6 = new TransactionHandler();
//        disruptor.handleEventsWith(h1, h2);
//        disruptor.after(h1).handleEventsWith(h4);
//        disruptor.after(h2).handleEventsWith(h5);
//        disruptor.after(h4, h5).handleEventsWith(h3);


        //启动
        disruptor.start();

        CountDownLatch latch = new CountDownLatch(1);
        //生产者准备
        service.submit(new TransactionPubllisher(disruptor, latch));

        latch.await(); //等待生产者完事

        disruptor.shutdown();
        service.shutdown();

        System.out.println("总耗时:"+ (System.currentTimeMillis() - start));
    }
}
comments powered by Disqus