并发框架Disruptor使用

Disruptor描述

Disruptor是一个开源的并发框架,使用事件源驱动方式,能够在无锁的情况下实现网络的Queue并发操作

Disruptor为什么那么快

  • Disruptor不用锁,取而代之的是在需要确保操作线程安全的地方,我们使用CAS(Compare And Swap/Set)操作,它是一个CPU级别的指令(CPU去更新一个值,如果想改得值不再是原来的值,操作就失败,因为有其他操作先改变了这个值),CAS操作比锁消耗的资源少的多,因为它们之间在CPU上操作
  • 神奇的缓存行填充,通过增加补全来确保Ring Buffer的序列号不会和其他东西同时存在于一个缓存行中,因此没有伪共享和非预期的竞争
  • 使用内存屏障,它是CPU指令,它允许你对数据什么时候对其他进程可见性作出假设,在java里,你使用volatile关键字来实现内存屏障,使用volatile意味着你不用被迫选择加锁,并且还能让你获得性能上的提升,但是,你需要对你的设计进行一些更细致的思考,特别是你对volatile字段的使用有多频繁,以及对它们的读写有多频繁。

Disruptor工作原理

  • RingBuffer到底是什么?
    1.它是一个首尾相接的环,你可以把它用做在不同上下文间传递数据的buffer,它拥有一个序号,这个序号指向数组中下一个可用的元素,随着你不停的填充这个buffer,这个序号会一直增长。
    2.环形buffer是没有尾指针,我们只维护了一个指向下一个可用位置的序号,而且与常用的队列的区别是,我们不用删除buffer中的数据,也就是说这些数据一直存放在buffer中,直到新的数据覆盖它们。
    3.ringbuffer采用的数据结构是数组,所以要比链表快

Disruptor示例代码

在pom文件加入Disruptor的jar包
<dependency>
	<groupId>com.lmax</groupId>
	<artifactId>disruptor</artifactId>
	<version>3.2.0</version>
</dependency>

import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;

public class DisruptorExample {
	private static RingBuffer<ValueEvent> ringBuffer;

	public static void main(String[] args) {
		init();
		testStart();
	}
 
	/**
	 * 初始化Disruptor
	 */
	@SuppressWarnings("unchecked")
	public static void init() {
		ExecutorService exec = Executors.newCachedThreadPool();
		// preallocate RingBuffer with 1024 ValueEvents
		Disruptor<ValueEvent> disruptor = new Disruptor<ValueEvent>(
				ValueEvent.EVENT_FACTORY, 1024, exec);
		final EventHandler<ValueEvent> handler = new EventHandler<ValueEvent>() {
			// event will eventually be recycled by the Disruptor after it wraps
			public void onEvent(final ValueEvent event, final long sequence,
					final boolean endOfBatch) {
				handlerWrite(event);
			}
		};
		// Build dependency graph
		disruptor.handleEventsWith(handler);
		ringBuffer = disruptor.start();
	}

	/**
	 * 测试方法入口
	 */
	public static void testStart() {
		for (long i = 1; i < 2000; i++) {
			String uuid = UUID.randomUUID().toString();
			ValueEvent valueEvent = new ValueEvent();
			valueEvent.setValue(uuid);
			write(uuid);
		}
	}

	/**
	 * 写入数据
	 */
	public static void write(String value) {
		long seq = ringBuffer.next();
		ValueEvent valueEvent = ringBuffer.get(seq);
		valueEvent.setValue(value);
		valueEvent.setSeq(seq);
		ringBuffer.publish(seq);
	}

	/**
	 * 写入到数据库或者磁盘上,目前从控制台输出
	 * 
	 * @param valueEvent
	 */
	public static void handlerWrite(ValueEvent valueEvent) {
		System.out.println("Sequenece:" + valueEvent.getSeq());
		System.out.println("ValueEvent:" + valueEvent.getValue());
	}

}


import com.lmax.disruptor.EventFactory;
public class ValueEvent {
	private String value;
	private long seq;
	public long getSeq() {
		return seq;
	}
	public void setSeq(long seq) {
		this.seq = seq;
	}
	public String getValue() {
		return value;
	}
	public void setValue(String value) {
		this.value = value;
	}
	public final static EventFactory<ValueEvent> EVENT_FACTORY=new EventFactory<ValueEvent>(){
		public ValueEvent newInstance(){
			return new ValueEvent();
		}
	};
}

----EOF-----

Categories: concurrency Tags: concurrency