基于RocketMQ消息灰度解决思路

架构小魔方 2024-04-29 19:20:53
背景

稳定性建设的三板斧:可观察、可回滚、可灰度,这要求在系统设计之初就要考虑这些稳定性的需求,针对于RPC和Http的灰度方案,很多在网关层面或者是在路由层面就能很好的支持,在实际的应用架构中,经常性的使用MQ进行系统之间的消息传递和异步化的解耦,因此仅实现RPC层面的灰度,还不能称之为系统具有可灰度的能力,那么在MQ层面又将如何做到可灰度呢?

如何实现Rocketmq灰度?

从整个Rocketmq技术架构来看,分为消费者端和生产者端,要实现Rocketmq的灰度方案,那就需要实现生产者和消费者针对于消息的灰度,并且能在整个链路中贯穿下去。

生产者如何实现消息的灰度?

主要从这两方面考虑:

1、如果当前请求是灰度请求,那么所对应的消息应该标识为灰度消息

2、如果当前的实例是灰度实例,那么所对应的消息应该标识为灰度消息

Rocketmq要实现消息灰度的打标比较简单,目前有两种方案:基于Tag和基于user-property,但是Tag很多情况下是具有业务语义,因此在这里推荐使用基于user-property的方案。

而Rocketmq在消息发送提供SendMessageHook钩子,可以在此钩子增加灰度标识,这样生产端的灰度标识就完成了。

消费端如何实现灰度消费?

消费端要实现灰度消费较为生产者要复杂些,目前有基于客户端的过滤和基于服务端的实现方案。

基于客户端实现方式:

在消费端最简单的方式针对灰度和正式使用两套不同的Consumer Group ,在基于客户端的FilterMessgeHook增加灰度过滤逻辑,那么就可实现灰度环境过滤了正式环境的消息,而正式环境过滤灰度环境的消息,比较简单的实现了消费端的灰度过滤。

但存在问题

灰度和正式环境均投递了全量的消息,将过滤的压力集中在客户端,哪怕是灰度环境实际消息量很小,也要被迫的接收全量的消息过滤,对于灰度的客户端流量是一个不小的考量,另外服务端也要针对于不同的环境进行两次的消息分发动作,该方案虽然实现简单,但无论从客户端来看,还是从服务端来看,性能损耗比较大。

基于服务端实现方式:主要是基于Tag的过滤和基于SQL92过滤。

从整个Rocketmq的消费订阅模型来看,是按照主题和消费者分组,在结合订阅关系进行消息投递

1、基于Tag的服务端过滤

Tag 过滤的实现中,RocketMQ 消费者向 server 端订阅的时候,会传递订阅信息到服务端。订阅信息为 SubscribtionData,其中包含四个字段:

topictagSetexpressionType=tag:表达式的类型,这里是 tag 过滤,所以值为tagclient version:此次订阅的版本号

Client 会不断向 server 端发送心跳,默认情况下 30 秒一次。过程中 SubscribtionData 可以动态变化,如果对tagSet或表达式的类型进行过更改,则会增加 client version 的值。服务端收到心跳之后,发现心跳里的 SubscribtionData 版本号改变,意味着订阅规则也有所变化,此时会更新客户端的订阅逻辑,决定服务端过滤变化的推送。

Server 端处理服务端的灰度过滤逻辑:

public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) { if (null == tagsCode || null == subscriptionData) { return true; } if (subscriptionData.isClassFilterMode()) { return true; } return subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL) || subscriptionData.getCodeSet().contains(tagsCode.intValue()); }

RocketMQ 中有一个 MessageFilter 类,首先会进行 consumer queue 进行比对,如果匹配成功,则进行 tagscode 的比对。两次比对都匹配才会将消息推送到 client 消费者端。

2、基于SQL92消息过滤方式

如果灰度信息保存在 user-property 字段,可以通过 SQL 92 的方式进行过滤,服务端 ConsumerFilterManager 保存了每一个 topic 对应的FilterDataMapByTopic,而 FilterDataMapByTopic 里保存了不同 consumer group 对应的消费逻辑ConsumerFilterData,ConsumerFilterData 里包含了 consumer group、topic、表达式以及 client version,因此可以借此来过滤。

public ConsumerFilterData { private String consumerGroup; private String topic; private String expression; private String expressionType; private transient Expression compiledExpression; private long bornTime; private long deadTime = 0; private BloomFilterData bloomFilterData; private long clientVersion;}

SQL 92 是一种可以写复杂表达式的过滤规则,比如消息中user-property中的version为gray的消息,我们可以写成(version is gray)

以上方式可以避免了灰度环境拉取所有消息,能够有效减轻灰度环境消费者的负担。同时,服务端不会将所有消息都推送两遍,大大降低了服务端压力。

写在最后

当然要实现Rocketmq的灰度方案思路还有很多,有基于Rocketmq消费队列的模型的方案(可以参考文章:https://segmentfault.com/a/1190000042651505?sort=votes),也有基于影子Topic形式实现方式,类似于死信队列的实现原理。

并且要实现Rocketmq灰度,不仅仅只是考虑MQ层面的事情,还需要考虑整个灰度链路上面的灰度可传递性,比如如果消息生产者是一个独立的线程池,那么灰度标记如何进行传递?比如如果灰度发布回滚了,那么灰度消息如何消费等等,要考虑问题还很多,需要一个一个的去解决,才能真正意义上实现消息的可灰度。

0 阅读:0

架构小魔方

简介:感谢大家的关注