使用 Telegraf 发送数据到阿里云 IoT 平台

2024-03-25#Telegraf#阿里云#IoT

阿里云的IoT平台本质上是一个以MQTT服务为核心的物联网平台。国内的一些商业网关设备,提供了上报数据阿里云IoT平台的支持。而使用开源方案的话,Telegraf 提供了 MQTT 输出插件,配合其他的配置,也可以数据上云。

问题定义 🔗

工业设备或者 IoT 设备的数据已经被采集到本地服务器(边缘服务器),比如在 OPCUA 服务器、Kakfa 甚至本地的 MQTT 服务器中。为了数据监控、分析和应用,需要将数据传送到阿里云 IoT 平台中。待传到 IoT 平台之后,还可以利用它的数据流转服务,编写类 Javascript 代码,将数据流转到 Lindorm 时序数据库中。本文只关注上报数据到阿里云 IoT 平台。

技术调研 🔗

Telegraf 🔗

Telegraf 提供了发送数据到 MQTT 服务器的集成,具体见其代码库中的文档《MQTT Producer Output Plugin》

阿里云 IoT 平台 🔗

在最核心 MQTT 服务器 之外,阿里云IoT平台还包括了管理元数据的物模型、鉴权认证等。一个产品对应一个物模型,而一个产品可以实例化为多个设备。物模型的介绍见阿里云的文档《配置物模型》,其中介绍了《什么是物模型》。在此不做赘述。

鉴权包括了TLS证书、MQTT用户名密码认证。其中,MQTT用户名是由设备名字和产品键组成的,即deviceName+"&"+productKey,而每个设备由自己的密码。TLS证书见《MQTT-TLS连接通信》 中的文档。本文作者验证了使用“Global Sign R1根证书”的方法。

Telegraf 与 阿里云 IoT 集成的挑战 🔗

在真实的场景下,当使用 Telegraf 的 MQTT 输出插件与阿里云 IoT 集成时,会遇到一个问题:假如有数十个设备,那么意味着就有数十个用户名密码对。但是每一个 Telegraf 的集成,只支持填写一对 MQTT 输出插件的用户名密码。这也就意味着,针对每一个设备,都需要启动一个 Telegraf 进程。这显然带来了管理和运维上的复杂度。目前我还没有找到简化的方法。

当发送数据到阿里云时,需要构建特定格式的 JSON 格式的消息体,而不是 Influxdb 的格式。好消息是,可以使用 json_transformation 配置发送的消息体。

为了安全起见,一些企业只允许本地网络和阿里云之间通过VPN网关联通,此时阿里云IoT位于 VPC 之中。截止当前,本地的客户端无法直接访问阿里云IoT的VPC地址,还需要在 VPC 中搭建一个三层代理进行数据转发。

在早期的 Telegraf 有一个 Bug,它会在构建 MQTT 主题的时候,去掉 / 前缀,而阿里云的 MQTT 主题正好以 / 开头。有很多文章不建议 MQTT 的主题以 / 开头,因为这会产生歧义,但阿里云似乎总是不按常理出牌。在最新的版本中这个问题已经修复。

【疯狂吐槽】阿里云为什么不按常理出牌?可能是独树一帜吧!也许这样就有自己的标准了。

配置举例 🔗

使用 Nginx 做三层代理 🔗

通常 Nginx 被用于做四层代理,其实也可以作为三层代理,配置示例如下:

stream {
    upstream aliyun_iot_platform_1883 {
        server iot-xxxx-vpc.mqtt.iothub.aliyuncs.com:1883; # 服务器地址位于“查看开发设置”
    }

    server {
        listen 31883; # 本地端口,自行选择
        proxy_pass aliyun_iot_platform_1883;
    }
}

Telegraf 配置 🔗

Telegraf MQTT 输出插件的配置示例如下,注意修改deviceNameproductKeyclient_idpasswordclient_idpassword 的计算方式见文档《MQTT-TLS连接通信》):

[[outputs.mqtt]]
servers = ["tls://x.x.x.x:xxxx"] # 指向 VPC 中的代理
username = "deviceName&productKey"
client_id = "client_id"
password = "password"
topic = "/sys/productKey/deviceName/thing/event/property/post"
protocol = "3.1.1"
qos = 1
tls_ca = "/etc/telegraf/aliyun/root.crt" # 指向本地的 “Global Sign R1根证书”
keep_alive = 3000
connection_timeout = "5s"
insecure_skip_verify = true
timeout = "2s"

data_format = "json"
json_timestamp_units = "1ms"
# 下面的 tags.Property 是一个自定义的标签,对应的是设备的属性
json_transformation = '''
  {"id": $join([tags.Property, "-", $string(timestamp)]),
  "version":"1.0",
  "sys": {
    "ack": 0
  },
  "params": {
    tags.Property: {
      "value": fields.Value,
      "time": timestamp
    }},
    "method": "thing.event.property.post"
  }
'''

结尾 🔗

使用 Telegraf 发送数据到阿里云 IoT 平台是可行的,但是在集成的过程中,并不是很顺利。尤其在遇到很多设备的时候,如果每个设备都运行一个 Telegraf 进程,那管理起来就是噩梦。如果不需要在阿里云上构建物模型,而只是为了转存 IoT 数据到阿里云上的 Lindorm 时序数据库作为分析,那么可以直接将数据发送到 Lindorm 时序数据。


加载中...