如何去实现RingBuffer
文引
RingBuffer
, 名如其意: 环形缓存区
/环形队列
,不同于一般的队列,特征是首尾相接。
RingBuffer
如何工作的
RingBuffer
是一个有界的循环的数据结构,主要用于多线程下进行的数据缓存。在持续的写入数据的时候,到达末尾的时候链接到头,效果上达成一个环状。
实现它的方式
它是有界的数组实现,如图。
而且我们还要关注到 reader指针
、 writer指针
、 头尾相连
reader pointer
下一个可读元素writer pointer
下一个可插入的元素slot
- 数组头 和 数组尾 相互链接
讨论 ringbuffer 的运行方式
ringbuffer
的关键参数, read seq
& write seq
reader pointer seq
=> 从0
开始,随着读取消耗一个元素+1
writer pointer seq
=> 从-1
开始,插入一个元素时候+1
可以看出两种 Seq
对 容量进行 Mod
操作可以将 seq
映射到 ringbuffer
的 index
上.
array_index = seq % capacity
基于上面的思想,我们看 ringbuffer
的核心操作
- 插入元素
buffer[++writeSeq % capacity] = element
, 在插入元素时候先将writeSeq + 1
, 然后Mod操作
插入元素 - 消费元素
element = buffer[readSeq++ % capacity]
, 先获取元素,再redSeq + 1
=》reqSeq
指向下一个Slot
消费元素,被消费的元素并不会被删除,而是保留在数组中,直到下次被覆盖。
Empty Buffer 和 Full Buffer
我们要使用 RingBuffer
的时候,因为是环形的,可能出现 writer pointer
追上 reader pointer
的情况。
这时候需要区分场景来处理。
- 如果数据是旧的值或者中间值(e.g:股票代码价格), 可以直接覆盖数据,不用等待数据被消费。
- 如果
Reader Pointer
必须消费Buffer
中所有的旧值(e.g:电商交易),那就需要做waiting
(阻塞等等),直到Buffer
有可用的操做的slot
.
Empty Buffer 判定
Writer Seq 比 Reader Seq 小, 则 缓冲区为空
isEmpty = writerSeq < readSeq
Full Buffer 判定
Buffer Size 和 Capacity 相等,缓冲区已满,大小等于未读元素的数量
size = writerSeq - readSeq + 1
isFull = (size == capacity)
RingBuffer 的优缺点
RingBuffer 优点:
- 高效的 FIFO 缓冲区,预分配固定大小的数组,高效率的内存访问模式。
- Buffer 的操作时间复杂度都是 O(1)(特别消耗元素、不需要移动元素)
RingBuffer 缺点:
- 设置 Buffer 的大小至关重要, 缓冲区过小,读取速度慢,写入操作阻塞很长。
- 当动态调整的时候,需要移动数据
代码实现
定义
public CircularBuffer(int capacity) {
this.capacity = (capacity < 1) ? DEFAULT_CAPACITY : capacity;
this.data = (E[]) new Object[this.capacity];
this.readSeq = 0;
this.writeSeq = -1;
}
offer 插入
public boolean offer(E element) {
boolean isFull = writeSeq - readSeq + 1 == capacity;
if (!isFull) {
int nextWriteSeq = writeSeq + 1;
data[nextWriteSeq % capacity] = element;
writeSeq++;
return true;
}
return false;
}
poll 读取
public E poll() {
boolean isEmpty = writeSeq < readSeq;
if (!isEmpty) {
E nextElement = data[readSeq % capacity];
readSeq++;
return nextElement;
}
return null;
}
UT
@BeforeEach
public void initBuffer() {
buffer = new CircularBuffer(10);
assertTrue(buffer.offer("Wocao"));
}
@Test
public void testOffer() {
assertTrue(buffer.offer("mmp"));
assertEquals(2, buffer.size());
}
@Test
public void testPoll() {
assertTrue(buffer.offer("mmp2"));
assertEquals("Wocao", buffer.poll());
assertEquals("mmp2", buffer.poll());
}
线程安全的
这边只考虑 单一生产者和单一消费者情况, 所以在数组上无争用,可以不使用锁。只需要保证 两个 Seq
的可见性
producer
生产数据写入buffer
并增加writeSeq
consumer
从buffer
消费数据,并增加readSeq
只需要给 writeSeq
,readSeq
保证 volatile
private volatile int readSeq = 0, writeSeq = -1;
CircularBuffer buffer = new CircularBuffer(5);
@Test
public void testProducerConsumer() {
new Thread(() -> {
for (int i = 0; i < 10;) {
if (buffer.offer("element:" + i)) {
System.out.println("produced " + i );
i++;
}
}
}).start();
new Thread(() -> {
for (int i =0; i < 10; ) {
String c = (String) buffer.poll();
if ( c != null) {
System.out.println("consumed:" + c);
}
}
}).start();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
输出
produced 0
produced 1
produced 2
produced 3
produced 4
produced 5
consumed:element:0
consumed:element:1
consumed:element:2
produced 6
consumed:element:3
produced 7
consumed:element:4
produced 8
consumed:element:5
consumed:element:6
consumed:element:7
consumed:element:8
consumed:element:9
produced 9
可以看出,当生产者线程发现无法写入的时候(在 循环中 等待一个空的 slot),消费者线程从 buffer 中获取到 null,继续执行,不打印。
上面代码实现在 github 代码