分布式消息服务Kafka

操作类

2024-05-09 03:11:01

1.1.1    操作类常见问题

消息在kafka保留多长时间?

消息保存72小时,超过72小时的消息将会被删除。

Kafka可以创建多少个主题?

Kafka基础版可以创建50个主题、Kafka高级版可以创建100个主题。

如果想消费已经被消费过的数据?

consumer是底层采用的是一个阻塞队列,只要一有producer生产数据,那consumer就会将数据消费。当然这里会产生一个很严重的问题,如果你重启一消费者程序,那你连一条数据都抓不到,但是log文件中明明可以看到所有数据都好好的存在。换句话说,一旦你消费过这些数据,那你就无法再次用同一个groupid消费同一组数据了。针对这种情况,你可在控制台重置消费组消费点(3天内)。

是否需要预先创建消费组

消费组和消费组订阅主题关系虽然业务应用客户端接入时可自动创建,但建议都先预先创建做好管理。

出现“Not authorized to access group”的错误信息

没有创建消费组时会遇到此报错信息,创建消费组可解决此问题。

为什么PHP发送延时比较长?

PHP发送延时比较长是PHP的语言特性导致的。PHP每次发送时,都会重新初始化一个KafkaProducer对象,这个初始化会进行各种操作,包括连接各个Broker、更新元数据等,在VPC内耗时100ms以上,在公网可能耗时500ms以上。相比之下,Java会复用KafkaProducer,发送延迟较低。

哪里可以找到生产消费消息的示例

最佳实践 - 生产者实践、消费者实践。

如何进行发送消息的测试?

可以直接在Kafka控制台进行发送消息的测试。在控制台的Topic管理页面,单击目标Topic右侧的生产拨测,进行消息发送测试,以验证集群是否运转正常。

使用客户端发送消息后,如何确定是否发送成功?

如果回调成功则说明消息发送成功。大部分客户端在发送之后,会返回Callback或者Future,如果回调成功,则说明消息发送成功。

此外,还可以在控制台通过以下方式确认消息发送是否正常:

查看“监控信息”,实例消息生产条数。

查看“消息查询”,可按时间查询消息。

发送消息的回调是否会影响发送速度?

Java客户端设置回调是否会影响消息发送的速度取决于如下两点:

(1)回调的处理耗时:为减少回调的处理耗时,不要过于频繁地在回调做耗时较长的处理。可以积累一定量Ack后再做批量的回调处理,或者在另一个异步的线程去处理,从而不阻塞回调的完成。

(2)max.in.flight.requests.per.connection:该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。

实例支持哪些开源版本?

当前服务端支持的版本为2.3.1和2.8.2。实例创建后,服务端版本不支持升级,不支持定制版本。

如何选择实例硬盘大小?

磁盘大小:流量均值×存储时长×3(备份),建议在迁移上云过程中优化Topic以降低成本。

升级Broker可能产生哪些影响?

升级Broker可能产生消息乱序、客户端连接中断、消息量不均衡等影响。

升级Broker包含以下影响:

升级过程中,会逐个重启云消息队列 Kafka 版集群中所有的Broker。在重启Broker的过程中服务不会中断,但是从每个Broker重启完成之后的5分钟内消费的分区消息可能会发生乱序。

l  重启过程中已有的客户端连接可能会中断。客户端要有自动重连功能,服务端的其他Broker会自动接替服务。

l  此外,升级和重启Broker期间,各个分区处理的消息量也会出现一定的不均衡,需要评估一下升级变更对业务可能产生的影响。

升级所有Broker大概需要5分钟~15分钟。如果有多个实例,可以考虑先升级测试集群,验证通过后再升级生产集群。

实例的地域无法变更?

实例购买部署之后,其地域与物理资源紧密结合,无法变更。如需变更实例的地域,请释放实例,并重新购买。

如何快速测试分布式消息服务Kafka服务端是否正常?

前提条件

已创建并部署分布式消息服务Kafka实例,且实例处于服务中状态。

操作流程

快速测试分布式消息服务Kafka服务端的流程如下:

(1)新建Topic

(2)在主题管理页面,生产拨测,体验发送消息

(3)在主题管理页面,查看分区状态

(4)在消息查询页面,按时间或位点查询消息

是否支持延迟消息?

和开源Apache Kafka一样,分布式消息服务Kafka同样不支持延迟消息。

是否支持压缩消息?

分布式消息服务Kafka服务端支持收发压缩消息。

如需使用压缩消息,需要在分布式消息服务Kafka的客户端进行设置。在分布式消息服务Kafka客户端进行消息压缩的说明如下:

压缩格式:支持Snappy、LZ4、GZIP等压缩格式。其中,GZIP对CPU的消耗较高,因此不建议选择GZIP,建议选择Snappy或LZ4。

适用场景:一般来说,CPU的价格比流量和存储要高。对于日志类等压缩比较高的场景,可以考虑使用压缩。其余场景,不建议使用压缩。

CPU消耗:压缩会消耗额外的CPU,平均在20%以上。具体额外CPU消耗,需要根据实际场景进行测试。

如何释放实例?

若不再需要使用实例,可以在控制台上退订实例。

为什么限制Topic总数(分区总数)?

Topic总数(分区总数)太多会使集群性能和稳定性能急剧下降。

分布式消息服务Kafka的存储和协调机制是以分区为粒度的,分区数太多,会导致存储碎片化严重,集群性能和稳定性都会急剧下降。

为什么Topic不能减分区?

Topic减分区会造成数据丢失,这是Apache Kafka自身设计所限制的。

是否支持Compact的日志清理策略?

开源版本为2.2.0或以上的分布式消息服务Kafka实例支持Compact的日志清理策略。

如何查看哪些IP在消费消息?

在控制台消费组管理页面,查看消费实例。

哪里可以找到消费最佳实践?

最佳实践 - 消费者实践。

如何在修改Consumer的offset?

提交消费位点的机制取决于客户端SDK,一般支持以下两种机制:

自动提交:按照时间间隔,SDK把消费过的最新消息的位点+1提交上去。

手动提交:应用程序里,把消费过的最新消息的位点+1提交上去。

在控制台的消费组管理页面,可以重置消费位置。

为什么在控制台看不到Group的订阅关系?

问题现象

已启动某个Group的消费线程,但在分布式消息服务Kafka控制台的消费组管理页面,查不到该Group订阅的Topic信息。

可能原因

客户端的配置错误或者所处的网络环境异常导致无法成功订阅消息。

采用assign方式手动指定消费者订阅某个Topic分区的消息,并未提交消费位点。

解决方案

排查客户端配置问题

排查客户端网络问题

需提交消费位点

为什么同一个分区被多个消费线程消费了?

消费客户端使用“StickyAssignor”分配模式消费消息时,发现同一个分区被多个消费线程消费,出现数据错乱的情况。

可能原因

客户端低于2.3版本。2.3版本以前的客户端有可能将同一个分区分配给多个消费线程进行消费。

解决方案

建议您升级客户端至2.3或以上版本,或者换成其他分区分配策略。

使用建议:“StickyAssignor”分配策略目前在一些情况下会产生分配偏差,比如分区重复分配问题。如果不是业务特殊需求,不建议使用该分配策略。

为什么Group的状态一直处于“删除中”?

当前删除Group采用的是异步删除方式,一般情况下,删除一个Group大概需要耗时1-2分钟。

问题现象

在分布式消息服务Kafka控制台的消费组管理页面删除Group后,此Group的状态一直处于删除中。删除中

可能原因

Group中存在活跃的订阅关系。

Group上有消费线程在提交新的消费位点。

解决方案

登录分布式消息服务Kafka控制台,查看Group的订阅关系。

若查询到Group订阅了Topic或者有消费线程在提交新的消费位点,请先取消订阅关系再删除Group。

为什么消费组的消息堆积量为“0”或者未显示

查看Group的消费状态时,消费位点不等于最大位点,但Group的堆积量显示为“0”,或者未显示。

可能原因

未曾提交过消费位点,或者消费位点已过期。

提交过消费位点并且未过期,但由于部分消息数据过期被删除,导致消费位点小于或者等于最小位点。

为什么不能登录部署分布式消息服务Kafka的机器?

分布式消息服务Kafka提供全托管免运维服务,您无需登录机器,集群的一些基础信息会通过监控告警进行透传。

修改企业项目,是否会导致Kafka重启?

修改企业项目不会导致Kafka重启。

Kafka实例支持批量导入Topic功能么?

支持批量创建Topic,可下载批量创建模板填写信息后导入。见操作指南批量创建Topic

消息被消费后,没有删除,导致Kafka存储空间占满?

当消息被消费后,Kafka默认情况下并不会立即删除它们,而是将其保留在磁盘上。这可能会导致Kafka存储空间占满的问题。

Kafka可以删除消费组下不用的Topic吗?

可以。操作步骤如下:

(1)登录管理控制台。

(2)进入Kafka管理控制台。

(3)在实例列表页在操作列,目标实例行点击“管理”。

(4)点击“主题管理”后进入主题管理页面后点击“更多”,出现如下图弹窗。

(5)在Topic所在行,单击“删除”,并选择确定。

Kafka实例是否需要创建消费组、生产者和消费者?

不需要额外创建,支持创建消费组,操作步骤如下:

(1)登录管理控制台。

(2)进入Kafka管理控制台。

(3)在实例列表页在操作列,目标实例行点击“管理”。

(4)点击“消费组管理”后进入消费组管理页面。

(5)点击“新建消费组”后,输入消费组名称,点击创建。

注:消费组业务应用接入使用时客户端也可自动创建。

Kafka生产消息的最大长度是多少?

Kafka生产消息的最大长度默认情况下为1MB。

消息超过老化时间,消息仍存在的原因

消息超过老化时间(message retention time)仍然存在的原因可能有以下几种情况:

l  消息存储配置错误:可能是由于配置错误导致消息的老化时间没有按照预期进行清理。您可以检查Kafka的配置文件中log.retention.ms或log.retention.hours参数,确保其设置正确。

l  消费者组延迟或故障:如果消息被分发给了一个消费者组,但消费者组中的某个消费者出现延迟或故障,导致消息无法及时消费,那么这些消息可能会超过老化时间而仍然存在。

l  消息处理逻辑问题:在消费者端的消息处理逻辑中可能存在问题,导致消息无法被及时处理或消费。这可能是由于代码bug、处理逻辑错误或其他原因引起的。

l  多个分区或主题的不一致:如果您的消息被分布在多个分区或主题中,而某些分区或主题的老化时间设置与其他分区或主题不一致,那么消息可能会在某些分区或主题中超过老化时间而仍然存在。

为了解决这个问题,您可以采取以下措施:

l  检查Kafka的配置文件,确保消息的老化时间设置正确。

l  检查消费者组的消费情况,确保消费者能够及时消费消息。

l  检查消费者端的消息处理逻辑,确保消息能够被正确处理和消费。

l  检查分区或主题的配置,确保它们的老化时间设置一致。

1.1.2    如何配置客户端参数?

据实际业务场景,客户端参数配置适当的值,客户端参数列表及说明如下所示:

表1.生产者客户端

参数

说明

retries

消息发送失败时的重试次数。

retry.backoff.ms

消息发送失败时的重试间隔,建议设置为1000。单位:毫秒。

acks

发送消息的持久化机制。为了提升发送性能, 建议设置为acks=1。acks=0:无需服务端的Response,性能较高、丢数据风险较大。acks=1:服务端主节点写成功即返回Response,性能中等、丢数据风险中等、主节点宕机可能导致数据丢失。acks=all:服务端主节点写成功且备节点同步成功才返回Response,性能较差、数据较为安全、主节点和备节点都宕机才会导致数据丢失。

batch.size

发往每个分区的消息缓存量。达到设置的数值时,就会触发一次网络请求,然后Producer客户端把消息批量发往服务器。如果batch.size设置过小,有可能影响发送性能和稳定性。建议保持默认值16384。单位:字节。

linger.ms

每条消息在缓存中的最长时间。若超过这个时间,Producer客户端就会忽略batch.size的限制,立即把消息发往服务器。建议根据业务场景,   将linger.ms设置在100~1000之间。单位:毫秒。

partitioner.class

设置分区策略。建议采用粘性分区策略,可提升发送性能。发送客户端2.4及以上版本,默认采用粘性分区策略模式。

buffer.memory

发送的内存池大小。如果内存池设置过小,则有可能导致申请内存耗时过长,从而影响发送性能,甚至导致发送超时。建议buffer.memory ≧ batch.size * 分区数 * 2。单位:字节。

表2.消费者客户端参数

参数

说明

fetch.min.bytes

消费者从服务端获取数据的最小字节数。设置该参数时请尽量评估消息发送端的消息量,若设置过大可能会导致消费端延迟增大,过小可能会导致消费端频繁拉取消息。单位:字节。

fetch.max.wait.ms

服务端等待的最大时间。单位:毫秒。如果使用Local存储引擎,配置了fetch.min.bytes参数,服务器会等待足够的数据才会返回。超过此时间即使没有足够数据也会返回。如果是云存储引擎,一旦有新数据发送进来,   服务器就会结束等待,不需要等待fetch.min.bytes的值。

max.partion.fetch.bytes

每个分区返回的最大字节数。单位:字节。

session.timeout.ms

消费端发送心跳的时间间隔,如果在心跳时间间隔内没有发送心跳,则服务端会认为消费者死亡,从而触发Rebalance,Rebalance期间客户端将会停止消费数据等待Rebalance完成。建议将此参数设置为3000060000。单位:毫秒。默认有效值:6000300000。

max.poll.records

每次Poll获取的最大消息数量,若此值设置过大则需要尽快处理业务逻辑,避免处理过慢影响下一次Poll数据,从而导致在session.timeout.ms时间内没有发送心跳引起Rebalance。建议该值小于<单个线程每秒消费的条数> * <消费线程的个数> *的值。重要 在Java Client 0.10.1及其以上版本有单独的线程发送心跳,小于此版本或者其他语言的客户端都需考虑处理数据时间和发送心跳的间隔,防止频繁Rebalance影响正常消费。

max.poll.interval.ms

最大的Poll间隔时间,仅在Java Client 0.10.1及其以上版本需配置该参数。如果在间隔时间内消费者没有发送Poll请求,即使在session.timeout.ms参数设置的时间内发送了心跳。服务端也会认为消费者死亡,从而触发Rebalance。因此需注意此值需要合理设置,建议该值大于<消费一条消息花费的时间> *的值。通常使用默认值即可。单位:毫秒。默认值:300000。

enable.auto.commit

是否采用自动提交位点机制。true:默认采用自动提交机制。false:不采用自动提交机制。默认值:true。

auto.commit.interval.ms

自动提交位点时间间隔。默认值为1000,单位:毫秒。

auto.offset.reset

消费位点重置策略。latest:从最大位点开始消费。earliest:从最小位点开始消费。none:不做任何操作,即不重置。说明建议设置成latest,而不要设置成earliest,避免因位点非法时从头开始消费,从而造成大量重复。如果是您自己管理位点,可以设置为none。

1.1.3    如何判断和处理消息堆积?

l  断消息堆积是否属于正常情况

登录“分布式消息服务Kafka”控制台,在“消费组管理”页面,找到目标消费组,进入“消息堆积”页面。

(1)堆积量保持在一个稳定的数值之间波动,没有持续扩大。说明客户端一直在拉取最新消息,没有消息堆积,属于正常情况。

(2)堆积量逐步扩大,并且当前位点一直不变。客户端的消费线程因为某些原因卡住,没有继续消费,也没有继续向服务端提交位点,属于异常情况,即消息的确堆积了。

(3)堆积量逐步扩大,同时当前位点在前进。说明客户端还在消费中,但是消息的消费速度慢于消息的发送速度。消息堆积大多是消费速度过慢或者消费线程阻塞造成的,建议不要在消费逻辑中有太多耗时的操作。

l  消息堆积的处理方式

经过上述判断,确认消息的确存在堆积情况时,建议打印消息的消费耗时,或者根据堆栈信息查看线程执行情况,适当调整以加快消息的消费速度,避免出现消息堆积。

1.1.4    为什么消费客户端频繁出现Rebalance?

l  能原因

可能是Kafka客户端版本过低或者Consumer没有独立线程维持心跳。

v0.10.2之前版本的客户端:Consumer没有独立线程维持心跳,而是把心跳维持与poll接口耦合在一起。其结果就是,如果用户消费出现卡顿,就会导致Consumer心跳超时,引发Rebalance。

v0.10.2及之后版本的客户端:如果消费时间过慢,超过一定时间(max.poll.interval.ms设置的值,默认5分钟)未进行poll拉取消息,则会导致客户端主动离开队列,而引发Rebalance。

l  解决方案

首先您需要了解以下几点信息:

session.timeout.ms:心跳超时时间(可以由客户端自行设置)。

max.poll.records:每次poll返回的最大消息数量。

v0.10.2之前版本的客户端:心跳是通过poll接口来实现的,没有内置的独立线程。

v0.10.2及之后版本的客户端:为了防止客户端长时间不进行消费,Kafka客户端在v0.10.2及之后的版本中引入了max.poll.interval.ms配置参数。

(1)参考以下说明调整参数值:

session.timeout.ms:v0.10.2之前的版本可适当提高该参数值,需要大于消费一批数据的时间,但不要超过30s,建议设置为25s;而v0.10.2及其之后的版本,保持默认值10s即可。

max.poll.records:降低该参数值,建议远远小于<单个线程每秒消费的条数> * <消费线程的个数> *的积。

max.poll.interval.ms:该值要大于 / (<单个线程每秒消费的条数> * <消费线程的个数>)的值。

(2)尽量提高客户端的消费速度,消费逻辑另起线程进行处理。

(3)减少Group订阅Topic的数量,一个Group订阅的Topic最好不要超过5个,建议一个Group只订阅一个Topic。

(4)将客户端升级至0.10.2以上版本。

1.1.5    消费端从服务端拉取不到消息或拉取消息缓慢

问题现象

Topic中有消息并且Consumer未消费到最新的位置,出现消费端从服务端拉取不到消息或拉取消息缓慢的情况(特别是公网消费时)。

可能原因

消费流量达到网络带宽。

单个消息大小超过网络带宽。

Consumer每次拉取的消息量超过网络带宽。

说明

Consumer每次消息的拉取量受以下参数影响:

max.poll.records:每次拉取的最多消息数。

fetch.max.bytes:每次拉取的最大总byte数。

max.partition.fetch.bytes:每个Partition每次拉取的最大总byte数。

 

解决方案

(1)登录分布式消息服务Kafka控制台查询消息。

如果能查询到消息,请继续尝试以下步骤。

(2)在实例详情页面,单击左侧导航栏的监控信息,查看消费流量是否已达到网络带宽。

如果消费流量已经达到网络带宽,您需要扩充网络带宽。

(3)检查Topic中是否存在单个消息的大小超过网络带宽。

如果存在单个消息的大小超过网络带宽,请提高网络带宽,或者减小单个消息的大小。

(4)检查Consumer每次拉取的消息量是否超过网络带宽。

说明

如果每次拉取的消息量超过网络带宽,您需要调整以下参数。

网络带宽>fetch.max.bytes

网络带宽>max.partition.fetch.bytes*总订阅Partition数

1.1.6    为什么不推荐使用Sarama Go客户端收发消息?

问题现象

所有Sarama Go版本客户端存在以下已知问题:

当Topic新增分区时,Sarama Go客户端无法感知并消费新增分区,需要客户端重启后,才能消费到新增分区。

当Sarama Go客户端同时订阅两个以上的Topic时,有可能会导致部分分区无法正常消费消息。

当Sarama Go客户端的消费位点重置策略设置为Oldest(earliest)时,如果客户端宕机或服务端版本升级,由于Sarama Go客户端自行实现OutOfRange机制,有可能会导致客户端从最小位点开始重新消费所有消息。

解决方案

建议尽早将Sarama Go客户端替换为Confluent Go客户端。

Confluent Go客户端的Demo地址,请访问kafka-confluent-go-demo

1.1.7    为什么发送给Topic的消息在分区中分布不均衡?

问题现象

发送消息到某个Topic后,该Topic下部分分区消息比较多,部分分区消息少,甚至没有。

可能原因

发送消息时指定了分区,导致未指定的分区没有消息。

发送消息时指定了消息Key,按照对应的Key发送消息至对应的分区,导致分区消息不均衡。

通过代码重新实现了分区分配策略,但策略逻辑有问题,导致分区消息不均衡。

1.1.8    为什么Group不存在但能消费消息?

在分布式消息服务Kafka控制台上,未查看到对应的Group,但此Group下却有消费线程在消费消息。

可能原因

l  如果客户端使用assign方式消费消息,那么即使不创建Group,也可能消费消息。

l  如果客户端使用subscribe方式消费消息,删除Group后,消费线程未停止或者未发生Rebalance,那么消费线程还可以继续正常消费。

解决方案

l  如果客户端使用assign方式消费消息,请提前在分布式消息服务Kafka控制台创建Group。

l  请尽量复用Group,避免创建过多的Group而影响集群的稳定性。

l  在删除Group前,请确保已停止该Group下的所有消费线程。

1.1.9    消费端挂载NFS是否会影响消费速度?

费端在消费消息的主线程里同步将拉取的消息存储在NFS,导致消费端处理消息的速度变慢,阻塞消息处理。

可能原因

NFS本身速度就不太理想。

NFS是网络共享存储,虽然有多机器共享访问的能力优势,但多台机器访问是争抢的,消费者个数增多,性能反而下降。

解决方案

建议将消费端拉取消息和存储消息分别放在两个独立且不同的线程里操作。拉取消息的线程只管消费消息,把消息转给缓存处理线程后就继续消费消息,这样可以保证消费速度的稳定。

也可以考虑采用云盘,给每台消费端处理机挂载自己的云盘,各自独立存储,这样消费端不会再因为争抢NFS而降低性能。如果需要把最终的处理结果集中到同一个NFS上保存,仍然可以通过一个异步的工具或者线程,把云盘上的结果再转发到NFS上,而不要让同步存储NFS阻塞消息处理。总之,对于资源访问造成的处理低效,总是可以用异步处理的方式解决。


X0DguLzmdqZ9