巧用RocketMQ,轻松规避分布式事务操作

程序员科技 2025-03-23 20:10:50

在当今互联网大厂的复杂架构中,分布式系统已成为主流。而在分布式系统里,分布式事务的处理一直是个棘手难题。对于咱们互联网大厂开发人员而言,如何在保证系统数据一致性的同时,高效地完成业务操作,是日常工作中频繁面临的挑战。今天,咱们就来深入探讨一下,如何借助 RocketMQ 这一强大工具,巧妙地避免分布式事务操作。

分布式事务之痛

在分布式系统中,一个业务操作往往会涉及多个服务或节点。比如,在电商系统中,用户下单后,不仅要创建订单记录,还可能需要扣减库存、更新用户积分、通知物流系统等一系列操作。这些操作分布在不同的服务上,要确保它们要么全部成功,要么全部失败,这就是分布式事务的核心诉求。

传统的分布式事务解决方案,如基于 XA 协议的两阶段提交方案,虽然能保证强一致性,但存在性能开销大、协调成本高的问题。在高并发场景下,这种方案会严重影响系统的响应速度和吞吐量。还有 TCC 事务补偿型方案,它需要大量手动编写补偿代码,项目维护难度极大,除非是对一致性要求极高的核心业务场景,否则很少被采用。

RocketMQ 闪亮登场

RocketMQ 是一个分布式消息和流数据平台,具有低延迟、高性能、高可靠性、万亿级容量和灵活的可扩展性等诸多优势。它为我们解决分布式事务问题提供了新的思路,即通过可靠消息最终一致性方案来避免直接的分布式事务操作。

可靠消息最终一致性方案原理

消息发送阶段:A 系统首先发送一个 prepared 消息到 RocketMQ。如果这个 prepared 消息发送失败,那么就直接取消操作,不再进行后续执行。这一步就像是给后续操作上了一道保险,确保消息服务正常可用,才会继续后续业务操作。本地事务执行阶段:当 prepared 消息发送成功后,A 系统接着执行本地事务。如果本地事务执行成功,就告诉 RocketMQ 发送确认消息;如果失败,就告诉 RocketMQ 回滚消息。这里本地事务和消息状态的一致性非常关键,RocketMQ 提供了相应机制来保障。消息消费阶段:如果 RocketMQ 收到了确认消息,B 系统会接收到该消息,然后执行本地事务。消息重试与补偿:RocketMQ 会自动定时轮询所有 prepared 消息回调相应服务接口,询问这个消息是不是因为本地事务处理失败了,所以没发送确认消息。一般通过查询数据库查看之前的本地事务是否执行,如果回滚了,那么这里也回滚。这就有效避免了本地事务执行成功,而确认消息发送失败的情况。倘若 B 系统的事务失败了,RocketMQ 会自动不断重试,直到成功。如果实在无法成功,对于重要的资金类业务,可以针对 B 系统本地回滚后,想办法通知 A 系统也回滚;或者发送报警由人工来手工回滚和补偿。

实际应用场景举例

以电商业务中常见的 “订单支付” 场景来说,在订单支付成功后,需要更新订单状态、更新用户积分、通知商家有新订单、更新推荐系统中的用户画像等等。在引入 RocketMQ 之前,这些操作可能都在一个分布式事务中进行,牵一发而动全身,任何一个环节出错,整个事务都要回滚。

引入 RocketMQ 后,订单支付服务只需要关注最重要的流程 —— 更新订单状态即可。其他诸如更新用户积分、通知商家、更新用户画像等操作,全部交给 RocketMQ 来异步通知。这样一来,不仅实现了系统解耦,各个服务之间的依赖关系大大降低,而且因为这些步骤变成了异步执行,能显著减少订单支付的整体耗时,提升订单系统的吞吐量。

RocketMQ 通用实现步骤

引入依赖

以 Java 项目为例,若使用 Maven 构建项目,在pom.xml文件中添加 RocketMQ 客户端依赖:

<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.4</version></dependency>

确保版本与项目需求及 RocketMQ 服务端版本兼容。

配置生产者

创建生产者实例:

DefaultMQProducer producer = new DefaultMQProducer("producerGroup");producer.setNamesrvAddr("localhost:9876");// 根据实际RocketMQ集群地址配置

这里的producerGroup是生产者组名称,同一组内的生产者具有相同的角色和属性,NamesrvAddr指定了 RocketMQ 的 NameServer 地址,NameServer 负责管理 Broker 的路由信息。

启动生产者

try { producer.start(); System.out.println("Producer started successfully");} catch (MQClientException e) { e.printStackTrace();}

启动生产者后,它便可以向 RocketMQ 发送消息。

发送消息

构建消息对象

Message msg = new Message("TopicTest", // Topic名称 "TagA", // Tag标签 "OrderID188", // 消息键,用于消息查询等场景 "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET) // 消息体);

Topic用于对消息进行分类,不同业务场景可设置不同 Topic;Tag进一步细化消息类别,方便消费者过滤消息。

发送消息

try { SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult);} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { e.printStackTrace();}

发送消息时,会返回SendResult对象,包含消息发送的状态、消息在 Broker 中的存储位置等信息,可据此判断消息是否发送成功。

配置消费者

创建消费者实例

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("TopicTest", "TagA");

consumerGroup是消费者组名称,同一组内的消费者共同消费 Topic 中的消息。subscribe方法用于指定消费者订阅的 Topic 和 Tag。

注册消息监听器

consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { String messageBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET); System.out.println("Received message: " + messageBody); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }});

消息监听器用于处理接收到的消息,在consumeMessage方法中编写业务逻辑处理消息内容,处理完成后返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS表示消息消费成功。

启动消费者

try { consumer.start(); System.out.println("Consumer started successfully");} catch (MQClientException e) { e.printStackTrace();}

启动消费者后,它会开始从 RocketMQ 拉取消息并进行消费。

总结

通过使用 RocketMQ 的可靠消息最终一致性方案,为开发人员提供了一种高效、可靠且易于维护的避免分布式事务操作的方法。它在保证系统数据最终一致性的同时,极大地提升了系统的性能和可扩展性。

当然,在实际应用中,还需要根据具体的业务场景和需求,对 RocketMQ 进行合理的配置和优化。比如,根据业务的并发量和消息量,合理调整 RocketMQ 的集群规模和参数;针对不同的业务消息,设置合适的 Topic、Tag 和分片键等。

随着互联网技术的不断发展,分布式系统的复杂度还会持续增加,分布式事务的处理也将面临更多新的挑战。但相信借助像 RocketMQ 这样优秀的技术工具,我们一定能够在复杂的技术环境中,找到最佳的解决方案,为用户提供更加稳定、高效的服务。

0 阅读:0

程序员科技

简介:感谢大家的关注