SpringBoot中使用SpringIntegration对Ra...

哥们看看码农 2024-08-07 19:51:30
在构建微服务应用时,您可能会遇到需要批量处理传入消息的场景,例如创建ETL记录或批量更新数据库等。Spring Integration正是为此类需求量身定制的解决方案。 Spring Integration作为Spring框架的扩展,提供了丰富的企业集成模式,使得Spring应用间的模块通信和消息处理变得简单高效。借助这一框架,您不仅可以轻松实现不同模块或服务间的通信,还能在消息处理前对其进行灵活的操控。 本文将通过实例详细展示如何利用Spring Integration对RabbitMQ消息进行分组和处理。假设我们正在开发一个日志收集服务,该服务需要逐条接收消息,并按数量或类型进行分组后再行处理。让我们深入探讨这一过程。 依赖项 org.springframework.boot spring-boot-starter-amqp 3.3.1 org.springframework.boot spring-boot-starter-integration 3.3.1 org.springframework.integration spring-integration-amqp 6.3.1 org.springframework.integration spring-integration-http 6.3.1欢迎关注 SpringForAll社区(spring4all.com),专注分享关于Spring的一切!关注公众号:SpringForAll社区,回复“加群”还可加入Spring技术交流群!消息结构我们的日志消息包含message、userId和type属性。这些消息将通过RabbitMQ发送,并通过分组进行处理。其结构如下所示: public LogDto { private String message; private String userId; private LogType type;}public enum LogType { INFO, WARN, ERROR, FATAL;}按消息数量分组@Component@RequiredArgsConstructorpublic LogWriterQueueHandlerByCount implements CorrelationStrategy { @Value("${messaging.log.writer.queue}") private String QUEUE_NAME; private final MessageConverter messageConverter; private final boolean QUEUE_AUTO_START = false; private final int BATCH_SIZE = 5; private final int BATCH_TIMEOUT = 1000; @Bean public IntegrationFlow logFlowByCount(ConnectionFactory connectionFactory) { return IntegrationFlow.from( Amqp.inboundAdapter(connectionFactory, QUEUE_NAME) // listen to the queue .messageConverter(messageConverter) .autoStartup(QUEUE_AUTO_START)) .aggregate(a -> a .correlationStrategy(this, "getCorrelationKey") .releaseStrategy(r -> r.size() == BATCH_SIZE) // complete the group after BATCH SIZE of messages .expireGroupsUponCompletion(true) // don't wait to finish the group .sendPartialResultOnExpiry(true) // send partial results if needed .groupTimeout(BATCH_TIMEOUT)) // timeout groups after milliseconds .handle(this) // calls handleMessage method .get(); } @Override public Object getCorrelationKey(Message message) { // Integration flow is not allowed null values, so we need to set a default value if (message.getPayload() instanceof LogDto) { return 1; } else { throw new RuntimeException("Correlation has been failed"); } } @ServiceActivator public void handleMessage(Collection aggregatedData) { System.out.println(MessageFormat.format("Collected Items {0}", aggregatedData)); }}在本例中,我们设定了一个相关性策略,该策略将持续收集消息,直至满足以下任一条件:收到5条消息或时间过去1秒。Spring Integration通过相关键来定义消息组。在此示例中,我们使用getCorrelationKey方法来定义组键。由于我们的目标仅仅是按数量对消息进行分组,因此我们返回一个静态值1作为组键。 那么,如果我们希望按类型而非数量进行分组,该如何操作呢?其实非常简单! 按类型分组@Component@RequiredArgsConstructorpublic LogWriterQueueHandlerByType implements CorrelationStrategy { @Value("${messaging.log.writer.queue}") private String QUEUE_NAME; private final MessageConverter messageConverter; private final boolean QUEUE_AUTO_START = true; private final int BATCH_SIZE = 5; private final int BATCH_TIMEOUT = 1000; @Bean public IntegrationFlow logFlowByType(ConnectionFactory connectionFactory) { return IntegrationFlow.from( Amqp.inboundAdapter(connectionFactory, QUEUE_NAME) // listen to the queue .messageConverter(messageConverter) .autoStartup(QUEUE_AUTO_START)) .aggregate(a -> a .correlationStrategy(this, "getCorrelationKey") .releaseStrategy(r -> r.size() == BATCH_SIZE) // complete the group after BATCH SIZE of messages .expireGroupsUponCompletion(true) // don't wait to finish the group .sendPartialResultOnExpiry(true) // send partial results if needed .groupTimeout(BATCH_TIMEOUT)) // timeout groups after milliseconds .handle(this) // calls handleMessage method .get(); } @Override public Object getCorrelationKey(Message message) { // Integration flow is not allowed null values, so we need to set a default value if (message.getPayload() instanceof LogDto) { var type = ((LogDto) message.getPayload()).getType(); return Objects.requireNonNullElse(type, -1); } else { throw new RuntimeException("Correlation has been failed"); } } @ServiceActivator public void handleMessage(Collection aggregatedData) { System.out.println(MessageFormat.format("Collected Items {0}", aggregatedData)); }}正如我们所见,只需对getCorrelationKey方法稍作调整即可实现。我们从消息中提取类型值,并将其作为相关键使用。 总结Spring Integration框架以其丰富的模式实现,极大地简化了企业级项目的开发。在本文中,我们探讨了如何利用Spring Integration创建相关性策略流,从而按照我们的需求对RabbitMQ消息进行灵活的分组和处理。期待在下一篇文章中与您继续探讨更多技术话题。 来源:https://spring4all.com/forum-post/7379.html
0 阅读:0

哥们看看码农

简介:感谢大家的关注