使用阿里云 Flink 集成 Kafka 数据到 Hologres 中

2024-03-05#Kafka#Flink#Hologres#阿里云

Hologres 是阿里云提供的实时数据仓库引擎,将业务数据库同步到Hologre后,可构建实时的分析型应用。本文介绍一种数据管道的实现,将数据同步到Hologres中;同时介绍了潜在的漏洞和解决方案。

场景定义 🔗

假定业务系统的数据变更(比如MySQL的binglog)已经通过 Debezium 等CDC工具接入到了 Kafka了,数据格式为 json。期望将业务数据库表重现到 Hologres 中。

那么可使用阿里云的实时计算 Flink 服务,即可将 Kafka 中的数据,实时地接入到 Hologres 中。主要步骤如下:

  1. 在 Hologres 中创建目标表
  2. 在阿里云 Flink 中创建数据目录
  3. 在阿里云 Flink 中创建 Flink SQL脚本
  4. 部署 Flink SQL脚本,创建告警规则

操作步骤 🔗

在Hologres中创建目标表 🔗

在 Hologres 的控制台界面通过表单创建表,或者使用纯SQL创建数据库表。鉴于要将业务数据库表重现到 Hologres 中,那么就得使用 upsert 的方式插入数据,因此数据库表必须有主键。

通常业务数据库都是行存储,但是在大部分的分析场景下,数据表的存储格式应该为列存储(或者“行列共存”),具体见官方文档

如要需要在 Hologres 一次性创建多个表,那么在需要在SQL使用 BEGIN;END; 包起来。

为了便于今后的调试和问题排查,除了将业务数据表字段同步到目标表之外,还可以将数据集成的元数据存储起来,比如 Kafka 消息中的分区、偏移量,以及数据的处理时间等。

示例如下:

BEGIN;

CREATE TABLE schema_name.table_name (
    id text,
    column_example text,
    __deleted boolean,
    __kafka_partition integer,
    __kafka_offset bigint,
    __kafka_timestamp timestamp without time zone,
    __proc_time timestamp without time zone,
    PRIMARY KEY (id)
);

END;

注意,以上的定义中包含了特殊的字段 __deleted。这是因为在 Flink SQL中无法删除字段,只能进行标识。这意味着在查询数据时,必须加入过滤条件 __deleted = false。这也是无奈之举。

在定义数据类型时,可能需要进行映射。这是因为:

  • 业务数据库的数据类型与Hologres并不一致,具体见文档
  • Kafka消息中的数据类型,与业务数据库字段类型可能不一致。比如timestamp类型可能会被转换成整型(unix time,注意确定是否要转换时区);bit(1)类型可能会变成布尔类型。
  • 业务数据库的字段类型可能发生变化。也许业务数据库中字符串类型为varchar(x),但强烈建议在此处使用text类型。因为业务数据库可能在今后修改可变字符串的最大长度,而 Hologres 是不支持修改类型。如果在 Hologres 中也使用 varchar(x),那么当业务数据库增加长度时,数据同步就会出错停止;此时也许只能删表重建了。

为了让 Flink 写数据到 Hologres 中,可以采用两种方式:

  1. 在 Flink SQL中创建临时表,并指定 Hologres数据库地址、用户名、密码等。由于密码是敏感信息,因此不能直接放在Flink SQL脚本中。幸运地是,阿里云Flink提供了密钥存储能力。可以在控制台界面创建密钥,然后在Flink SQL脚本中引用。
  2. 在阿里云 Flink 中创建一个数据目录,只填写一次数据库的地址、用户名、密码等即可。然后在Flink SQL中可以引用数据目录里的数据表即可。

第2中方法相对来说更简单。

在阿里云Flink控制台创建Flink SQL脚本,在其中定义临时表和插入语句。

注意:虽然此处的Flink脚本可以按照目录结构分层组织,但是每个脚本对应的运维任务并没有层级结构。因此Flink脚本的名字不能重复。

假定 Kafka 中的消息实例如下:

{ "Id": "ABC1000", "StringColumnExample": "string value", "__deleted": "false" }

Flink SQL脚本的示例如下:

CREATE TEMPORARY TABLE source_table_name(
    `Id` string,
    `StringColumnExample` string,
    __deleted string,
    __kafka_topic string METADATA from 'topic' VIRTUAL,
    __kafka_partition int METADATA FROM 'partition' VIRTUAL,
    __kafka_offset bigint METADATA FROM 'offset' VIRTUAL,
    __kafka_timestamp timestamp(3) METADATA FROM 'timestamp' VIRTUAL,
    __proc_time AS PROCTIME()
)
WITH (
    'connector' = 'kafka',
    'topic' = 'kafka_topic_name', -- changeme
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kakfa_bootstrap_server', -- changeme
    'properties.enable.auto.commit' = 'true',
    'properties.group.id' = 'consumer_group_id', -- changeme
    'format' = 'json'
);

BEGIN STATEMENT SET;

INSERT INTO `catalog_name`.`database_name`.`schema_name.table_name`(`id`, `column_example`, __deleted, __kafka_partition, __kafka_offset, __kafka_timestamp, __proc_time) -- changeme
SELECT
  `Id`,
  `StringColumnExample`,
  CASE WHEN __deleted = 'false' THEN FALSE ELSE TRUE END __deleted,
  __kafka_partition,
  __kafka_offset,
  __kafka_timestamp,
  __proc_time
FROM source_table_name;

END;

由于,Kakfa中的数据是JSON格式,因此字段名是大小写敏感的。而 Hologres 里的字段名也是大小写敏感的,但是 Hologres 默认会将查询中的字段名改成小写,因此如果有大写,那么需要用反引号引起来。

通常数据库表的负载并不高,如果每个Flink SQL只同步一个表,那么在表数量很多时,就得创建很多的Flink SQL脚本,意味着很多的Flink作业。不仅带来运维的复杂度,也导致相应的费用急剧上升。因此可以在每个Flink SQL脚本中,同步多个表。但要注意,如果想重新同步脚本中的某一个表,那么也不得不同步脚本中的所有表。因此需要合理地组织Flink SQL中的表。当Flink SQL脚本中需要写入多个目标表时,需要将所有的插入语句(INSERT INTO)放在 BEGIN STATEMENT SET;END; 之间。

在实际中发生了匪夷所思的致命问题:如果一个Flink SQL同步多个表,比如将 topicA 同步到 tableAtopicB 同步到 tableB,以此类推。那么,在某些特定情况下(有可能是数据同步速率高、数据表多),topicA的数据可能会被写入到 topicB。这是在真实情况下发生的情况,原因不得而知。目前摸索出的解决方案是在插入语句中,加入一个看似多余的条件 WHERE __kafka_topic = 'kafka_topic_name'。即插入语句应该为:

BEGIN STATEMENT SET;

INSERT INTO `catalog_name`.`database_name`.`schema_name.table_name`(`id`, `column_example`, __deleted, __kafka_partition, __kafka_offset, __kafka_timestamp, __proc_time)
SELECT
  `Id`,
  `StringColumnExample`,
  CASE WHEN __deleted = 'false' THEN FALSE ELSE TRUE END __deleted,
  __kafka_partition,
  __kafka_offset,
  __kafka_timestamp,
  __proc_time
FROM source_table_name WHERE __kafka_topic = 'kafka_topic_name';

END;

在编写好了Flink SQL脚本后,在控制台界面部署,然后运行即可开始同步任务。注意应该配置告警规则,以便在Flink作业出错(比如重启)时,推送消息到相应的维护者。

结尾 🔗

通过 Flink 同步 Kafka 中的流数据到 Hologres 应该是一个常见方案。在阿里云的一些介绍和阿里中,也有提及。但在真实实际的实施中,还要考虑运维复杂度、费用等问题,这又带来更多的问题,也许一不小心就撞到BUG。实践是检验真理的唯一标准,尽早在真实场景中验证,才能尽早地暴露问题,降低交付风险。


加载中...