Hudi on Flink

本文将分享如何在 Flink 环境下使用 Apache Hudi。

一、准备工作

1. 下载 Flink 安装包
Hudi 适用于 Flink 1.13, Flink 1.14, Flink 1.15 和 Flink 1.16。安装完 Flink 后,选择你想要的 Hudi-Flink bundle jar 包,比如:
  • hudi-flink1.13-bundle
  • hudi-flink1.14-bundle
  • hudi-flink1.15-bundle
  • hudi-flink1.16-bundle
2. 启动 Flink 集群
在 Hadoop 环境中启动一个 standAlone 模式的 Flink 集群,启动之前,需要做如下配置:
  • flink-conf.yaml 中增加taskmanager.numberOfTaskSlots: 4
  • workers 中增加四行localhost,表示在本地集群中添加4个 Worker
然后使用如下命令启动 Flink 集群:
# HADOOP_HOME is your hadoop root directory after unpack the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

#
 Start the Flink standalone cluster
./bin/start-cluster.sh
3. 启动 Flink SQL 客户端
# HADOOP_HOME is your hadoop root directory after unpack the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

./bin/sql-client.sh embedded -j .../hudi-flink-bundle_2.1?-*.*.*.jar shell
注意:
  • 建议使用 hadoop 2.9.x+ 版本,因为有些对象存储只有在 2.9 版本以后才有具体的实现
  • flink-parquet 和 flink-avro 文件格式已经被打包进了 hudi-flink-bundle jar 中,无需单独引入

二、数据导入

-- sets up the result mode to tableau to show the results directly in the CLI
set sql-client.execution.result-mode = tableau;

CREATE TABLE t1(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = '${path}',
  'table.type' = 'MERGE_ON_READ' -- this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE
);

-- insert data using values
INSERT INTO t1 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

三、数据查询

-- query from the Hudi table
select * from t1;

四、数据更新

-- this would update the record with key 'id1'
insert into t1 values
  ('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');

五、流式查询

Hudi Flink 还提供了根据指定时间戳查询变更的历史流记录信息。要想使用流式查询,需要带有 hive 配置的 jar 包,该 jar 包需要手动构建,可以参考这里:https://hudi.apache.org/cn/docs/syncing_metastore/#install
CREATE TABLE t1(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = '${path}',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',  -- this option enable the streaming read
  'read.start-commit' = '20210316134557'-- specifies the start commit instant time
  'read.streaming.check-interval' = '4' -- specifies the check interval for finding new source commits, default 60s.
);

-- Then query the table in stream mode
select * from t1;

六、删除数据

在流查询中使用数据时,Hudi Flink 源还可以接受来自底层数据源的更改日志,然后按行级应用 UPDATE 和 DELETE。然后就可以在 Hudi 上为各种  RDBMS同步一个 NEAR-REAL-TIME 快照。


往期推荐

Apache Hudi 核心概念及基础用例

Hudi on Spark 的使用


请使用浏览器的分享功能分享到微信等