分布式消息服务-RocketMQ

同组Consumer订阅关系一致

2024-07-02 09:23:12

RocketMQ里的一个Consumer Group代表一个Consumer群组。对于大多数分布式应用来说,一个Consumer Group下通常会有多个Consumer实例。订阅关系一致指的是同一个Consumer Group下所有Consumer实例的处理逻辑必须完全一致,一旦订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。

背景信息

RocketMQ 中一个消费者代表一个Consumer实例群组。在大多数场景中,一个消费者组下面包含多个Consumer实例。

由于云消息队列 RocketMQ 版的订阅关系主要由Topic+Tag共同组成,因此,保持订阅关系一致意味着同一个消费者Group ID下所有的Consumer实例订阅关系的一致性大概包括下面几个方面:

同一个消费组订阅的Topic必须一致,例如:在同一个消费组下,ConsumerA订阅Topic1和Topic2,ConsumerB也必须订阅Topic1和Topic2,只订阅Topic1、只订阅Topic2或订阅Topic2和Topic3都是不允许的。

同一个消费者订阅的同一个Topic的场景下Tag必须一致,包括Tag的数量和T顺序,例如:ConsumerA订阅Topic1的Tag配置为Tag1||Tag2,ConsumerB订阅Topic1的Tag也必须是Tag1||Tag2,只订阅Tag1、只订阅Tag2或者订阅Tag2||Tag1都是不允许的。

正确的订阅关系如下,多个不同的topic可以被多个消费组订阅,但是同一个消费组下的多个Consumer实例订阅Topic和Tag都必须一致。

几个代码示例

订阅一个Topic、一个Tag

同一个消费组下面的全部消费者实例均订阅一个topic,且均配置同一个tag这种是符合订阅关系一致性原则的。

consumer.setConsumerGroup("group1");
consumer.subscribe(topic, "Tag1");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
//         do something
                }

订阅一个topic多个tag

每个消费者订阅消息的代码必须一致

consumer.setConsumerGroup("group1");
consumer.subscribe(topic, "Tag1||Tag2");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
//         do something
                }

订阅多个topic且订阅多个tag

consumer.setConsumerGroup("group1");
consumer.subscribe(topic1, "Tag1");
consumer.subscribe(topic2, "Tag1|Tag2");
consumer.subscribe(topic3, "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
//         do something
                }

常见的订阅关系不一致情况

如果Rocketmq实例消费到的消息不符合预期,可以检查一下消费者逻辑是否存在订阅关系不一致的情况

下面列举几种常见的错误示例

同一个消费组下订阅的topic不一致

消费者实例1的代码:

consumer.setConsumerGroup("group1");
consumer.subscribe(topic1, "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
//         do something
                }

消费者实例2的代码:

consumer.setConsumerGroup("group1");
consumer.subscribe(topic2, "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
//         do something
                }

消费者实例3的代码:

consumer.setConsumerGroup("group1");
consumer.subscribe(topic3, "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
//         do something
                }

同一个消费组的消费实例订阅的topic相同但订阅的tag不一致

消费者实例1的代码

consumer.setConsumerGroup("group1");
consumer.subscribe(topic, "Tag1");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
//         do something
                }

消费者实例2的代码:

consumer.setConsumerGroup("group1");
consumer.subscribe(topic, "Tag2");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
//         do something
                }

消费者实例3的代码:

consumer.setConsumerGroup("group1");
consumer.subscribe(topic, "Tag2");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
//         do something
                }

同一个消费组下全部消费者实例订阅的topic以及tag都一致但订阅tag的顺序不一致

消费者实例1的代码

consumer.setConsumerGroup("group1");
consumer.subscribe(topic, "Tag1||Tag2");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
//         do something
                }

消费者实例2的代码:

consumer.setConsumerGroup("group1");
consumer.subscribe(topic, "Tag2||Tag1");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
//         do something
                }

消费者实例3的代码:

consumer.setConsumerGroup("group1");
consumer.subscribe(topic, "Tag2||Tag1");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
//         do something
                }


rOd7R1et.a8r