温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Disruptor-07 中有哪些代码范例

发布时间:2021-06-21 17:51:48 来源:亿速云 阅读:133 作者:Leah 栏目:大数据

这篇文章将为大家详细讲解有关Disruptor-07 中有哪些代码范例,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

public class Test {

	private static Logger logger = LogManager.getLogger();

	@SuppressWarnings("unchecked")
	public static void main(String[] args) throws InterruptedException {

		// The factory for the event
		TestEventFactory factory = new TestEventFactory();

		// Specify the size of the ring buffer, must be power of 2.
		int bufferSize = 1024;

		// Construct the Disruptor
		Disruptor<TestEvent> disruptor = new Disruptor<TestEvent>(factory, bufferSize,
				DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy());

		// Create EventHandler
		TestEventHandler handler1 = new TestEventHandler("handler1");
		TestEventHandler handler2 = new TestEventHandler("handler2");
		TestEventHandler handler3 = new TestEventHandler("handler3");
		TestEventHandler handler4 = new TestEventHandler("handler4");

		// Connect the handler

		int count = 100;

		// Unicast采取WorkPool方式,3个WorkHandler 累计执行100次。
		// Event到达时,哪个WorkHandler被调度不确定。
		// disruptor.handleEventsWithWorkerPool(handler1, handler2, handler3);

		// MulticastTest并发处理方式,3个EventHandler,各执行100次,累计300次;
		// 每个Event到达时,EventHandler的处理顺序不确定。
		// 并发( handler1, handler2, handler3)
		// EventHandler:handler1--85:k-v
		// EventHandler:handler3--85:k-v
		// EventHandler:handler2--85:k-v

		// EventHandler:handler1--86:k-v
		// EventHandler:handler3--86:k-v
		// EventHandler:handler2--86:k-v <-----并发顺序不确定

		// EventHandler:handler2--87:k-v <-----并发顺序不确定
		// EventHandler:handler1--87:k-v
		// EventHandler:handler3--87:k-v

		// EventHandler:handler3--88:k-v
		// EventHandler:handler2--88:k-v
		// EventHandler:handler1--88:k-v

		// disruptor.handleEventsWith(handler1, handler2, handler3);

		// Pipeline串行处理方式,3个EventHandler,各执行100次,累计300次。
		// 每个Event到达时,EventHandler的处理顺序与handleEventsWith的顺序一致。
		// 顺序:handler1->handler2->handler3
		// EventHandler:handler1--97:k-v
		// EventHandler:handler2--97:k-v
		// EventHandler:handler3--97:k-v

		// EventHandler:handler1--98:k-v
		// EventHandler:handler2--98:k-v
		// EventHandler:handler3--98:k-v

		// EventHandler:handler1--99:k-v
		// EventHandler:handler2--99:k-v
		// EventHandler:handler3--99:k-v

		// EventHandler:handler1--100:k-v
		// EventHandler:handler2--100:k-v
		// EventHandler:handler3--100:k-v
		
		//disruptor.handleEventsWith(handler1).handleEventsWith(handler2).handleEventsWith(handler3);
		
		//Diamond
		//按照 handler1-> 并发(handler2, hander3) ->handler4 调度
		disruptor.handleEventsWith(handler1).handleEventsWith(handler2,handler3).handleEventsWith(handler4);

		// Start the Disruptor, starts all threads running
		disruptor.start();

		// Get the ring buffer from the Disruptor to be used for publishing.
		RingBuffer<TestEvent> ringBuffer = disruptor.getRingBuffer();

		TestEventProducer producer = new TestEventProducer(ringBuffer);

		for (int i = 1; i <= count; i++) {
			producer.onEvent("k", "v");
			Thread.sleep(100);
		}

		Thread.sleep(10000);
	}

}
public class TestEvent implements Event {

	private String key;
	private String value;

	public String getKey() {
		return key;
	}

	public void setKey(String key) {
		this.key = key;
	}

	public String getValue() {
		return value;
	}

	public void setValue(String value) {
		this.value = value;
	}

}
package com.lands.disruptor.unicast;

import com.lmax.disruptor.EventFactory;

public class TestEventFactory implements EventFactory<TestEvent> {

	public TestEvent newInstance() {
		return new TestEvent();
	}

}
package com.lands.disruptor.unicast;

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;

public class TestEventHandler implements EventHandler<TestEvent>, WorkHandler<TestEvent> {

	private static Logger logger = LogManager.getLogger();

	private String handlerName;

	private AtomicInteger count = new AtomicInteger();

	public TestEventHandler(String name) {
		this.handlerName = name;
	}

	public String getHandlerName() {
		return handlerName;
	}

	public void onEvent(TestEvent event) throws Exception {

		logger.info("WorkHandler:" + this.handlerName + "-" + count.decrementAndGet() + ":" + event.getKey() + "-"
				+ event.getValue());

		//Thread.sleep(100);
	}

	public void onEvent(TestEvent event, long sequence, boolean endOfBatch) throws Exception {
		logger.info("EventHandler:" + this.handlerName + "-" + count.decrementAndGet() + ":" + event.getKey() + "-"
				+ event.getValue());
		//Thread.sleep(100);
	}

}
package com.lands.disruptor.unicast;

import com.lands.disruptor.EventProducer;
import com.lmax.disruptor.RingBuffer;

public class TestEventProducer extends EventProducer<TestEvent> {

	public TestEventProducer(RingBuffer<TestEvent> ringBuffer) {
		super(ringBuffer);
	}

	@Override
	public void process(TestEvent event, String... data) {

		event.setKey(data[0]);
		event.setValue(data[1]);

	}

}

关于Disruptor-07 中有哪些代码范例就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI