package demo.test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.Before;
import org.junit.Test;
import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.MultiThreadedClaimStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.SingleThreadedClaimStrategy;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
public class DisruptorConceptProofTest {
private long _journalistCount = 0L;
// 用来验证日志线程看到的最后一个事件值
private int _lastEventValue = 0;
// 用来验证日志线程看到了所有生产线程产生的事件
private int _journalistValueSum = 0;
// 将事件保存在硬盘里的书记员线程
final EventHandler<TicketEvent> _journalist = new EventHandler<TicketEvent>() {
public void onEvent(final TicketEvent event, final long sequence,
final boolean endOfBatch) throws Exception {
_journalistCount++;
_lastEventValue = event.getValue();
_journalistValueSum += _lastEventValue;
}
};
private long _replicatorCount = 0L;
// 用来验证备份线程看到了所有生产线程产生的事件
private int _replicatorValueSum = 0;
// 将事件发送到备份服务器保存的备份线程
final EventHandler<TicketEvent> _replicator = new EventHandler<TicketEvent>() {
public void onEvent(final TicketEvent event, final long sequence,
final boolean endOfBatch) throws Exception {
_replicatorCount++;
_replicatorValueSum += event.getValue();
}
};
final EventHandler<TicketEvent> _eventProcessor = new EventHandler<TicketEvent>() {
public void onEvent(final TicketEvent event, final long sequence,
final boolean endOfBatch) throws Exception {
System.out.println("[processor] ===="
+ Thread.currentThread().getName() + "===>"
+ sequence);
}
};
private int RING_SIZE = 128;
private final ExecutorService EXECUTOR = Executors.newCachedThreadPool();
@Before
public void setUp() throws Exception {
_journalistCount = _replicatorCount = _lastEventValue = 0;
}
@Test
public void demo() throws Exception {
演示disruptor的基本用法();
}
private void 演示disruptor的基本用法() throws Exception {
RingBuffer<TicketEvent> ringBuffer = new RingBuffer<TicketEvent>(
TicketPoolService.INSTANCE, new MultiThreadedClaimStrategy(
RING_SIZE), new YieldingWaitStrategy());
SequenceBarrier barrier = ringBuffer.newBarrier();
System.err.println(Thread.currentThread().getName());
// // 注册日志线程
// BatchEventProcessor<TicketEvent> journalist = new BatchEventProcessor<TicketEvent>(
// ringBuffer, barrier, _journalist);
// ringBuffer.setGatingSequences(journalist.getSequence());
// EXECUTOR.submit(journalist);
//
// // 注册备份线程
// BatchEventProcessor<TicketEvent> replicator = new BatchEventProcessor<TicketEvent>(
// ringBuffer, barrier, _replicator);
// ringBuffer.setGatingSequences(replicator.getSequence());
// EXECUTOR.submit(replicator);
BatchEventProcessor<TicketEvent> _event = new BatchEventProcessor<TicketEvent>(
ringBuffer, barrier, _eventProcessor);
ringBuffer.setGatingSequences(_event.getSequence());
EXECUTOR.submit(_event);
for (int i = 0; i < RING_SIZE; ++i) {
long sequence = ringBuffer.next();
TicketEvent event = ringBuffer.get(sequence);
event.setValue(i);
ringBuffer.publish(sequence);
}
// 显式等待两个线程执行完毕,因为我现在还不知道如何更好的等待
// 理论上来说,应该是在某个时候使用eventprocessor.halt函数的
// 因为这个系统应该是不停循环处理的
// Thread.sleep(1000);
// assertEquals(RING_SIZE, _journalistCount);
// assertEquals(RING_SIZE, _replicatorCount);
// 对于日志和备份线程,应该是串行执行每一个事件的
// assertEquals(RING_SIZE - 1, _lastEventValue);
// int expected = (0 + RING_SIZE - 1) * RING_SIZE / 2;
// assertEquals(expected, _journalistValueSum);
// assertEquals(expected, _replicatorValueSum);
}
// 下面这个测试用例和"演示disruptor的基本用法"的作用一致
@SuppressWarnings("unchecked")
private void 演示disruptor的dsl用法() throws Exception {
Disruptor<TicketEvent> disruptor = new Disruptor<TicketEvent>(
TicketPoolService.INSTANCE, EXECUTOR,
new MultiThreadedClaimStrategy(RING_SIZE),
new YieldingWaitStrategy());
// 注册日志和备份线程
disruptor.handleEventsWith(_journalist);
disruptor.handleEventsWith(_replicator);
disruptor.handleEventsWith(_eventProcessor);
// 启动disruptor,等待publish事件
RingBuffer<TicketEvent> ringBuffer = disruptor.start();
// 添加一些事件
for (int i = 0; i < RING_SIZE; ++i) {
long sequence = ringBuffer.next();
TicketEvent event = ringBuffer.get(sequence);
event.setValue(i);
ringBuffer.publish(sequence);
}
// 显式等待两个线程执行完毕,因为我现在还不知道如何更好的等待
// 理论上来说,应该是在某个时候使用eventprocessor.halt函数的
// 因为这个系统应该是不停循环处理的
// Thread.sleep(1000);
// assertEquals(RING_SIZE, _journalistCount);
// assertEquals(RING_SIZE, _replicatorCount);
// 对于日志和备份线程,应该是串行执行每一个事件的
// assertEquals(RING_SIZE - 1, _lastEventValue);
// 还有一个问题,就是确认所有事件是否真的已经处理了?
// int expected = (0 + RING_SIZE - 1) * RING_SIZE / 2;
// assertEquals(expected, _journalistValueSum);
// assertEquals(expected, _replicatorValueSum);
}
}
没有合适的资源?快使用搜索试试~ 我知道了~
disruptor jar包+Demo+Api
共122个文件
html:111个
java:4个
jar:3个
4星 · 超过85%的资源 需积分: 40 130 下载量 136 浏览量
2013-04-10
09:56:05
上传
评论 3
收藏 581KB ZIP 举报
温馨提示
disruptor 的jar包和api 自带了一个简单的例子。带注释 不是官方的例子。 不过是转别人的。 看了之后很容易明白。
资源推荐
资源详情
资源评论
收起资源包目录
disruptor jar包+Demo+Api (122个子文件)
stylesheet.css 1KB
inherit.gif 57B
index-all.html 94KB
Sequence.html 50KB
Disruptor.html 32KB
package-summary.html 28KB
Sequencer.html 25KB
SequenceBarrier.html 25KB
SingleThreadedClaimStrategy.html 24KB
AbstractMultithreadedClaimStrategy.html 23KB
WorkerPool.html 20KB
package-use.html 20KB
Histogram.html 19KB
EventHandler.html 19KB
AlertException.html 19KB
BatchEventProcessor.html 18KB
EventHandlerGroup.html 18KB
ClaimStrategy.html 18KB
SleepingWaitStrategy.html 18KB
BusySpinWaitStrategy.html 18KB
BlockingWaitStrategy.html 17KB
YieldingWaitStrategy.html 17KB
MultiThreadedClaimStrategy.html 17KB
RingBuffer.html 17KB
WorkProcessor.html 17KB
ExceptionHandler.html 17KB
SequenceGroup.html 16KB
AggregateEventHandler.html 16KB
FatalExceptionHandler.html 15KB
IgnoreExceptionHandler.html 15KB
RingBuffer.html 15KB
WaitStrategy.html 15KB
EventProcessor.html 15KB
NoOpEventProcessor.html 15KB
PaddedLong.html 14KB
EventHandlerGroup.html 14KB
ClaimStrategy.html 14KB
WaitStrategy.html 14KB
overview-tree.html 14KB
SequenceBarrier.html 14KB
MultiThreadedLowContentionClaimStrategy.html 14KB
package-tree.html 14KB
Util.html 13KB
EventFactory.html 13KB
EventPublisher.html 13KB
Sequence.html 13KB
AlertException.html 11KB
WorkHandler.html 11KB
ExceptionHandler.html 11KB
TimeoutException.html 11KB
InsufficientCapacityException.html 11KB
EventProcessor.html 11KB
MutableLong.html 11KB
InsufficientCapacityException.html 11KB
SequenceReportingEventHandler.html 11KB
EventTranslator.html 10KB
EventHandler.html 10KB
BatchDescriptor.html 10KB
BatchDescriptor.html 10KB
help-doc.html 10KB
ExceptionHandlerSetting.html 9KB
WorkHandler.html 9KB
LifecycleAware.html 9KB
EventTranslator.html 9KB
Sequencer.html 9KB
AbstractMultithreadedClaimStrategy.html 8KB
EventFactory.html 8KB
ExceptionHandlerSetting.html 8KB
Histogram.html 8KB
LifecycleAware.html 8KB
TimeoutException.html 8KB
MutableLong.html 8KB
package-use.html 7KB
package-summary.html 7KB
package-use.html 7KB
serialized-form.html 7KB
package-use.html 7KB
package-summary.html 7KB
package-tree.html 6KB
allclasses-frame.html 6KB
package-summary.html 6KB
package-tree.html 6KB
constant-values.html 6KB
package-tree.html 6KB
overview-summary.html 6KB
MultiThreadedLowContentionClaimStrategy.html 6KB
SequenceReportingEventHandler.html 6KB
SingleThreadedClaimStrategy.html 6KB
MultiThreadedClaimStrategy.html 6KB
deprecated-list.html 6KB
IgnoreExceptionHandler.html 6KB
FatalExceptionHandler.html 6KB
AggregateEventHandler.html 6KB
PaddedLong.html 6KB
SleepingWaitStrategy.html 6KB
BlockingWaitStrategy.html 6KB
YieldingWaitStrategy.html 6KB
BusySpinWaitStrategy.html 6KB
BatchEventProcessor.html 6KB
Disruptor.html 6KB
共 122 条
- 1
- 2
tim.huang
- 粉丝: 61
- 资源: 4
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功
- 1
- 2
- 3
前往页