修复 Kafka Connect 处理超大消息时任务失败的错误
在使用 Kafka Connect构建数据管道时,可能会遇到消息过大而导致任务失败的问题。本文记录排查问题的过程和解决方法。
背景 🔗
假定使用 Debezium 同步数据库的变更日志(比如MySQL binlog)到 Kafka,即使用 Kafka Source Connector 写入数据到 Kafka 中。当某条消息很大时,比如有多个 TEXT
类型的字段,导致整条记录被序列化成 JSON 后超过 1M。那么在默认情况下,在 Kafka Connect 的日志中(通常在 logs/connect.log
中)就会出现类似如下错误:
org.apache.kafka.common.errors.RecordTooLargeException: The message is 1638624 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
注意这里的 1048576
指的是每条消息的最大字节数,其值等于 1024*1024
,即 1M
。
如果通过 Kafka Connect 的 API 接口查看任务的执行情况,也会看到虽然 Connector 在运行状态(RUNNING
),但是任务失败了(FAILED
):
{
"name": "connectorname",
"connector": {
"state": "RUNNING",
"worker_id": "0.0.0.0:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "0.0.0.0:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:334) \n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.prepareToPollTask(WorkerSourceTask.java:115/\n\tat org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:347)\n\tat org.apache.kafka.connect. runtime.WorkerTask.doRun(WorkerTask.java:202)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask-java:257)\n\tat org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask-java:75/\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask. run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExeCutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor-java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:842)\nCaused by: org.apache.kafka.common.errors.RecordTo0LargeException: The message is 1638624 bytes when serialized which is larger than 1048576, which is the value of the max. request size configuration.\n",
"type": "source"
}
]
}
根据异常的错误消息,可以得知Kakfa Connect试图将超过1M的数据写入Kafka,超过了Kafka的配置。因此需要找到具体的配置项进行修改。
解决步骤 🔗
搜索相关信息 🔗
在网络搜索异常消息,可以找到很多的文章和讨论。比如:
因此这是一个常见问题,可以借鉴大量的前人经验。另外,还可找到Kakfa文档中的配置文档,作为参考:
修改 Kafka Connect 的配置 🔗
根据 Kafka Connect 的启动命令,它使用的配置文件是 config/connect-distributed.properties
。
根据错误消息中的 max.request.size
字样,修改相关配置。因此修改 config/connect-distributed.properties
,增加配置项:
producer.max.request.size=10485760 # 默认值为 1048576
注意:这里的配置项加入了 producer.
前缀。这似乎没有明确的文档说明,但亲测后,这个前缀是必须的;否则不生效)
重启 Kafka Connect后,可以看到 Kakfa Connect 里依然有错误消息,但是错误信息已经变了:
org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
可见修改是有效的。但还是需要修改 Kakfa 服务器端的配置项。
修改 Kakfa 服务器(Broker端)的配置项 🔗
根据 Kafka 的启动命令,可知它的配置文件是 config/kraft/server.properties
。因此在其中添加配置项:
message.max.bytes=10485880 # 默认值为 1048588
然后重启 Kafka 和 Kafka Connect。重启之后,可以发现 Kafka Connect已经不再产生错误消息;同时查看Connector的 API,任务已经变成了 RUNNING
状态:
{
"name": "connectorname",
"connector": {
"state": "RUNNING",
"worker_id": "0.0.0.0:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "0.0.0.0:8083"
}
],
"type": "source"
}
看起来问题已经修复。但是根据网络上的文章和官方文档,还应该修改配置项 replica.fetch.max.bytes
;因此在 config/connect-distributed.properties
中 中再添加配置项:
replica.fetch.max.bytes=10485760
然后重启 Kafka 和 Kafka Connect。
结尾 🔗
以上介绍的是全局修改Kafka消息大小限制的方式。另外Kakfa还提供了配置项,对每个 Topic 里的消息大小进行限制,以进行精细化的管理。