修复 Kafka Connect 处理超大消息时任务失败的错误

2024-03-06#Kafka

在使用 Kafka Connect构建数据管道时,可能会遇到消息过大而导致任务失败的问题。本文记录排查问题的过程和解决方法。

背景 🔗

假定使用 Debezium 同步数据库的变更日志(比如MySQL binlog)到 Kafka,即使用 Kafka Source Connector 写入数据到 Kafka 中。当某条消息很大时,比如有多个 TEXT 类型的字段,导致整条记录被序列化成 JSON 后超过 1M。那么在默认情况下,在 Kafka Connect 的日志中(通常在 logs/connect.log 中)就会出现类似如下错误:

org.apache.kafka.common.errors.RecordTooLargeException: The message is 1638624 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.

注意这里的 1048576 指的是每条消息的最大字节数,其值等于 1024*1024,即 1M

如果通过 Kafka Connect 的 API 接口查看任务的执行情况,也会看到虽然 Connector 在运行状态(RUNNING),但是任务失败了(FAILED):

{
  "name": "connectorname",
  "connector": {
    "state": "RUNNING",
    "worker_id": "0.0.0.0:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "0.0.0.0:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:334) \n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.prepareToPollTask(WorkerSourceTask.java:115/\n\tat org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:347)\n\tat org.apache.kafka.connect. runtime.WorkerTask.doRun(WorkerTask.java:202)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask-java:257)\n\tat org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask-java:75/\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask. run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExeCutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor-java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:842)\nCaused by: org.apache.kafka.common.errors.RecordTo0LargeException: The message is 1638624 bytes when serialized which is larger than 1048576, which is the value of the max. request size configuration.\n",
      "type": "source"
    }
  ]
}

根据异常的错误消息,可以得知Kakfa Connect试图将超过1M的数据写入Kafka,超过了Kafka的配置。因此需要找到具体的配置项进行修改。

解决步骤 🔗

搜索相关信息 🔗

在网络搜索异常消息,可以找到很多的文章和讨论。比如:

因此这是一个常见问题,可以借鉴大量的前人经验。另外,还可找到Kakfa文档中的配置文档,作为参考:

修改 Kafka Connect 的配置 🔗

根据 Kafka Connect 的启动命令,它使用的配置文件是 config/connect-distributed.properties

根据错误消息中的 max.request.size 字样,修改相关配置。因此修改 config/connect-distributed.properties,增加配置项:

producer.max.request.size=10485760 # 默认值为 1048576

注意:这里的配置项加入了 producer. 前缀。这似乎没有明确的文档说明,但亲测后,这个前缀是必须的;否则不生效)

重启 Kafka Connect后,可以看到 Kakfa Connect 里依然有错误消息,但是错误信息已经变了:

org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

可见修改是有效的。但还是需要修改 Kakfa 服务器端的配置项。

修改 Kakfa 服务器(Broker端)的配置项 🔗

根据 Kafka 的启动命令,可知它的配置文件是 config/kraft/server.properties。因此在其中添加配置项:

message.max.bytes=10485880 # 默认值为 1048588

然后重启 Kafka 和 Kafka Connect。重启之后,可以发现 Kafka Connect已经不再产生错误消息;同时查看Connector的 API,任务已经变成了 RUNNING 状态:

{
  "name": "connectorname",
  "connector": {
    "state": "RUNNING",
    "worker_id": "0.0.0.0:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "0.0.0.0:8083"
    }
  ],
  "type": "source"
}

看起来问题已经修复。但是根据网络上的文章和官方文档,还应该修改配置项 replica.fetch.max.bytes;因此在 config/connect-distributed.properties 中 中再添加配置项:

replica.fetch.max.bytes=10485760

然后重启 Kafka 和 Kafka Connect。

结尾 🔗

以上介绍的是全局修改Kafka消息大小限制的方式。另外Kakfa还提供了配置项,对每个 Topic 里的消息大小进行限制,以进行精细化的管理。


加载中...