使用 Telegraf 消费 JSON 格式的数据

2024-01-29#Kafka#Telegraf

Telegraf 支持多种格式的输入数据。在实际应用中,最常见的就是JSON格式吧。但是,在消费 JSON 格式时,需要巧妙地设置参数,才能正确地提取数据。

解析固定结构的JSON数据 🔗

当查阅Telegraf文档时, json 的首选是首选的解析器。但是,它的文档中也有一行说明:

NOTE: All JSON numbers are converted to float fields. JSON strings and booleans are ignored unless specified in the tag_key or json_string_fields options.

这意味着,如果输入数据中包含了非数字的标签(tag)或者字段(field),比如字符串、布尔类型,那么需要使用 tag_key 或者 json_string_fields 显式地设置这些字段。如果输入数据的格式是固定的,那么就可以就可以这么做。

比如,假定数据源为 Kafka,输入的数据示例为

{
  "fields": {
    "Data1": 123.1,
    "Data2": true
  },
  "name": "Device100",
  "tags": {
    "device_id": "Device100"
  },
  "timestamp": 1706154000430
}

期望按照原格式将数据发送到消费者,那么设置数据格式为 json (即 data_format="json")。此时嵌套的数据会被拍平,消费数据的配置为:

[[inputs.kafka_consumer]]
brokers = ["x.x.x.x:9092"]
consumer_group = "consumer_group_name"
topics = ["kafka_topic_name"]
data_format = "json"
json_time_format = "unix_ms"
json_name_key = "name"
json_time_key = "timestamp"
tag_keys = ["tags__device_id"]
json_string_fields = ["fields__Data2"]

[[processors.regex]]
[[processors.regex.field_rename]]
pattern = "^fields_(.*)$"
replacement = "${1}"

[[processors.regex]]
[[processors.regex.tag_rename]]
pattern = "^tags_(.*)$"
replacement = "${1}"

解析动态结构的JSON数据 🔗

如果JSON数据的结构是动态,比如每条数据中的 fields 里的字段是不同的,或者相同字段名在不同的消息中对应的值类型也可能不同,那么就不能使用tag_keyjson_string_fields固定地设置非数字类型的标签和字段。

另一个解析器是 xpath。利用这个插件和 xpath 表达式,就可以动态地解析JSON格式的数据了,比如:

[[inputs.kafka_consumer]]
brokers = ["x.x.x.x:9092"]
topics = ["kafka_topic_name"]
consumer_group = "consumper_group_name"
data_format = "xpath_json"
xpath_native_types = true

[[inputs.kafka_consumer.xpath]]
metric_name = "/name"
timestamp = "/timestamp"
timestamp_format = "unix_ms"
field_selection = "/fields/*"
tag_selection = "/tags/*"

结尾 🔗

总体来看,使用xpath解析JSON格式的输入数据是简单明了的解决方案。


加载中...