Disruptor和LinkedBlockingQueue性能对比以及分析

原创
2016/11/23 23:35
阅读数 1W

Disruptor和LinkedBlockingQueue简介
Disruptor是Java实现的用于线程间通信的消息组件,其核心是一个Lock-free(无锁)的Ringbuffer;LinkedBlockingQueue是java.util.concurrent包中提供的一个阻塞队列;因为二者之间有很多相同的地方,所以在此进行一次性能的对比。

压力测试
1.针对LinkedBlockingQueue的压测类

public class LinkedBlockingQueueTest {

    public static int eventNum = 5000000;

    public static void main(String[] args) {
        final BlockingQueue<LogEvent> queue = new LinkedBlockingQueue<LogEvent>();
        final long startTime = System.currentTimeMillis();
        new Thread(new Runnable() {

            @Override
            public void run() {
                int i = 0;
                while (i < eventNum) {
                    LogEvent logEvent = new LogEvent(i, "c" + i);
                    try {
                        queue.put(logEvent);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    i++;
                }
            }
        }).start();

        new Thread(new Runnable() {

            @Override
            public void run() {
                int k = 0;
                while (k < eventNum) {
                    try {
                        queue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    k++;
                }
                long endTime = System.currentTimeMillis();
                System.out
                        .println("costTime = " + (endTime - startTime) + "ms");
            }
        }).start();
    }
}

LinkedBlockingQueueTest 实现了一个简单的生产者-消费者模式,一条线程负责插入,另外一条线程负责读取。

public class LogEvent implements Serializable {

    private static final long serialVersionUID = 1L;
    private long logId;
    private String content;
    
    public LogEvent(){
        
    }
    
    public LogEvent(long logId, String content){
        this.logId = logId;
        this.content = content;
    }

    public long getLogId() {
        return logId;
    }

    public void setLogId(long logId) {
        this.logId = logId;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}

LogEvent实体类,Disruptor的压测类中也同样会用到

2.下面是针对Disruptor的压测类,需要引入Disruptor的jar包

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.3.6</version>
</dependency>

Disruptor的压测类

public class DisruptorTest {

    public static void main(String[] args) {
        LogEventFactory factory = new LogEventFactory();
        int ringBufferSize = 65536;
        final Disruptor<LogEvent> disruptor = new Disruptor<LogEvent>(factory,
                ringBufferSize, DaemonThreadFactory.INSTANCE,
                ProducerType.SINGLE, new BusySpinWaitStrategy());

        LogEventConsumer consumer = new LogEventConsumer();
        disruptor.handleEventsWith(consumer);
        disruptor.start();
        new Thread(new Runnable() {

            @Override
            public void run() {
                RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
                for (int i = 0; i < LinkedBlockingQueueTest.eventNum; i++) {
                    long seq = ringBuffer.next();
                    LogEvent logEvent = ringBuffer.get(seq);
                    logEvent.setLogId(i);
                    logEvent.setContent("c" + i);
                    ringBuffer.publish(seq);
                }
            }
        }).start();
    }
}

同样为了保证测试数据的准确性,Disruptor使用了ProducerType.SINGLE(单生产者)模式,同时也只使用了一个LogEventConsumer(消费者)

public class LogEventConsumer implements EventHandler<LogEvent> {

    private long startTime;
    private int i;

    public LogEventConsumer() {
        this.startTime = System.currentTimeMillis();
    }

    public void onEvent(LogEvent logEvent, long seq, boolean bool)
            throws Exception {
        i++;
        if (i == LinkedBlockingQueueTest.eventNum) {
            long endTime = System.currentTimeMillis();
            System.out.println(" costTime = " + (endTime - startTime) + "ms");
        }
    }

}

LogEventConsumer 中负责记录开始时间和结束时间以及接受消息的数量,方便统计时间

压测结果统计
测试环境:
操作系统:win7 32位
CPU:Intel Core i3-2350M 2.3GHz 4核
内存:3G
JDK:1.6

分别运行以上两个实例,运行多次取平均值,结果如下:

结果显示Disruptor是LinkedBlockingQueue的1.65倍,测试环境是本人的笔记本电脑,配置有点低所有差距并不是特别明显;同样在公司台式机(win7 64位 – Intel Core i5 4核 – 4g内存 – jdk1.7)显示的结果是3-4倍左右;官方提供的数据是在5倍左右:https://github.com/LMAX-Exchange/disruptor/wiki/Performance-Results

性能差距原因分析
1.lock和cas的差距
LinkedBlockingQueue中使用了锁,如下所示:

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

而Disruptor中提供了cas的无锁支持,提供了BusySpinWaitStrategy策略的支持

2.避免伪共享
缓存系统中是以缓存行(cache line)为单位存储的。缓存行是2的整数幂个连续字节,一般为32-256个字节。最常见的缓存行大小是64个字节。当多线程修改互相独立的变量时,如果这些变量共享同一个缓存行,就会无意中影响彼此的性能,这就是伪共享。

看一个实例:

public class FalseSharing implements Runnable {
    public final static int NUM_THREADS = 4;
    public final static long ITERATIONS = 50000000;
    private final int arrayIndex;

    private static VolatileLong[] longs = new VolatileLong[NUM_THREADS];
    static {
        for (int i = 0; i < longs.length; i++) {
            longs[i] = new VolatileLong();
        }
    }

    public FalseSharing(final int arrayIndex) {
        this.arrayIndex = arrayIndex;
    }

    public static void main(final String[] args) throws Exception {
        final long start = System.currentTimeMillis();
        runTest();
        System.out.println("costTime = " + (System.currentTimeMillis() - start) + "ms");
    }

    private static void runTest() throws InterruptedException {
        Thread[] threads = new Thread[NUM_THREADS];

        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(new FalseSharing(i));
        }

        for (Thread t : threads) {
            t.start();
        }

        for (Thread t : threads) {
            t.join();
        }
    }

    @Override
    public void run() {
        long i = ITERATIONS + 1;
        while (0 != --i) {
            longs[arrayIndex].value = i;
        }
    }

    public final static class VolatileLong {
        public volatile long value = 0L;
        public long p1, p2, p3, p4, p5, p6;
    }
}

分别注释掉VolatileLong 中的public long p1, p2, p3, p4, p5, p6;和不注释掉进行对比,发现不注释掉的性能居然是注释掉性能的4倍,原因就是缓存行大小是64个字节,不注释掉说明一个VolatileLong 对象刚好占用一个缓存行;注释掉的话一个缓存行会被多个变量占用,就会无意中影响彼此的性能。

查看Disruptor的源码会发现很多地方避免了伪共享,比如:

abstract class SingleProducerSequencerPad extends AbstractSequencer
{
    protected long p1, p2, p3, p4, p5, p6, p7;

    public SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy)
    {
        super(bufferSize, waitStrategy);
    }
}

3.Ringbuffer的使用
Disruptor选择使用Ringbuffer来构造lock-free队列,什么事Ringbuffer,可以参考wiki:https://zh.wikipedia.org/wiki/%E7%92%B0%E5%BD%A2%E7%B7%A9%E8%A1%9D%E5%8D%80
数组是预分配的,这样避免了Java GC带来的运行开销。生产者在生产消息或产生事件的时候对Ringbuffer元素中的属性进行更新,而不是替换Ringbuffer中的元素。

占时先整理这三条,肯定还有其他原因

总结
Disruptor的高性能早就被用在了一些第三方库中,比如log4j2,让log4j2在性能上有质的飞越,之前对三种主流日志性能对比:https://my.oschina.net/OutOfMemory/blog/789267

展开阅读全文
加载中
点击加入讨论🔥(5) 发布并加入讨论🔥
5 评论
6 收藏
1
分享
返回顶部
顶部