BlockingQueue可以作为生产者和消费者的内存缓冲区,但其不是一个高性能的实现。它使用锁和阻塞等待来实现线程间的同步,在高并发下它的性能并非优越。而ConcurrentLinkedQueue是一个高性能的队列,它大量使用了无锁的CAS操作。
1、无锁缓存框架Disruptor
Disruptor是一个开源的并发框架,能够在无锁的情况下实现网络的Queue并发操作。
Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量级JMS),也可以认为是一个观察者模式的实现,或者事件监听模式的实现。
Disruptor是一个高效的无锁内存队列,它使用无锁的方式实现了一个环形队列(RingBuffer)。这个环形队列内部实现是一个普通的数组,环形队列只需要对外提供一个档期位置cursor,利用这个指针就可以进行入队和出队操作。由于队列是环形的,队列的大小必须事先指定,不能动态扩展。为了快速从一个序列对应到数组的实际位置,Disruptor要求必须将数组的大小设置为2的整数次方,通过sequence&(queueSize-1)就能立即定位到实际的元素位置index。
POM引入Disruptor的jar:
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.2</version>
</dependency>

RingBuffer中生产者向缓冲区中写入数据,消费者从中读取数据。写入数据时使用CAS操作,读取数据时,为了防止多个消费者处理同一数据,也使用CAS操作进行数据保护。使用固定大小的环形队列可以做到完全的内存复用,在运行过程中不会有新的空间需要分配或者回收老的空间,减少了系统分配空间或回收空间带来的开销。
Disruptor术语说明:
RingBuffer:Disruptor最主要组件,从3.0开始RingBuffer仅仅负责存储和更新在Disruptor中流通的数据。对一些特殊的使用场景能够被用户(其他数据结构)完全替代。
它是一个环(首尾相接的环),可以把它用作在不同上下文(线程)间传递数据的buffer。RingBuffer拥有一个序号,这个序号(sequence)指向数组中下一个可用元素。随着你不停地填充这个buffer(可能会有相应的读取),这个序号一直增长,直到绕过这个环。要找到数组中当前序号指向的元素,可以通过mod操作:sequence mod array.length = array.index(取模操作)。如果槽的个数是2的N次方更有利于基于二进制的计算机计算。
Sequence:Disruptor使用Sequence来表示一个特殊组件处理的序号。和Disruptor一样,每个消费者(EventProcessor)都维持着一个Sequence。大部分的并发代码依赖这些Sequence值得运转,因此Sequence支持多种当前为AtomicLong类的特性。
Sequencer:这是Disruptor真正的核心。实现了这个接口的两种生产者(单生产者和多生产者)均实现了所有的并发算法,为了在生产者和消费者之间进行准确快速的数据传递。
SequenceBarrier:由Sequence生成,并且包含了已经发布的Sequence的引用,这些Sequence源于Sequenceer和一些独立的消费者的Sequence。它包含了决定是否有供消费者来消费者的Event的逻辑。
WaitStrategy:由Sequence生成,并且包含了已经发布的Sequence的引用,这些Sequence源于Sequenceer和一些独立的消费者的Sequence。它包含了决定是否有供消费者来消费者的Event的逻辑。
WaitStrategy:决定了一个消费者将如何等待生产者将Event置于Disruptor。
Event:从生产者到消费者过程中所处理的数据单元。Disruptor中没有代码表示Event,因此它完全是由用户定义。
EventProcessor:主要时间循环,处理Disruptor中的Event,并且拥有消费者的Sequence。他有一个实现类是BatchEventProcessor,包含了event loop有效的实现,并且将回调到一个EventHandler接口的实现对象。
EventHandler:由用户实现并且代表了Disruptor中的一个消费者的接口。
Producer:由用户实现,它调用RingBuffer来插入事件(Event),在Disruptor中没有相应的实现代码,由用户实现。
WorkProcessor:确保每个sequence只被一个processor消费,在同一个WorkPool中的处理多个WorkProcessor不会消费同样的sequence。
WorkerPool:一个WorkProcessor池,其中WorkProcessor将消费Sequence,所以任务可以在实现WorkHandler接口的worker之间移交。
LifecycleAware:当BatchEventProcessor启动和停止时,实现这个接口用于接收通知。



2、选择合适的策略
当有新数据在Disruptor的环形缓冲区中产生时,消费者如何监控缓冲区中的信息?Disruptor提供了几种策略,这些策略由WaitStrategy接口封装,实现方式如下:
BlockingWaitStrategy:默认的策略。BlockingWaitStrategy和BlockingQueue类似,都使用锁和条件(Condition)进行数据监控和线程的唤醒。因涉及到线程切换,BlockingWaitStrategy策略最节省CPU,但在高并发下是性能最差的一种等待策略。
SleepingWaitStrategy:它在循环中不断等待数据,先进行自旋等待,如果不成功,则使用Thread.yield()让出CPU,并使用LockSupport.parkNanos(1)进行线程休眠,确保不占用过多CPU数据。此策略对于数据处理可能产生较高的平均延时,它适合对于延时要求不高的场景,优点是它对生产者线程的影响最小,比如异步日志。
YieldingWaitStrategy:此策略用于低延时的场景。消费者线程会不断循环监控缓冲区变化,在循环内部,它会使用Thread.yield()让出CPU。适合于对延时有较高要求的系统。使用此策略,相当于消费者线程变成了一个内部执行了Thread.yield()的死循环,最后又多余消费者线程数量的逻辑CPU数量。
BusySpinWaitStrategy:它是一个死循环,消费者线程会尽最大努力疯狂监控缓冲区的变化。它会吃掉所有的CPU资源,适用于对延迟非常严苛的场景。
3、CPU Cache的优化:解决伪共享问题
伪共享问题:为了提高CPU速度,CPU有一个高速缓存cache,在高速缓存中,读写数据的最小单位为缓存行(Cache Line),它是从主存(Memory)中复制到缓存(Cache)的最小单位,一般为32~128字节。如果两个变量放在一个缓存行中,在并发访问中可能会相互影响彼此性能。假设X和Y在同一缓存行,运行在CPU1上的线程更新了X,则CPU2上的缓存行会失效,同一样的Y即使没有修改也会变成无效,导致Cache无法命中。此时如果在CPU2上的线程更新了Y,则导致CPU1上的缓存行失效。这种情况反复发生,如果CPU经常不能命中缓存,那么系统的吞吐量将会急剧下降。

为了避免这种情况,一种方法是在X变量的前后空间都先占据一定的位置,这样当内存被读入缓存中时,这个缓存行中只有X一个变量有效,这样就不会发生多个线程同时修改缓存行中不同变量而导致变量全部失效的情况。


--参考文献《实战Java高并发程序设计》
网友评论