这篇文章将为大家详细讲解有关Disruptor-07 中有哪些代码范例,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
public class Test {
private static Logger logger = LogManager.getLogger();
@SuppressWarnings("unchecked")
public static void main(String[] args) throws InterruptedException {
// The factory for the event
TestEventFactory factory = new TestEventFactory();
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
Disruptor<TestEvent> disruptor = new Disruptor<TestEvent>(factory, bufferSize,
DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy());
// Create EventHandler
TestEventHandler handler1 = new TestEventHandler("handler1");
TestEventHandler handler2 = new TestEventHandler("handler2");
TestEventHandler handler3 = new TestEventHandler("handler3");
TestEventHandler handler4 = new TestEventHandler("handler4");
// Connect the handler
int count = 100;
// Unicast采取WorkPool方式,3个WorkHandler 累计执行100次。
// Event到达时,哪个WorkHandler被调度不确定。
// disruptor.handleEventsWithWorkerPool(handler1, handler2, handler3);
// MulticastTest并发处理方式,3个EventHandler,各执行100次,累计300次;
// 每个Event到达时,EventHandler的处理顺序不确定。
// 并发( handler1, handler2, handler3)
// EventHandler:handler1--85:k-v
// EventHandler:handler3--85:k-v
// EventHandler:handler2--85:k-v
// EventHandler:handler1--86:k-v
// EventHandler:handler3--86:k-v
// EventHandler:handler2--86:k-v <-----并发顺序不确定
// EventHandler:handler2--87:k-v <-----并发顺序不确定
// EventHandler:handler1--87:k-v
// EventHandler:handler3--87:k-v
// EventHandler:handler3--88:k-v
// EventHandler:handler2--88:k-v
// EventHandler:handler1--88:k-v
// disruptor.handleEventsWith(handler1, handler2, handler3);
// Pipeline串行处理方式,3个EventHandler,各执行100次,累计300次。
// 每个Event到达时,EventHandler的处理顺序与handleEventsWith的顺序一致。
// 顺序:handler1->handler2->handler3
// EventHandler:handler1--97:k-v
// EventHandler:handler2--97:k-v
// EventHandler:handler3--97:k-v
// EventHandler:handler1--98:k-v
// EventHandler:handler2--98:k-v
// EventHandler:handler3--98:k-v
// EventHandler:handler1--99:k-v
// EventHandler:handler2--99:k-v
// EventHandler:handler3--99:k-v
// EventHandler:handler1--100:k-v
// EventHandler:handler2--100:k-v
// EventHandler:handler3--100:k-v
//disruptor.handleEventsWith(handler1).handleEventsWith(handler2).handleEventsWith(handler3);
//Diamond
//按照 handler1-> 并发(handler2, hander3) ->handler4 调度
disruptor.handleEventsWith(handler1).handleEventsWith(handler2,handler3).handleEventsWith(handler4);
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<TestEvent> ringBuffer = disruptor.getRingBuffer();
TestEventProducer producer = new TestEventProducer(ringBuffer);
for (int i = 1; i <= count; i++) {
producer.onEvent("k", "v");
Thread.sleep(100);
}
Thread.sleep(10000);
}
}
public class TestEvent implements Event {
private String key;
private String value;
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
package com.lands.disruptor.unicast;
import com.lmax.disruptor.EventFactory;
public class TestEventFactory implements EventFactory<TestEvent> {
public TestEvent newInstance() {
return new TestEvent();
}
}
package com.lands.disruptor.unicast;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
public class TestEventHandler implements EventHandler<TestEvent>, WorkHandler<TestEvent> {
private static Logger logger = LogManager.getLogger();
private String handlerName;
private AtomicInteger count = new AtomicInteger();
public TestEventHandler(String name) {
this.handlerName = name;
}
public String getHandlerName() {
return handlerName;
}
public void onEvent(TestEvent event) throws Exception {
logger.info("WorkHandler:" + this.handlerName + "-" + count.decrementAndGet() + ":" + event.getKey() + "-"
+ event.getValue());
//Thread.sleep(100);
}
public void onEvent(TestEvent event, long sequence, boolean endOfBatch) throws Exception {
logger.info("EventHandler:" + this.handlerName + "-" + count.decrementAndGet() + ":" + event.getKey() + "-"
+ event.getValue());
//Thread.sleep(100);
}
}
package com.lands.disruptor.unicast;
import com.lands.disruptor.EventProducer;
import com.lmax.disruptor.RingBuffer;
public class TestEventProducer extends EventProducer<TestEvent> {
public TestEventProducer(RingBuffer<TestEvent> ringBuffer) {
super(ringBuffer);
}
@Override
public void process(TestEvent event, String... data) {
event.setKey(data[0]);
event.setValue(data[1]);
}
}
关于Disruptor-07 中有哪些代码范例就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/blacklands/blog/3086092