问题背景
业务场景
mq消息消费实时性要求不高,期望可以牺牲一部分实时性,换取吞吐量,例如:数据库单条insert优化为batchInsert。优化后结果不符合预期:消费者消费消息的batchSize远小于实际配置的max.poll.records,导致在批量消息达到时想要聚合mq批量操作业务数据效果与单条处理效果类似。于是翻查影响kafka吞吐量的相关配置
原因分析
Kafka 的 poll()
方法返回的消息数量与 batch.size
参数并不是直接相关的,影响 Kafka 消费者 poll()
时能获取消息数量的因素有很多。让我们逐步分析这些因素,并探讨可能的优化方法。
影响 poll()
返回消息数量的因素
**max.poll.records**
** 设置**max.poll.records
限制了每次poll()
获取的消息最大数量。即使batch.size
设置得较大,如果max.poll.records
较小,poll()
获取的消息数依然受限制。
- 消息生产速度
- 如果生产者写入 Kafka 的速度较慢,消费端的
poll()
方法在调用时可能没有足够的消息积累,因此无法返回足够多的消息。 - 解决方案:监控生产者的写入速度和 Kafka 的积压情况,确保有足够的消息被生产和累积。
- 如果生产者写入 Kafka 的速度较慢,消费端的
- 分区数量与消费者线程数
- 如果 Kafka topic 的分区数量较少,且每个消费者线程处理一个或多个分区,那么分区中的消息总量可能不足,导致每次
poll()
返回的消息数较少。 - 解决方案:确保 topic 分区数量合理,并且适当增加消费者实例,以提高并行处理能力。
- 如果 Kafka topic 的分区数量较少,且每个消费者线程处理一个或多个分区,那么分区中的消息总量可能不足,导致每次
**fetch.min.bytes**
** 和**fetch.max.wait.ms**
设置**fetch.min.bytes
:消费者从 Kafka 获取消息时,设置了一个最小的字节数,当数据量不足时,Kafka 将等待更多的数据写入。fetch.max.wait.ms
:当fetch.min.bytes
的数据量未达到时,Kafka 消费者会等待一定的时间再返回消息。如果设置过短,可能会在消息不足时提早返回。- 优化思路:增加
fetch.min.bytes
值,使消费者等待更多数据积累后再返回。同时适当调整fetch.max.wait.ms
,确保不会过早返回消息。
- 消费者的
**poll()**
调用频率- 消费者应用程序调用
poll()
的频率也影响返回的消息数量。如果频繁调用poll()
,每次返回的消息可能较少。 - 解决方案:适当调整
poll()
的调用频率,确保消费者等待足够的消息后再调用。
- 消费者应用程序调用
**session.timeout.ms**
** 和**heartbeat.interval.ms**
设置**- 如果这些参数配置不当,Kafka 消费者可能会因为过频繁发送心跳而导致每次
poll()
间隔较短,未能积累足够的消息。 - 解决方案:增加
session.timeout.ms
和heartbeat.interval.ms
的值,允许消费者有足够时间poll()
更多消息。
- 如果这些参数配置不当,Kafka 消费者可能会因为过频繁发送心跳而导致每次
优化建议
- **增加 **
**max.poll.records**
- 将
max.poll.records
设置得更大,以确保每次poll()
尽可能多地返回消息。例如,尝试将其从默认的 500 增加到 1000 或更大。
- 将
- **调整
**fetch.min.bytes**
和 ****fetch.max.wait.ms**
- 可以将
fetch.min.bytes
增加到 1MB 或 5MB,这样消费者将会等待 Kafka 收集到足够的消息再返回。也可以适当增加fetch.max.wait.ms
,让 Kafka 多等待一段时间再返回消息。
- 可以将
- 监控消费者调用的频率
- 适当降低
poll()
的调用频率,确保 Kafka 消费者有时间积累足够的数据。
- 适当降低
- 增加分区数
- 确保 Kafka topic 有足够的分区,使得每个分区中可以累积足够的消息。此外,可以考虑增加消费者数量来并行处理消息。
通过这些配置调整,你可以增加每次 poll()
获取的消息数量,从而提高批量处理效率。
参数含义
**fetch.max.wait.ms**
- 描述:
fetch.max.wait.ms
是 Kafka 消费者端的配置,表示当消费者从 Kafka broker 请求消息时,如果可用的数据量不足fetch.min.bytes
,消费者最多会等待的时间(毫秒)。当超出这个时间后,即使没有足够的数据,也会返回当前已经积累的消息。 - 用途:
- 这个参数主要用于优化消费者在没有足够数据的情况下的等待时间。通过设定一个合理的
fetch.max.wait.ms
,消费者可以等待更多数据积累来提高吞吐量,但不会因数据不足无限等待。 - 如果消息到达频率低,消费者就会等待
fetch.max.wait.ms
毫秒后返回;如果数据积累足够快,消费者会尽早返回。
- 这个参数主要用于优化消费者在没有足够数据的情况下的等待时间。通过设定一个合理的
- 默认值: 通常默认值是 500 毫秒。
- 场景: 应用于从本地 Kafka broker 拉取消息的延迟和吞吐量控制。
**remote.fetch.max.wait.ms**
- 描述:
remote.fetch.max.wait.ms
是 Kafka 远程存储机制相关的一个参数。远程存储是在 Kafka 3.0 引入的架构优化,允许将过期的日志段(log segments)存储在远程存储介质上,如云存储(Amazon S3,Google Cloud Storage 等),而不是一直保留在本地磁盘上。remote.fetch.max.wait.ms
控制 Kafka 代理从远程存储拉取过期日志段时,最大等待的时间。 - 用途:
- 当消费者尝试读取的数据已经从本地磁盘迁移到远程存储时,Kafka 代理会从远程存储系统中拉取该段数据。
remote.fetch.max.wait.ms
就是用来限制 Kafka 在从远程存储读取数据时的最大等待时间。 - 远程存储拉取通常比本地拉取要慢,因为涉及外部存储系统,所以这个参数用于优化消费者在这种情况下的性能。
- 当消费者尝试读取的数据已经从本地磁盘迁移到远程存储时,Kafka 代理会从远程存储系统中拉取该段数据。
- 场景: 应用于 Kafka 从远程存储拉取过期消息的等待时间控制。
fetch.min.bytes
fetch.min.bytes
是 Kafka 消费者端的一个配置参数,它用于控制每次从 Kafka broker 拉取数据时的最小数据量。这个参数决定了消费者拉取消息时的行为,影响数据批量处理的效率和延迟。
参数作用:
- 功能:指定 Kafka broker 每次返回给消费者的消息的最小字节数。当消费者发起拉取请求时,Kafka broker 会等待消息积累到至少
fetch.min.bytes
指定的字节数后,再将数据返回给消费者。 - 如果 broker 在
fetch.max.wait.ms
时间内没有积累到足够的数据(即fetch.min.bytes
),它会返回当前可用的数据量,即使小于fetch.min.bytes
。
默认值:
- 默认值为 1,意味着 broker 不需要等待消息积累到一定的字节数,只要有消息(即使只有一条消息),就可以立即返回给消费者。
使用场景:
- 高吞吐量场景:
- 如果你的应用需要批量处理 Kafka 消息,可以将
fetch.min.bytes
设置得大一些,确保每次拉取的数据足够多,以减少频繁的网络请求。 - 适合需要处理大批量数据的系统,比如数据分析或日志处理系统。这时,你可以设置较高的
fetch.min.bytes
,让 Kafka broker 等待更多消息积累后再返回。
- 如果你的应用需要批量处理 Kafka 消息,可以将
- 低延迟场景:
- 如果你希望 Kafka 消息能尽快被消费,不希望消费者等待消息积累,你可以将
fetch.min.bytes
保持默认值(1)。这时,broker 会在有数据可供消费时尽快返回,减少延迟。 - 适合需要快速响应的系统,比如实时监控或流数据处理。
- 如果你希望 Kafka 消息能尽快被消费,不希望消费者等待消息积累,你可以将
如果poll(100ms),fetch.max.wait.ms=500ms,那么100ms后mq未达到fetch.min.bytes。客户端会得到当前的records吗?
不会
例子:
假设有以下配置:
fetch.min.bytes = 1MB
fetch.max.wait.ms = 500ms
- 消费者调用了
poll()
向 broker 请求数据。
情况1:Broker 上的消息量 < 1MB
- 当消费者请求数据时,broker 上只有 500KB 的消息。
- Broker 会等到
fetch.max.wait.ms
(500ms),试图等待更多消息的到达。 - 如果在 500ms 内消息累积达到了 1MB,broker 会立即返回这 1MB 的消息。
- 如果 500ms 过去了,仍然没有足够的消息(比如只有 700KB),broker 会返回这些 700KB 的消息。
情况2:Broker 上的消息量 >= 1MB
- 如果消费者请求时,broker 上已经有 1MB 或更多消息,broker 会立即返回这些消息,不再等待
fetch.max.wait.ms
。
**fetch.max.wait.ms**
** 与 **fetch.min.bytes**
配合的意义:**
**fetch.min.bytes**
设置了消费者希望每次拉取的最小数据量,这样可以避免频繁拉取少量消息,提高吞吐量。**fetch.max.wait.ms**
防止消费者因为等不到足够多的消息而无限期等待,设置了一个时间上限。如果在这个时间内没有足够的数据,broker 仍然会返回已有的数据,避免消费者一直没有数据处理。
fetch.max.wait.ms 与 poll的超时应该相等比较合理,这样poll不会在mq消息量不足的时候拉到空数据空跑浪费cpu资源?
不是
理解两者的关系:
**fetch.max.wait.ms**
:- 这是 Kafka broker 在消息不足时等待积累更多数据的最大时间。如果在这段时间内没有更多数据到达,broker 会返回已经积累的消息(即使小于
fetch.min.bytes
)。 - 主要目的是避免过于频繁的拉取请求,减少网络传输的开销,增加单次拉取的消息量。
- 这是 Kafka broker 在消息不足时等待积累更多数据的最大时间。如果在这段时间内没有更多数据到达,broker 会返回已经积累的消息(即使小于
**poll()**
** 超时时间**:- 这是 Kafka 消费者客户端在本地等待 broker 返回数据的最大时间。
- 如果在
poll()
超时时间内 broker 没有返回数据,poll()
会返回空结果,并且消费者会继续下一轮poll()
。 - 主要目的是控制消费者的等待时间,以确保在没有数据的情况下不会阻塞太久。
两者的配合:
- 将
**fetch.max.wait.ms**
与**poll()**
超时时间设置为相等:如果poll()
的超时时间等于fetch.max.wait.ms
,理论上可以避免poll()
过早返回空数据,因为 broker 正在等待积累足够的数据。
这种配置的好处是可以减少消费者空跑的概率,尤其是在消息量较小的场景中。它确保了在 poll()
的等待时间内,broker 至少有足够的时间来积累消息,最大限度地提高单次拉取的数据量。
- 实际应用中的权衡:
- 在实际场景中,将
**fetch.max.wait.ms**
设置略小于**poll()**
超时时间可能是更合理的选择。例如,你可以设置fetch.max.wait.ms
为 500ms,poll()
超时时间为 600ms。这种配置让 broker 有足够时间积累数据,并且消费者poll()
方法也有足够时间等待 broker 返回数据。 - 如果
poll()
超时时间与fetch.max.wait.ms
完全相等,有时可能会导致poll()
稍微早于 broker 返回数据,从而造成一些无效的poll()
调用。稍微延长poll()
时间可以避免这一问题。
- 在实际场景中,将
其他因素影响:
- 消息吞吐量和延迟:过长的
fetch.max.wait.ms
和poll()
时间会增加数据的累积量,但也可能增加处理延迟。如果应用对实时性要求较高,可能需要缩短这两个时间,使得消费者更频繁地获取消息。 - CPU 和网络资源:延长这两个时间可以减少空跑和频繁的拉取请求,从而节省 CPU 和网络资源。但如果设置过长,可能会造成消费者响应不及时,特别是当消息积压严重时。
示例:
假设 fetch.max.wait.ms
设置为 500ms,poll()
超时时间设置为 600ms:
- 如果在 500ms 内 broker 积累了足够消息,broker 会立即返回数据,消费者的
poll()
将会在接收到消息后立刻处理。 - 如果 broker 在 500ms 内没有积累到足够的消息,它会返回当前可用的数据,
poll()
超时不会过早触发,确保了消费者不会空跑。 - 如果
poll()
超时时间设置得比fetch.max.wait.ms
短,消费者可能会在 broker 还未返回数据之前超时,导致空轮询。
总结:
- 一般建议:可以将
poll()
超时时间设置得稍微大于fetch.max.wait.ms
,这样可以确保 broker 有足够时间积累消息,同时避免poll()
过早超时。 - 合理的设置:
fetch.max.wait.ms
= 500ms,poll()
超时 = 600ms。这样 broker 可以最大限度积累数据,消费者也有足够时间等待数据返回,避免空跑。
这种策略会帮助你在减少空轮询和提高批量处理效率之间找到平衡。