基于高性能Disruptor队列实现本地消息

架构小魔方 2024-04-17 03:12:15
一、背景

在企业中台化日益普及的时代,中台作为企业业务架构的指导,通过不断的对于业务领域的细分和过程性的沉淀,在此过程中对于同类通用的业务能力进行重新聚合和重构,在根据相关的限界上下文和业务内聚的原则建立了领域模型,DDD在这个过程中作为领域建模的方法论,发挥了巨大的作用,而各个领域聚合通过领域事件进行解耦,目前主流的做法就是通过异步化消息中间件,比如Rocketmq、kafka进行消息中间件进行通讯,但是在有些多个聚合根同一微服务的场景下,这种跨微服务的消息中间件,又显得有点笨重,不够灵活。本文给大家推荐一款基于高性能Disruptor队列实现的本地消息,一样也能在多个聚合根之间进行事件解耦。

二、什么是Disruptor

Disruptor是一个开源框架,初衷是为了解决高并发下队列锁的问题,能够在无锁的情况下实现队列的并发操作,和JVM目前一些加锁的队列有些不同(ArrayBlockingQueue、LinkedBlockingQueue、ConcurrentLinkedQueue),目前广泛应用于各个开源框架中,比如log4j2用于作为日志的缓冲区,比如Rocketmq中用于定时消息的处理,比如Disruptor 与 Netty 结合大幅提高数据处理性能等

Disruptor框架主要以下几部分

1、RingBuffer——Disruptor底层的数据存储,是各个线程间交换数据的中转地

2、EventProcessor——事件处理器,监听RingBuffer的事件,并消费可用事件,从RingBuffer读取的事件会交由实际的生产者实现类来消费;它会一直侦听下一个可用的序号,直到该序号对应的事件已经准备好。

3、EventHandler——业务处理器,是实际消费者的接口,完成具体的业务逻辑实现,第三方实现该接口;代表着消费者。

4、Producer——生产者接口,第三方线程充当该角色,producer向RingBuffer写入事件。

5、Sequencer——序号管理器,生产同步的实现者,负责消费者/生产者各自序号、序号栅栏的管理和协调,Sequencer有单生产者,多生产者两种不同的模式,里面实现了各种同步的算法;

6、Sequence——序号,声明一个序号,用于跟踪ringbuffer中任务的变化和消费者的消费情况,disruptor里面大部分的并发代码都是通过对Sequence的值同步修改实现的,而非锁,这是disruptor高性能的一个主要原因;

7、SequenceBarrier——序号栅栏,管理和协调生产者的游标序号和各个消费者的序号,确保生产者不会覆盖消费者未来得及处理的消息,确保存在依赖的消费者之间能够按照正确的顺序处理

8、Wait Strategy:Wait Strategy决定了一个消费者怎么等待生产者将事件(Event)放入Disruptor中。

三、Disruptor与Spring boot 整合

要实现一个基于Disruptor的内存消息队列主要分为队列、消费者和生产者。

第一步:基于Disruptor构建消息队列

@Configuration@ConfigurationProperties(prefix = "disruptor.event")public DisruptorConfig { private Integer threadCount = 3000; //指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率), //否则将影响效率 private Integer bufferSize = 1024 * 256; @Bean("disruptor") public Disruptor disruptor() { //定义用于事件处理的线程池, // Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程来触发consumer的事件处理 ExecutorService executor = Executors.newFixedThreadPool(threadCount); //指定事件工厂 EventFactory factory = new EventFactory(); //单线程模式,获取额外的性能 Disruptor<EventMessage> disruptor = new Disruptor(factory, bufferSize, executor, ProducerType.SINGLE, new BlockingWaitStrategy()); //设置事件业务处理器---消费者 EventHandler[] consumers = new EventHandler[threadCount]; for (int i = 0; i < threadCount; i++) { consumers[i] = new EventHandler(); } //多消费者不重复消费 disruptor.handleEventsWithWorkerPool(consumers); disruptor.start(); return disruptor; } @Bean("ringBuffer") public RingBuffer<EventMessage> ringBuffer(@Autowired Disruptor disruptor) { //获取ringbuffer环,用于接取生产者生产的事件 return disruptor.getRingBuffer(); }}

这里有几个要特别说明的点

1、如果是基于异步消费,可以定义异步消费线程池ExecutorService,线程池的参数可以通过配置去调整。

2、定义生产者模式,如果消费没有多个生产来源,则指定为单线程模式(SINGLE),这样可以提升生产端的效率,获取额外的性能,定义消费者等待策略(Wait Strategy),目前内置有以下几种,也可以自己去现实

「BlockingWaitStrategy」

Disruptor的默认策略是BlockingWaitStrategy。在BlockingWaitStrategy内部是使用锁和condition来控制线程的唤醒。BlockingWaitStrategy是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现。

「SleepingWaitStrategy」

SleepingWaitStrategy 的性能表现跟 BlockingWaitStrategy 差不多,对 CPU 的消耗也类似,但其对生产者线程的影响最小,通过使用LockSupport.parkNanos(1)来实现循环等待。

「YieldingWaitStrategy」

YieldingWaitStrategy是可以使用在低延迟系统的策略之一。YieldingWaitStrategy将自旋以等待序列增加到适当的值。在循环体内,将调用Thread.yield()以允许其他排队的线程运行。在要求极高性能且事件处理线数小于 CPU 逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性。

「BusySpinWaitStrategy」

性能最好,适合用于低延迟的系统。在要求极高性能且事件处理线程数小于CPU逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性。

「PhasedBackoffWaitStrategy」

自旋 + yield + 自定义策略,CPU资源紧缺,吞吐量和延迟并不重要的场景。

3、设置消费者,这里有不同的消费模式

多消费者不重复消费模式:类似于Rocketmq的集群消息,就是指所有消费者共享同一批消息,同一个消息只会被一个消费者消费,文中上面的例子就是多消费者不重复消费的模式。

//设置事件业务处理器---消费者 EventHandler[] consumers = new EventHandler[threadCount]; for (int i = 0; i < threadCount; i++) { consumers[i] = new EventHandler(); } //多消费者不重复消费 disruptor.handleEventsWithWorkerPool(consumers);

多消费者重复消费模式:类似于Rocketmq的广播消费,就是所有消息均会被所有的消费者都消费一边

//设置事件业务处理器---消费者 EventHandler[] consumers = new EventHandler[threadCount]; for (int i = 0; i < threadCount; i++) { consumers[i] = new EventHandler(); } //多消费者重复消费 disruptor.handleEventsWith(consumers);

消费者编排消费模式:也可以通过EventHandlerGroup 针对同一消息不同消费者之间进行业务组合编排

第二步:实现业务消费者,定义消息事件类,实现消息工厂

@Datapublic EventMessage { private String message; private String type;}public EventFactory implements EventFactory<EventMessage> { @Override public EventMessage newInstance() { return new EventMessage(); }}@Slf4jpublic EventHandler implements WorkHandler<EventMessage>, EventHandler<EventMessage> { @Override public void onEvent(EventMessage eventMessage) { ServiceHandler serviceHandler = BeanUtil.getBean(eventMessage.getType() + "ServiceHandler", ServiceHandler.class); if (serviceHandler == null) { log.error("serviceHandler is null"); return; } serviceHandler.handler(eventMessage.getMessage()); } @Override public void onEvent(EventMessage eventMessage, long sequence, boolean endOfBatch) throws Exception { this.onEvent(eventMessage); }}public interface ServiceHandler { /** * 真正业务处理方法 */ void handler(String message);}

在这里要强调一个地方就是EventHandler作为事件消费的中转枢纽,主要根据不同的消息类型,去找到对应的业务处理类,这里不能交由Spring IOC容器去管理,否则会出现定义多消费者的时候,只能实例化出一个实例,无法做到多消费者模式。如果业务实现类是被spring 托管的,可以通过实现spring 的ApplicationContextAware这个接口,从ApplicationContext中去获取对应的Bean信息。

第三步:构建生产者,产生消息,投递到队列中去。

@Servicepublic Producer { @Resource(name = "ringBuffer") private RingBuffer<EventMessage> ringBuffer; public void send(EventMessage message){ long sequencer = ringBuffer.next(); try { EventMessage event = ringBuffer.get(sequencer); event.setMessage(message.getMessage()); event.setType(message.getType()); } finally { ringBuffer.publish(sequencer); } }}

这里需要强调一点,就是投递消息一定要放到finally中去进行publish,防止出现投递消息阻塞,因为ringBuffer一定会等上一条消息投递完成后,在投递下一个消息。

四、最后

基于事件驱动的理念,能很好的进行各个业务模块之间的通信和解耦。通过异步的方式实现各个业务之间的松耦合,而基于Disruptor高性能框架的事件队列可能是目前在高并发场景下基于事件驱动的不二选择。

0 阅读:4

架构小魔方

简介:感谢大家的关注