使用 Telegraf 消费 JSON 格式的数据
Telegraf 支持多种格式的输入数据。在实际应用中,最常见的就是JSON格式吧。但是,在消费 JSON 格式时,需要巧妙地设置参数,才能正确地提取数据。
解析固定结构的JSON数据 🔗
当查阅Telegraf文档时, json 的首选是首选的解析器。但是,它的文档中也有一行说明:
NOTE: All JSON numbers are converted to float fields. JSON strings and booleans are ignored unless specified in the
tag_key
orjson_string_fields
options.
这意味着,如果输入数据中包含了非数字的标签(tag
)或者字段(field
),比如字符串、布尔类型,那么需要使用 tag_key
或者 json_string_fields
显式地设置这些字段。如果输入数据的格式是固定的,那么就可以就可以这么做。
比如,假定数据源为 Kafka,输入的数据示例为
{
"fields": {
"Data1": 123.1,
"Data2": true
},
"name": "Device100",
"tags": {
"device_id": "Device100"
},
"timestamp": 1706154000430
}
期望按照原格式将数据发送到消费者,那么设置数据格式为 json
(即 data_format="json"
)。此时嵌套的数据会被拍平,消费数据的配置为:
[[inputs.kafka_consumer]]
brokers = ["x.x.x.x:9092"]
consumer_group = "consumer_group_name"
topics = ["kafka_topic_name"]
data_format = "json"
json_time_format = "unix_ms"
json_name_key = "name"
json_time_key = "timestamp"
tag_keys = ["tags__device_id"]
json_string_fields = ["fields__Data2"]
[[processors.regex]]
[[processors.regex.field_rename]]
pattern = "^fields_(.*)$"
replacement = "${1}"
[[processors.regex]]
[[processors.regex.tag_rename]]
pattern = "^tags_(.*)$"
replacement = "${1}"
解析动态结构的JSON数据 🔗
如果JSON数据的结构是动态,比如每条数据中的 fields
里的字段是不同的,或者相同字段名在不同的消息中对应的值类型也可能不同,那么就不能使用tag_key
和json_string_fields
固定地设置非数字类型的标签和字段。
另一个解析器是 xpath
。利用这个插件和 xpath 表达式,就可以动态地解析JSON格式的数据了,比如:
[[inputs.kafka_consumer]]
brokers = ["x.x.x.x:9092"]
topics = ["kafka_topic_name"]
consumer_group = "consumper_group_name"
data_format = "xpath_json"
xpath_native_types = true
[[inputs.kafka_consumer.xpath]]
metric_name = "/name"
timestamp = "/timestamp"
timestamp_format = "unix_ms"
field_selection = "/fields/*"
tag_selection = "/tags/*"
结尾 🔗
总体来看,使用xpath
解析JSON格式的输入数据是简单明了的解决方案。
加载中...