如何去实现RingBuffer

文引

RingBuffer, 名如其意: 环形缓存区/环形队列,不同于一般的队列,特征是首尾相接。

RingBuffer 如何工作的

RingBuffer 是一个有界的循环的数据结构,主要用于多线程下进行的数据缓存。在持续的写入数据的时候,到达末尾的时候链接到头,效果上达成一个环状。

实现它的方式

它是有界的数组实现,如图。

ringbuffer数组实现

而且我们还要关注到 reader指针writer指针、 头尾相连

  • reader pointer 下一个可读元素
  • writer pointer 下一个可插入的元素 slot
  • 数组头 和 数组尾 相互链接

讨论 ringbuffer 的运行方式

ringbuffer 的关键参数, read seq & write seq

  • reader pointer seq => 从 0 开始,随着读取消耗一个元素 +1
  • writer pointer seq => 从 -1 开始,插入一个元素时候 +1

ringbuffer获取数组Index

可以看出两种 Seq 对 容量进行 Mod 操作可以将 seq 映射到 ringbufferindex 上.

 array_index = seq % capacity

基于上面的思想,我们看 ringbuffer 的核心操作

  • 插入元素 buffer[++writeSeq % capacity] = element, 在插入元素时候先将 writeSeq + 1, 然后Mod操作 插入元素
  • 消费元素 element = buffer[readSeq++ % capacity], 先获取元素,再 redSeq + 1 =》 reqSeq 指向下一个 Slot

消费元素,被消费的元素并不会被删除,而是保留在数组中,直到下次被覆盖。

Empty Buffer 和 Full Buffer

我们要使用 RingBuffer 的时候,因为是环形的,可能出现 writer pointer 追上 reader pointer 的情况。 这时候需要区分场景来处理。

  • 如果数据是旧的值或者中间值(e.g:股票代码价格), 可以直接覆盖数据,不用等待数据被消费。
  • 如果 Reader Pointer 必须消费 Buffer 中所有的旧值(e.g:电商交易),那就需要做 waiting(阻塞等等),直到Buffer有可用的操做的 slot.

Empty Buffer 判定

Writer Seq 比 Reader Seq 小, 则 缓冲区为空

 isEmpty = writerSeq < readSeq

empty-buffer

Full Buffer 判定

Buffer Size 和 Capacity 相等,缓冲区已满,大小等于未读元素的数量

 size = writerSeq - readSeq + 1
 isFull = (size == capacity)

empty-buffer

RingBuffer 的优缺点

RingBuffer 优点:

  • 高效的 FIFO 缓冲区,预分配固定大小的数组,高效率的内存访问模式。
  • Buffer 的操作时间复杂度都是 O(1)(特别消耗元素、不需要移动元素)

RingBuffer 缺点:

  • 设置 Buffer 的大小至关重要, 缓冲区过小,读取速度慢,写入操作阻塞很长。
  • 当动态调整的时候,需要移动数据

代码实现

定义

    public CircularBuffer(int capacity) {
        this.capacity = (capacity < 1) ? DEFAULT_CAPACITY : capacity;
        this.data = (E[]) new Object[this.capacity];
        this.readSeq = 0;
        this.writeSeq = -1;
    }

offer 插入

    public boolean offer(E element) {
        boolean isFull = writeSeq - readSeq + 1 == capacity;
        if (!isFull) {
            int nextWriteSeq = writeSeq + 1;
            data[nextWriteSeq % capacity] = element;
            writeSeq++;
            return true;
        }
        return false;
    }
    

poll 读取

    public E poll() {
        boolean isEmpty = writeSeq < readSeq;
        if (!isEmpty) {
            E nextElement = data[readSeq % capacity];
            readSeq++;
            return nextElement;
        }
        return null;
    }

UT

    @BeforeEach
    public void initBuffer() {
        buffer = new CircularBuffer(10);
        assertTrue(buffer.offer("Wocao"));
    }

    @Test
    public void testOffer() {
        assertTrue(buffer.offer("mmp"));
        assertEquals(2, buffer.size());
    }

    @Test
    public void testPoll() {
        assertTrue(buffer.offer("mmp2"));
        assertEquals("Wocao", buffer.poll());
        assertEquals("mmp2", buffer.poll());
    }

线程安全的

这边只考虑 单一生产者和单一消费者情况, 所以在数组上无争用,可以不使用锁。只需要保证 两个 Seq 的可见性

  • producer 生产数据写入 buffer 并增加 writeSeq
  • consumerbuffer 消费数据,并增加 readSeq

只需要给 writeSeqreadSeq 保证 volatile

    private volatile int readSeq = 0, writeSeq = -1;
    CircularBuffer buffer =  new CircularBuffer(5);

    @Test
    public void testProducerConsumer() {


        new Thread(() -> {
            for (int i = 0; i < 10;) {
                if (buffer.offer("element:" + i)) {
                    System.out.println("produced " + i );
                    i++;
                }
            }
        }).start();

        new Thread(() -> {
            for (int i =0; i < 10; ) {
                String c = (String) buffer.poll();
                if ( c != null) {
                    System.out.println("consumed:" + c);
                }
            }
        }).start();


        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

输出

produced 0
produced 1
produced 2
produced 3
produced 4
produced 5
consumed:element:0
consumed:element:1
consumed:element:2
produced 6
consumed:element:3
produced 7
consumed:element:4
produced 8
consumed:element:5
consumed:element:6
consumed:element:7
consumed:element:8
consumed:element:9
produced 9

可以看出,当生产者线程发现无法写入的时候(在 循环中 等待一个空的 slot),消费者线程从 buffer 中获取到 null,继续执行,不打印。

上面代码实现在 github 代码

comments powered by Disqus