
一. Disruptor简介
1.简介
2.Disruptor官方文档及项目地址
- GitHub项目地址:https://github.com/LMAX-Exchange/disruptor
- 官方文档地址:https://lmax-exchange.github.io/disruptor/#_read_this_first
- user-guide:https://lmax-exchange.github.io/disruptor/user-guide/index.html
- changelog:https://lmax-exchange.github.io/disruptor/changelog.html
3.原理图

二. disruptor-spring-boot-start启动器使用教程
1.项目中引入依赖如下
1.1 gitee坐标
<dependency>
    <groupId>io.gitee.bigbigfeifei</groupId>
    <artifactId>disruptor-spring-boot-start</artifactId>
    <version>1.0</version>
</dependency>
1.2 github坐标
<dependency>
    <groupId>io.github.bigbigfeifei</groupId>
    <artifactId>disruptor-spring-boot-start</artifactId>
    <version>1.0</version>
</dependency>
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.4</version>
</dependency>
2.启动类上加上@EnableZlfDisruptor注解
3.使用Demo
3.1. DisruptorEventHandler类
package org.example.service.impl;import com.alibaba.fastjson.JSON;import com.lmax.disruptor.EventHandler;import com.zlf.event.DisruptorEvent;import lombok.extern.slf4j.Slf4j;4jpublic class DisruptorEventHandler implements EventHandler<DisruptorEvent> {public void onEvent(DisruptorEvent event, long sequence, boolean endOfBatch) throws Exception {log.info("DisruptorEventHandler.event:{}", JSON.toJSONString(event));//这里测试用,抛异常会被CustomExceptionHandler处理(CustomExceptionHandler会发送springBoot的CustomExceptionHandlerEvent事件,业务监听处理该事件就可以了)throw new RuntimeException(JSON.toJSONString(event));}}
3.2. DisruptorBizListener类
package org.example.service.impl;

import com.alibaba.fastjson.JSON;
import com.zlf.event.CustomExceptionHandlerEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

/**
 * Spring bean that listens for {@link CustomExceptionHandlerEvent} and logs it.
 */
@Component
@Slf4j
public class DisruptorBizListener {

    /**
     * Logs the received event as JSON.
     *
     * @param event the event delivered by Spring's application event mechanism
     */
    @EventListener
    public void disruptorEventListener(CustomExceptionHandlerEvent event) {
        log.info("DisruptorBizListener.disruptorEventListener.event:{}", JSON.toJSONString(event));
    }
}
3.3. DisruptorHandlerImpl类
package org.example.service.impl;import com.lmax.disruptor.dsl.Disruptor;import com.zlf.handler.ClearingEventHandler;import com.zlf.handler.CustomExceptionHandler;import com.zlf.handler.DisruptorHandler;import lombok.extern.slf4j.Slf4j;4jpublic class DisruptorHandlerImpl implements DisruptorHandler {public void buildHandler(String key, Disruptor disruptor) {log.info("===key:{}自定义handler处理链开始=====", key);disruptor.handleExceptionsWith(new CustomExceptionHandler());disruptor.handleEventsWith(new DisruptorEventHandler()).then(new ClearingEventHandler());log.info("===key:{}自定义handler处理链结束=====", key);}}
3.4. DisruptorController类
package org.example.controller;import cn.hutool.core.lang.Tuple;import com.lmax.disruptor.dsl.ProducerType;import com.zlf.builder.CustomThreadBuilder;import com.zlf.builder.ThreadPoolExecutorBuilder;import com.zlf.builder.WaitStrategyBuilder;import com.zlf.dto.DisruptorCreate;import com.zlf.enums.BlockingQueueTypeEnum;import com.zlf.enums.DisruptorCreateMethodEnum;import com.zlf.enums.RejectedPolicyTypeEnum;import com.zlf.enums.WaitStrategyEnum;import com.zlf.factory.CustomThreadFactory;import com.zlf.factory.ThreadPoolExecutorFactory;import com.zlf.service.DisruptorService;import lombok.extern.slf4j.Slf4j;import org.apache.commons.lang3.StringUtils;import org.example.service.impl.DisruptorHandlerImpl;import org.example.service.impl.ThreadPoolService;import org.springframework.beans.BeansException;import org.springframework.beans.factory.InitializingBean;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.SpringApplication;import org.springframework.context.ApplicationContext;import org.springframework.context.ApplicationContextAware;import org.springframework.context.ConfigurableApplicationContext;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import java.util.Objects;import java.util.concurrent.ThreadFactory;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;/*** 一:初始化disruptor姿势* https://blog.csdn.net/2401_84048205/article/details/137853949** 二:初始化的一些方式:* 1. 静态代码块* static {** }* 2. 构造方法或构造方法注入* public DisruptorController(){** }* 3.@PostConstruct注解* @PostConstruct* public void init() {* //执行构造方法前执行* }** 3. 实现InitializingBean 接口(bean初始化属性设置之后调用afterPropertiesSet)* 4. 实现CommandLineRunner或ApplicationRunner接口* 在run方法中初始化disruptor也是可以的* https://www.freexyz.cn/dev/47789.html* 5. 非静态代码块中* {}* 6. 监听springBoot容器启动完成Event** 三:静态变量、静态代码块、非静态代码块、构造方法的执行顺序* 1. 
执行顺序:先父类子类,* 2. 静态的东西属于类:类加载的时候只执行一次* 3. 非静态代码块、构造方法属于实例,* 4. 当类new的时候先执行非静态代码块(非静态代码块按从上到下顺序执行,跟书写位置没有关系,可以写类的任意地方)* 6. 后执行构造方法* 类的实例可以调用类的静态方法、不推荐这种方式*/public class DisruptorController implements InitializingBean, ApplicationContextAware {private ApplicationContext applicationContext;private DisruptorService disruptorService;private DisruptorHandlerImpl disruptorHandler = new DisruptorHandlerImpl();private static final ThreadPoolExecutor executor1 = ThreadPoolService.getInstance();private static final ThreadFactory threadFactory = new CustomThreadFactory(CustomThreadBuilder.builder().name("test888-thread-factory").isDaemon(Boolean.TRUE).build());private static final ThreadPoolExecutor executor2 = new ThreadPoolExecutorFactory(ThreadPoolExecutorBuilder.builder().blockingQueueTypeEnum(BlockingQueueTypeEnum.ARRAY_BLOCKING_QUEUE).defaultCoreSize(100).keepAliveTime(60).unit(TimeUnit.SECONDS).maxQueueSize(800).rejectedPolicyTypeEnum(RejectedPolicyTypeEnum.SYNC_PUT_QUEUE_POLICY).threadFactory(new CustomThreadFactory(CustomThreadBuilder.builder().name("test666-thread-factory").isDaemon(Boolean.TRUE).build())).build()).createThreadPoolExecutor();private static String KEY0;private static String KEY1;private static String KEY2;private static String KEY3;private static String KEY4;private static String KEY5;private static String KEY6;private static String KEY7;/*** @throws Exception*/public void afterPropertiesSet() throws Exception {//方法pushEvent0的disruptor初始化启动方法disruptorInitStart0();//方法pushEvent1的disruptor初始化启动方法disruptorInitStart1();//方法pushEvent2的disruptor初始化启动方法disruptorInitStart2();//方法pushEvent3的disruptor初始化启动方法disruptorInitStart3();//方法pushEvent4的disruptor初始化启动方法disruptorInitStart4();//方法pushEvent5的disruptor初始化启动方法disruptorInitStart5();//方法pushEvent6的disruptor初始化启动方法disruptorInitStart6();//方法pushEvent7的disruptor初始化启动方法disruptorInitStart7();}private Tuple disruptorInitStart0() {Tuple tuple0 = disruptorService.buildKey(DisruptorCreateMethodEnum.CREATE0, 
"test0");DisruptorCreateMethodEnum.createDcbMaps(tuple0);KEY0 = tuple0.get(1);return disruptorService.createAddHandlerStart1(KEY0, disruptorHandler);}private Tuple disruptorInitStart1() {Tuple tuple1 = disruptorService.buildKey(DisruptorCreateMethodEnum.CREATE1, "test1");DisruptorCreateMethodEnum.createDcbMaps(tuple1);KEY1 = tuple1.get(1);return disruptorService.createAddHandlerStart1(KEY1, disruptorHandler);}private Tuple disruptorInitStart2() {Tuple tuple2 = disruptorService.buildKey(DisruptorCreateMethodEnum.CREATE2, "test2");DisruptorCreateMethodEnum.createDcbMaps(tuple2);KEY2 = tuple2.get(1);return disruptorService.createAddHandlerStart1(KEY2, disruptorHandler);}private Tuple disruptorInitStart3() {Tuple tuple3 = disruptorService.buildKey(DisruptorCreateMethodEnum.CREATE3, "test3");DisruptorCreateMethodEnum.createDcbMaps(tuple3);KEY3 = tuple3.get(1);return disruptorService.createAddHandlerStart1(KEY3, disruptorHandler);}private Tuple disruptorInitStart4() {Tuple tuple4 = disruptorService.buildKey(DisruptorCreateMethodEnum.CREATE4, "test4");DisruptorCreateMethodEnum.createDcbMaps(tuple4);KEY4 = tuple4.get(1);return disruptorService.createAddHandlerStart1(KEY4, disruptorHandler);}private Tuple disruptorInitStart5() {Tuple tuple5 = disruptorService.buildKey(DisruptorCreateMethodEnum.CREATE4, "test5");WaitStrategyBuilder waitStrategyBuilder5 = DisruptorCreateMethodEnum.createWaitStrategyMaps(tuple5);waitStrategyBuilder5.setWaitStrategyEnum(WaitStrategyEnum.YIELD);DisruptorCreateMethodEnum.createExecutorMaps(tuple5, executor1);DisruptorCreate disruptorCreate5 = DisruptorCreateMethodEnum.createDcbMaps(tuple5);disruptorCreate5.setProducerType(ProducerType.MULTI);KEY5 = tuple5.get(1);return disruptorService.createAddHandlerStart1(KEY5, disruptorHandler);}private Tuple disruptorInitStart6() {Tuple tuple6 = disruptorService.buildKey(DisruptorCreateMethodEnum.CREATE4, "test6");WaitStrategyBuilder waitStrategyBuilder6 = 
DisruptorCreateMethodEnum.createWaitStrategyMaps(tuple6);waitStrategyBuilder6.setWaitStrategyEnum(WaitStrategyEnum.YIELD);DisruptorCreateMethodEnum.createExecutorMaps(tuple6, executor2);DisruptorCreate disruptorCreate6 = DisruptorCreateMethodEnum.createDcbMaps(tuple6);disruptorCreate6.setProducerType(ProducerType.MULTI);KEY6 = tuple6.get(1);return disruptorService.createAddHandlerStart1(KEY6, disruptorHandler);}private Tuple disruptorInitStart7() {Tuple tuple7 = disruptorService.buildKey(DisruptorCreateMethodEnum.CREATE3, "test7");WaitStrategyBuilder waitStrategyBuilder7 = DisruptorCreateMethodEnum.createWaitStrategyMaps(tuple7);waitStrategyBuilder7.setWaitStrategyEnum(WaitStrategyEnum.YIELD);DisruptorCreateMethodEnum.createThreadFactoryMaps(tuple7, threadFactory);DisruptorCreate disruptorCreate7 = DisruptorCreateMethodEnum.createDcbMaps(tuple7);disruptorCreate7.setProducerType(ProducerType.MULTI);KEY7 = tuple7.get(1);return disruptorService.createAddHandlerStart1(KEY7, disruptorHandler);}public String disruptorInitStart(int index) {if (Objects.isNull(index)) {return "index不为空!";}if (0 == index) {disruptorInitStart0();} else if (1 == index) {disruptorInitStart1();} else if (2 == index) {disruptorInitStart2();} else if (3 == index) {disruptorInitStart3();} else if (4 == index) {disruptorInitStart4();} else if (5 == index) {disruptorInitStart5();} else if (6 == index) {disruptorInitStart6();} else if (7 == index) {disruptorInitStart7();}return "ok";}public String showdown(String key) {if (StringUtils.isEmpty(key)) {return "key不为空!";}Tuple tuple = disruptorService.getKeyMaps().get(key);if (Objects.nonNull(tuple) && StringUtils.isNotBlank(tuple.get(1))) {disruptorService.shutdown(tuple.get(1));}return "ok";}/*** 关闭方法1*/public void destroy() {// 在需要关闭容器的时候调用SpringApplication.exit(applicationContext);}/*** 关闭方法2*/public void destroy2() {ConfigurableApplicationContext context = (ConfigurableApplicationContext) applicationContext;context.close();}public String pushEvent0() 
{disruptorService.pushEvent0(KEY0, "你好,disruptor0");return "ok";}public String pushEvent1() {disruptorService.pushEvent0(KEY1, "你好,disruptor1");return "ok";}public String pushEvent2() {disruptorService.pushEvent0(KEY2, "你好,disruptor2");return "ok";}public String pushEvent3() {disruptorService.pushEvent0(KEY3, "你好,disruptor3");return "ok";}public String pushEvent4() {disruptorService.pushEvent0(KEY4, "你好,disruptor4");return "ok";}public String pushEvent5() {disruptorService.pushEvent0(KEY5, "你好,disruptor5");return "ok";}public String pushEvent6() {disruptorService.pushEvent0(KEY6, "你好,disruptor6");return "ok";}public String pushEvent7() {disruptorService.pushEvent0(KEY7, "你好,disruptor7");return "ok";}public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}}
三.总结
https://github.com/hiwepy/disruptor-spring-boot-starter
好文分享
https://developer.aliyun.com/article/1409939
https://www.cnblogs.com/konghuanxi/p/17324988.html
https://blog.csdn.net/weixin_43996530/article/details/132721172
https://blog.csdn.net/qq_39939541/article/details/131508396
https://www.cnblogs.com/konghuanxi/p/17303118.html
https://developer.aliyun.com/article/1409939