使用 Telegraf 构建数据流水线:从 OPC-UA 服务器采集数据,发送到 Kafka
Telegraf 是一个收集并发送“指标”和“事件”的代理程序,利用其强大的插件系统,可以从数据源采集数据,然后进行处理和聚合,然后输出到其他数据存储中。它由 Go 语言编写,编译之后是一个没有其他依赖的可执行程序,给它传入配置文件即可运行。
本文介绍从 OPC-UA 采集数据,对数据进行转换,然后发送到 Kafka。
Telegraf 的整体架构 🔗
下面是 Telegraf 官方网站的插件系统图,可以一目了然地理解它的架构和4类插件类型:
Telegraf 的配置文件 🔗
Telegraf 的配置文件,采用的是 Toml 语言。一个最简单的例子如下:
# 代理设置
[agent]
interval = "10s"
round_interval = true
metric_batch_size = 1000
metric_buffer_limit = 10000
collection_jitter = "1s"
flush_interval = "10s"
flush_jitter = "0s"
precision = "1ms"
omit_hostname = true
# 输入
[[inputs.xxx]]
# ...
# 输出
[[outputs.xxx]]
# ...
从 OPC-UA 采集数据,发送到 Kafka 🔗
使用 OPC UA 客户端监听器输入插件,可从 OPCUA 服务器订阅节点数据,当点位的数据变化时,即可采集到数据。
[[inputs.opcua_listener]]
name = "opcoa_listener"
endpoint = "opc.tcp://x.x.x.x:xxxxx"
connect_timeout = "10s"
request_timeout = "5s"
subscription_interval = "2s"
security_policy = "None"
security_mode = "None"
auth_method = "Anonymous"
timestamp = "server" # 时间戳可以为 server, source, gather 之一
optional_fields = ["DataType"]
[[inputs.opcua_listener.group]]
sampling_interval = "2s"
nodes = [
# 假定点位的标识符为 __identifier1__
{name="__metricName1__", namespace="2", identifier_type="s", identifier="__identifier1__"},
]
此时可使用 Kafka 输出插件将数据写入 Kafka 中:
[[outputs.kafka]]
brokers = ["x.x.x.x:9092"]
topic = "__kafka_topic_name__"
routing_tag = "id"
data_format = "json"
json_timestamp_units = "1ms" #将输出的时间戳精度改为毫秒
此时写入到Kafka中的数据是嵌套的:
如果以JSON格式输出采集到的数据(采集到的点位数值为123
),那么数据示例如下:
{
"fields": { "DataType": "Double", "Quality": "OK (0x0)", "__metricName1__": 123 },
"name": "opcua_listener",
"tags": { "host": "server.local", "id": "ns=2;s=__identifier1__" },
"timestamp": 1704692144
}
如果想将其拍平,可添加 json_transformation
属性:
[[outputs.kafka]]
brokers = ["x.x.x.x:9092"]
topic = "kafka_topic_name"
routing_tag = "id"
data_format = "json"
json_timestamp_units = "1ms"
json_transformation = '''$merge([{"name": name, "timestamp": timestamp}, tags, fields])'''
此时得到的数据如下:
{
"name": "opcua_listener",
"timestamp": 1704692144,
"DataType": "Double",
"Quality": "OK (0x0)",
"__metricName1__": 123,
"host": "server.local",
"id": "ns=2;s=__identifier1__"
}
注意此时发给Kafka中的代表点位数值的键是 __metricName1__
,这导致 Kafka 中的每条消息的Value的结构都不相同,可能会导致下游处理变得复杂。一个解决方法是把这个 __metricName1__
直接修改为 value
;使用id
字段作为点位和指标的标识符。此时可使用正则替换处理器:
[[processors.regex]]
[[processors.regex.field_rename]]
pattern = "^__metric.*"
replacement = "Value"
这样得到的输出数据为:
{
"name": "opcua_listener",
"timestamp": 1704692144,
"DataType": "Double",
"Quality": "OK (0x0)",
"Value": 123,
"host": "server.local",
"id": "ns=2;s=__deviceIdentifier__"
}
备注 🔗
json_transformation
采用了 JSONata 语言对JSON数据进行查询和转换。
加载中...