RocketMQ-消费者是如何获取消息处理以及进行ACK的

无天有壁纸 2024-05-05 22:38:25
消费者是如何获取消息处理以及进行ACK的消费者组的意思就是让你给一组消费者起一个名字,比如有一个Topic叫“TopicOrderPaySuccess”,然后假设有库存系统、积分系统、营销系统、仓储系统他们都要去消费这个Topic中的数据。 此时我们应该给这四个系统分别起一个消费者组的名字,比如stock_consumer_group、marketing_consumer_group、credie_consumer_group、wms_consumer_group。 设置消费者组的方式是在代码里面进行设置的,类似下面: DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("stock_consumer_group");然后比如库存系统部署了4台机器,每台机器上的消费者组的名字都是stock_consumer_group,那么这4台机器就同属于一个消费者组,以此类推,每个系统的几台机器都是属于各自的消费者组的。 假设库存系统和营销系统作为两个消费者组,都订阅了TopicOrderPaySuccess这个订单支付成功消息的Topic,此时假设订单系统作为生产者发送了一条消息到这个Topic,此时这条消息是怎么被消费的呢? 正常情况来说,这条消息进入Broker之后,库存系统和营销系统作为两个消费者组,每个组都会拉取到这条消息。也就是说这个订单支付成功的消息,库存系统会获取到一条,营销系统也会获取到一条,他们俩都会获取到这条消息。 但是问题来了,库存系统这个消费者组里面有多台机器,是多台机器都获取到这条消息,还是只有一台机器会获取到这条消息? 答案是正常情况来说,库存系统这个消费者组里面只有一台机器会获取到这条消息,营销系统也是同理。 集群模式消费 vs 广播模式消费对于一个消费组而言,他获取到一条消息之后,如果消费组内部有多台机器,到底是只有一台机器可以获取到这个消息,还是每台机器都可以获取到这消息? 这就是集群模式和广播模式的区别。 默认情况下我们都是集群模式,也就是说一个消费组获取到一条消息,只会交给组内的一台机器去处理,不是每台机器都可以获取到这条消息的。 但是我们可以通过如下设置来改成广播模式: consumer.setMessageMode(MessageModel.BROADCASTING);如果改成广播模式,那么对于消费组获取到的一条消息,组内每台机器都可以获取到这条消息,但是相对而言广播模式其实很少用到。 MessageQueue与消费者的关系对于一个Topic上的多个MessageQueue,是如何由一个消费组中的多台机器来进行消费的? 可以简单理解为他会均匀的将MessageQueue分配给消费组内的多台机器来消费。 假设Topic有4个MessageQueue,然后消费组中有两台机器,那么正常情况下,当然最好的就是让这两台机器每个都负责两个MessageQueue的消费了。 原则就是一个MessageQueue只能被一个消费机器去处理,但是一台消费者机器可以负责多个MessageQueue的消费处理。 Push模式 vs Pull模式我们已经知道了一个消费者组内的多台机器分别负责一部分MessageQueue的消费,那么每台机器都必须去连接到对应的Broker,尝试消费里面的MessageQueue对应的消息了。 此时就要涉及到两种消费模式,push和pull。实际上这两个消费模式的本质是一样得到,都是消费者机器主动发送请求到Broker机器去拉取一批消息下来。 Push消费模式本质底层也是基于消费者主动拉取的模式来实现的,只不过他的名字叫Push而已,意思是Broker会尽可能实时的把新消息交给消费者机器来进行处理,他的消息时效性会更好。 一般我们使用RocketMQ的时候,消费模式通常都是基于他的Push模式来做的,因为Pull模式的代码写起来更加的复杂和繁琐,而且Push模式底层本身就是基于消息拉取的方式来做的,只不过时效性更好而已。 Push模式的实现思路是当消费者发送请求到Broker去拉取消息的时候,如果有新的消息可以消费,那么就会立马返回一批消息到消费者机器去处理,处理完之后会接着立刻发送请求到Broker机器去拉取下一批消息。 所以消费者机器在Push模式下会处理完一批消息,立马发起请求拉取下一批消息,消息处理的时效性非常好,看起来就跟Broker一直不停的推送消息到消费者机器一样。 另外Push模式下有一个请求挂起和长轮询的机制,当你的请求发送到Broker,结果Broker发现没有新的消息给你处理的时候,就会让请求线程挂起,默认是挂起15秒,然后这个期间他会有后台线程每隔一会儿就去检查一下是否有新的消息给你,另外如果在这个挂起过程中,如果有新的消息到达了,会主动唤醒挂起的线程,然后把消息返回给你。 如果消费者组中出现机器宕机或者扩容加机器,会怎么处理这个时候会进入一个rebalance的环节,也就是重新给消费者组的机器分配他们要处理的MessageQueue。
0 阅读:2

无天有壁纸

简介:感谢大家的关注