For most distributed applications, a consumer group in RocketMQ. usually consists of multiple consumer instances. Consistent subscription relationship: all consumer instances in the same consumer group must have the exact same subscription to the Topic and Tag. If the subscription relationship is not consistent, it can cause confusion when consuming messages and may even result in message loss.
Background
A RocketMQ consumer represents a group of consumer instances. In most scenarios, a consumer group consists of multiple consumer instances.
The subscription relationship mainly consists of Topic+Tag. To ensure that subscriptions are consistent, all consumer instances that are identified by the same group ID must be consistent in the following aspects:
The topics to which the consumer instances in the same consumer group subscribe must be consistent. For example, if Consumer A subscribes to Topic 1 and Topic 2, Consumer B must also subscribe to Topic 1 and Topic 2. Consumer B cannot subscribe only to Topic 1 or Topic 2, or subscribe to Topic 2 and Topic 3.
The tags to which the consumer instances subscribe in the same topic must be consistent, including the number and order of the tags. For example, if Consumer A subscribes to Tag1||Tag2 in Topic 1, Consumer B must also subscribe to Tag1||Tag2 in Topic 1. Consumer B cannot subscribe only to Tag1 or Tag2, or subscribe to Tag2||Tag1.
The following shows an example of consistent subscriptions. Multiple consumer groups subscribe to different topics. All consumer instances in the same group subscribe to the same topics and tags.
Code Examples
Consistent subscription 1: One tag in one topic
All consumer instances in the same consumer group subscribe to a topic and subscribe to the same tag in the topic. This scenario meets the requirements of subscription consistency.
consumer.setConsumerGroup("group1");
consumer.subscribe(topic, "Tag1");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
// do something
}Subscribing to a topic and multiple tags
The code that is written for each consumer to subscribe to messages must be identical.
consumer.setConsumerGroup("group1");
consumer.subscribe(topic, "Tag1||Tag2");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
// do something
}Subscribing to multiple topics and tags
consumer.setConsumerGroup("group1");
consumer.subscribe(topic1, "Tag1");
consumer.subscribe(topic2, "Tag1|Tag2");
consumer.subscribe(topic3, "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
// do something
}Inconsistent Subscriptions
You use RocketMQ to send and receive messages. The consumer instances do not receive the messages as expected and the subscriptions of the consumer instances are inconsistent.
This can be caused by one of the following issues:
Consumer instances that belong to the same group subscribe to different topics.
Code for consumer instance 1:
consumer.setConsumerGroup("group1");
consumer.subscribe(topic1, "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
// do something
}Code for consumer instance 2:
consumer.setConsumerGroup("group1");
consumer.subscribe(topic2, "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
// do something
}Code for consumer instance 3:
consumer.setConsumerGroup("group1");
consumer.subscribe(topic3, "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
// do something
}Consumer instances that belong to the same group subscribe to the same topic but different tags.
Code for consumer instance 1
consumer.setConsumerGroup("group1");
consumer.subscribe(topic, "Tag1");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
// do something
}Code for consumer instance 2:
consumer.setConsumerGroup("group1");
consumer.subscribe(topic, "Tag2");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
// do something
}Code for consumer instance 3:
consumer.setConsumerGroup("group1");
consumer.subscribe(topic, "Tag2");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
// do something
}All consumer instances that belong to the same group subscribe to the same topic and tags, but the tags are specified in different orders.
Code for consumer instance 1
consumer.setConsumerGroup("group1");
consumer.subscribe(topic, "Tag1||Tag2");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
// do something
}Code for consumer instance 2:
consumer.setConsumerGroup("group1");
consumer.subscribe(topic, "Tag2||Tag1");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
// do something
}Code for consumer instance 3:
consumer.setConsumerGroup("group1");
consumer.subscribe(topic, "Tag2||Tag1");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
// do something
}