分布式消息服务Kafka

负载均衡

2024-05-08 08:55:59

Kafka的消费者示例代码片段如下:

       Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.90.139:8090,192.168.90.41:8090,192.168.90.42:8090");
properties.put("group.id", "ppsgroup");
properties.put("enable.auto.commit", "true");
properties.put("auto.offset.reset", "earliest");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList("pps"));
while (true) {
    ConsumerRecords<Object, Object> records = consumer.poll(100);
    records.forEach(record->{
        String format = String.format("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
        System.out.println(format);
    });
    TimeUnit.SECONDS.sleep(1);
}

 

                                             

每个 Consumer Group 可以包含多个消费实例,即可以启动多个 Kafka Consumer,并把参数 group.id 设置成相同的值。属于同一个 Consumer Group 的消费实例会负载消费订阅的 Topic。

举例:Consumer Group A 订阅了 Topic A,并开启三个消费实例 C1、C2、C3,则发送到 Topic A 的每条消息最终只会传给 C1、C2、C3 的某一个。Kafka 默认会均匀地把消息传给各个消息实例,以做到消费负载均衡。

Kafka 负载消费的内部原理是,把订阅的 Topic 的分区,平均分配给各个消费实例。因此,消费实例的个数不要大于分区的数量,否则会有实例分配不到任何分区而处于空跑状态。这个负载均衡发生的时间,除了第一次启动上线之外,后续消费实例发生重启、增加、减少等变更时,都会触发一次负载均衡。

消息队列 Kafka 的每个 Topic 的分区数量默认是 16 个,已经足够满足大部分场景的需求,且云上服务会根据容量调整分区数。


N8COK2Awi0lq