使用 Telegraf 构建数据流水线:从 OPC-UA 服务器采集数据,发送到 Kafka

2024-01-09#Telegraf#Kafka

Telegraf 是一个收集并发送“指标”和“事件”的代理程序,利用其强大的插件系统,可以从数据源采集数据,然后进行处理和聚合,然后输出到其他数据存储中。它由 Go 语言编写,编译之后是一个没有其他依赖的可执行程序,给它传入配置文件即可运行。

本文介绍从 OPC-UA 采集数据,对数据进行转换,然后发送到 Kafka。

Telegraf 的整体架构 🔗

下面是 Telegraf 官方网站的插件系统图,可以一目了然地理解它的架构和4类插件类型:

Telegraf 的配置文件 🔗

Telegraf 的配置文件,采用的是 Toml 语言。一个最简单的例子如下:

# 代理设置
[agent]
  interval = "10s"
  round_interval = true
  metric_batch_size = 1000
  metric_buffer_limit = 10000
  collection_jitter = "1s"
  flush_interval = "10s"
  flush_jitter = "0s"
  precision = "1ms"
  omit_hostname = true

# 输入
[[inputs.xxx]]
# ...

# 输出
[[outputs.xxx]]
# ...

从 OPC-UA 采集数据,发送到 Kafka 🔗

使用 OPC UA 客户端监听器输入插件,可从 OPCUA 服务器订阅节点数据,当点位的数据变化时,即可采集到数据。


[[inputs.opcua_listener]]
  name = "opcoa_listener"
  endpoint = "opc.tcp://x.x.x.x:xxxxx"
  connect_timeout = "10s"
  request_timeout = "5s"
  subscription_interval = "2s"
  security_policy = "None"
  security_mode = "None"
  auth_method = "Anonymous"
  timestamp = "server" # 时间戳可以为 server, source, gather 之一
  optional_fields = ["DataType"]

[[inputs.opcua_listener.group]]
sampling_interval = "2s"

nodes = [
  # 假定点位的标识符为 __identifier1__
  {name="__metricName1__", namespace="2", identifier_type="s", identifier="__identifier1__"},
]

此时可使用 Kafka 输出插件将数据写入 Kafka 中:

[[outputs.kafka]]
brokers = ["x.x.x.x:9092"]
topic = "__kafka_topic_name__"
routing_tag = "id"
data_format = "json"
json_timestamp_units = "1ms" #将输出的时间戳精度改为毫秒

此时写入到Kafka中的数据是嵌套的:

如果以JSON格式输出采集到的数据(采集到的点位数值为123),那么数据示例如下:

{
  "fields": { "DataType": "Double", "Quality": "OK (0x0)", "__metricName1__": 123 },
  "name": "opcua_listener",
  "tags": { "host": "server.local", "id": "ns=2;s=__identifier1__" },
  "timestamp": 1704692144
}

如果想将其拍平,可添加 json_transformation 属性:

[[outputs.kafka]]
brokers = ["x.x.x.x:9092"]
topic = "kafka_topic_name"
routing_tag = "id"
data_format = "json"
json_timestamp_units = "1ms"
json_transformation = '''$merge([{"name": name, "timestamp": timestamp}, tags, fields])'''

此时得到的数据如下:

{
  "name": "opcua_listener",
  "timestamp": 1704692144,
  "DataType": "Double",
  "Quality": "OK (0x0)",
  "__metricName1__": 123,
  "host": "server.local",
  "id": "ns=2;s=__identifier1__"
}

注意此时发给Kafka中的代表点位数值的键是 __metricName1__,这导致 Kafka 中的每条消息的Value的结构都不相同,可能会导致下游处理变得复杂。一个解决方法是把这个 __metricName1__ 直接修改为 value;使用id字段作为点位和指标的标识符。此时可使用正则替换处理器:

[[processors.regex]]
  [[processors.regex.field_rename]]
  pattern = "^__metric.*"
  replacement = "Value"

这样得到的输出数据为:

{
  "name": "opcua_listener",
  "timestamp": 1704692144,
  "DataType": "Double",
  "Quality": "OK (0x0)",
  "Value": 123,
  "host": "server.local",
  "id": "ns=2;s=__deviceIdentifier__"
}

备注 🔗

  • json_transformation 采用了 JSONata 语言对JSON数据进行查询和转换。

加载中...