Hudi on Spark 的使用

本文将分享如何在 spark 环境下使用 Apache Hudi。

  • 使用 Spark 数据源,我将通过代码演示使用 Hudi 默认的 Table Type(Copy on Write)来 insert 或 update Hudi 表。

  • 在每次写入操作之后,我将演示如何以快照和增量方式读取数据。

一、准备阶段

Hudi 适用于 Spark-2.4.3+ 或者 Spark 3.x 版本。Spark 3 版本适配如下所示:

Hudi Spark 3.X
0.12.x 3.3.x (default build), 3.2.x, 3.1.x
0.11.x 3.2.x (default build, Spark bundle only), 3.1.x
0.10.x 3.1.x (default build), 3.0.x
0.7.0 - 0.9.0 3.0.x
0.6.0 and prior not supported

default build 表示是用来构建hudi-spark3-bundle的。

Hudi 支持使用 Spark SQL 通过 HoodieSparkSessionExtension sql 扩展来写入和读取数据。我们可以在 Spark 安装目录使用 Hudi 运行 Spark SQL:

  • Spark 3.3

# Spark 3.3
spark-sql --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
  • Spark 3.2

# Spark 3.2
spark-sql --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.13.1 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
  • Spark 3.1

# Spark 3.1
spark-sql --packages org.apache.hudi:hudi-spark3.1-bundle_2.12:0.13.1 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
  • Spark 2.4

# Spark 2.4
spark-sql --packages org.apache.hudi:hudi-spark2.4-bundle_2.11:0.13.1 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

对于 Spark 3.2 及以上版本, 必须配置spark_catalog: --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'

二、创建表

Spark SQL 需要显示创建表命令。

1. 关于表的一些概念

  • 表类型
    Hudi 表的两种类型:Copy-On-Write (COW) 和 Merge-On-Read (MOR) 都可以使用 Spark SQL 进行创建,可以在创建表时指定表类型:type = 'cow' 或者 type = 'mor'

  • 分区表和非分区表
    我们可以使用 Spark SQL 创建分区表或非分区表。
    在创建分区表时,可以使用partitioned by语句指定分区字段。
    如果创建表时不加partitioned by语句,则表示创建一个非分区表。

  • 内部表和外部表
    Spark SQL 支持两种表,即:内部表和外部表。
    当我们使用location语句或create external table语句来创建表时,表示创建一个外部表,否则就创建一个内部表。

注意:

  • 从 Hudi 0.10.0 开始,创建表时必须指定primaryKey

  • primaryKeypreCombineFieldtype都是大小写敏感的。

  • 创建 MOR 表时必须指定preCombineField

  • 当设置了primaryKeypreCombineFieldtype或者其他配置时,tblproperties的优先级高于options的配置。

  • 新创建的 Hudi 表默认会设置hoodie.datasource.write.hive_style_partitioning=true

2. 创建一个非分区表

-- create a cow table, with primaryKey 'uuid' and without preCombineField provided
create table hudi_cow_nonpcf_tbl (
  uuid int,
  name string,
  price double
using hudi
tblproperties (
  primaryKey = 'uuid'
);


-- create a mor non-partitioned table with preCombineField provided
create table hudi_mor_tbl (
  id int,
  name string,
  price double,
  ts bigint
using hudi
tblproperties (
  type = 'mor',
  primaryKey = 'id',
  preCombineField = 'ts'
);

3. 创建一个分区表

-- create a partitioned, preCombineField-provided cow table
create table hudi_cow_pt_tbl (
  id bigint,
  name string,
  ts bigint,
  dt string,
  hh string
using hudi
tblproperties (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
 )
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl';

4. 为已经存在的 Hudi 表创建一个表

create table hudi_existing_tbl using hudi
location '/tmp/hudi/hudi_existing_table';

注意:创建表时不需要指定 schema,除非分区字段已经存在了,Hudi 可以自动重组 Schema。

5. CTAS

Hudi 支持 CTAS (Create Table As Select) on Spark SQL。为了高效的将数据加载进 Hudi 表,CTAS 在写入操作时使用了 bulk insert

  • 例子1:创建一个不使用 preCombineField 的非分区表,类型为 COW:

-- CTAS: create a non-partitioned cow table without preCombineField
create table hudi_ctas_cow_nonpcf_tbl
using hudi
tblproperties (primaryKey = 'id')
as
select 1 as id'a1' as name10 as price;
  • 例子2:创建一个使用 primarykey 的分区表,类型为 COW:

-- CTAS: create a partitioned, preCombineField-provided cow table
create table hudi_ctas_cow_pt_tbl
using hudi
tblproperties (type = 'cow', primaryKey = 'id', preCombineField = 'ts')
partitioned by (dt)
as
select 1 as id'a1' as name10 as price, 1000 as ts, '2021-12-01' as dt;
  • 例子3:使用 CTAS 命令从另一个表加载数据到当前表

# create managed parquet table
create table parquet_mngd using parquet location 'file:///tmp/parquet_dataset/*.parquet';

# CTAS by loading data into hudi table
create table hudi_ctas_cow_pt_tbl2 using hudi location 'file:/tmp/hudi/hudi_tbl/' options (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
 )
partitioned by (datestr) as select * from parquet_mngd;

6. 创建表的属性

当我们创建 Hudi 表时,可以设置一些属性,下面是一些主要设置:

属性名称 默认值 属性说明
primaryKey uuid 表的主键字段,多个字段以逗号分隔
preCombineField
表的 pre-combine 字段
type cow 表类型,type = 'cow' 表示 COPY-ON-WRITE 表, type = 'mor' 表示 MERGE-ON-READ 表

三、数据导入

-- insert into non-partitioned table
insert into hudi_cow_nonpcf_tbl select 1'a1'20;
insert into hudi_mor_tbl select 1'a1'201000;

-- insert dynamic partition
insert into hudi_cow_pt_tbl partition (dt, hh)
select 1 as id'a1' as name1000 as ts, '2021-12-09' as dt, '10' as hh;

-- insert static partition
insert into hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='11'select 2'a2'1000;

① 如果设置了preCombineKeyinsert into使用upsert导入方式,否则使用insert
② Hudi 也支持bulk_insert导入方式,只需要设置两个参数:hoodie.sql.bulk.insert.enablehoodie.sql.insert.mode,例如:

-- upsert mode for preCombineField-provided table
insert into hudi_mor_tbl select 1'a1_1'201001;
select idname, price, ts from hudi_mor_tbl;
1   a1_1    20.0    1001

-- bulk_insert mode for preCombineField-provided table
set hoodie.sql.bulk.insert.enable=true;
set hoodie.sql.insert.mode=non-strict;

insert into hudi_mor_tbl select 1'a1_2'201002;
select idname, price, ts from hudi_mor_tbl;
1   a1_1    20.0    1001
1   a1_2    20.0    1002

四、数据查询

 select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0

注意:从 0.9.0 开始,Hudi 支持了内置的文件索引(HoodieFileIndex)来查询 Hudi 表,它支持分区裁剪和元表查询,这会大大提升查询性能。它还支持非全局路径查询,意味着我们可以根据基本路径来查询,而无需子啊查询路径中指定“*”。

1. 时间线查询

Timeline 维护了所有数据文件和目录的时间戳信息,并按照时间顺序组织它们。这使得 HUDI 能够追踪数据集的完整变更历史,并支持 time-travel query。通过 Timeline,用户可以查询数据集在不同时间点的版本,回溯到过去的数据状态。例如:

create table hudi_cow_pt_tbl (
  id bigint,
  name string,
  ts bigint,
  dt string,
  hh string
using hudi
tblproperties (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
 )
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl';

insert into hudi_cow_pt_tbl select 1'a0'1000'2021-12-09''10';
select * from hudi_cow_pt_tbl;

-- record id=1 changes `name`
insert into hudi_cow_pt_tbl select 1'a1'1001'2021-12-09''10';
select * from hudi_cow_pt_tbl;

-- time travel based on first commit time, assume `20220307091628793`
select * from hudi_cow_pt_tbl timestamp as of '20220307091628793' where id = 1;
-- time travel based on different timestamp formats
select * from hudi_cow_pt_tbl timestamp as of '2022-03-07 09:16:28.100' where id = 1;
select * from hudi_cow_pt_tbl timestamp as of '2022-03-08' where id = 1;

五、数据更新

Spark SQL 提供了两种 DML 方式来更新 Hudi 表:Merge-Into 和 Update.

1. Update

  • 语法

UPDATE tableIdentifier SET column = EXPRESSION(,column = EXPRESSION) [ WHERE boolExpression]
  • 例子

update hudi_mor_tbl set price = price * 2, ts = 1111 where id = 1;

update hudi_cow_pt_tbl set name = 'a1_1', ts = 1001 where id = 1;

-- update using non-PK field
update hudi_cow_pt_tbl set ts = 1001 where name = 'a1';

注意:Update 操作需要指定 preCombineField

2. MergeInto

  • 语法

MERGE INTO tableIdentifier AS target_alias
USING (sub_query | tableIdentifier) AS source_alias
ON 
WHEN MATCHED [ AND  ] THEN  ]
WHEN NOT MATCHED [ AND  ]  THEN  ]

 =A equal bool condition 
  =
  DELETE  |
  UPDATE SET *  |
  UPDATE SET column1 = expression1 [, column2 = expression2 ...]
  =
  INSERT *  |
  INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...])
  • 例子

-- source table using hudi for testing merging into non-partitioned table
create table merge_source (id intname string, price double, ts bigintusing hudi
tblproperties (primaryKey = 'id', preCombineField = 'ts');
insert into merge_source values (1"old_a1"22.22900), (2"new_a2"33.332000), (3"new_a3"44.442000);

merge into hudi_mor_tbl as target
using merge_source as source
on target.id = source.id
when matched then update set *
when not matched then insert *
;

-- source table using parquet for testing merging into partitioned table
create table merge_source2 (id intname string, flag string, dt string, hh stringusing parquet;
insert into merge_source2 values (1"new_a1"'update''2021-12-09''10'), (2"new_a2"'delete''2021-12-09''11'), (3"new_a3"'insert''2021-12-09''12');

merge into hudi_cow_pt_tbl as target
using (
  select idname'1000' as ts, flag, dt, hh from merge_source2
source
on target.id = source.id
when matched and flag != 'delete' then
 update set id = source.id, name = source.name, ts = source.ts, dt = source.dt, hh = source.hh
when matched and flag = 'delete' then delete
when not matched then
 insert (idname, ts, dt, hh) values(source.id, source.name, source.ts, source.dt, source.hh)
;

六、增量查询

Hudi 还可以根据指定时间戳来查询历史数据流。例如:

// spark-shell
// reload data
spark.
  read.
  format("hudi").
  load(basePath).
  createOrReplaceTempView("hudi_trips_snapshot")

val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
val beginTime = commits(commits.length - 2) // commit time we are interested in

// incrementally query data
val tripsIncrementalDF = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
  load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")

spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()

七、Structured Streaming

Hudi 支持 Spark Structured Streaming。

  • Streaming Read

// spark-shell
// reload data
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)

// read stream and output results to console
spark.readStream.
  format("hudi").
  load(basePath).
  writeStream.
  format("console").
  start()

// read stream to streaming df
val df = spark.readStream.
        format("hudi").
        load(basePath)
  • Streaming Write

// spark-shell
// prepare to stream write to new table
import org.apache.spark.sql.streaming.Trigger

val streamingTableName = "hudi_trips_cow_streaming"
val baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
val checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"

// create streaming df
val df = spark.readStream.
        format("hudi").
        load(basePath)

// write stream to new hudi table
df.writeStream.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, streamingTableName).
  outputMode("append").
  option("path", baseStreamingPath).
  option("checkpointLocation", checkpointLocation).
  trigger(Trigger.Once()).
  start()

八、删除数据

Hudi 支持两种删除类型:

1. Soft Deletes

这会保留记录的 key,只是将所有其他字段的值清空。 Soft Deletes中包含空值的记录始终保留在存储中,并且永远不会被删除。

// spark-shell
spark.
  read.
  format("hudi").
  load(basePath).
  createOrReplaceTempView("hudi_trips_snapshot")
// fetch total records count
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
// fetch two records for soft deletes
val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2)

// prepare the soft deletes by ensuring the appropriate fields are nullified
val nullifyColumns = softDeleteDs.schema.fields.
  map(field => (field.name, field.dataType.typeName)).
  filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1)
    && !Array("ts""uuid""partitionpath").contains(pair._1)))

val softDeleteDf = nullifyColumns.
  foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))(
    (ds, col) => ds.withColumn(col._1, lit(null).cast(col._2)))

// simply upsert the table after setting these fields to null
softDeleteDf.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION_OPT_KEY, "upsert").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)

// reload data
spark.
  read.
  format("hudi").
  load(basePath).
  createOrReplaceTempView("hudi_trips_snapshot")

// This should return the same total count as before
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
// This should return (total - 2count as two records are updated with nulls
spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()

2. Hard Deletes

删除物理记录。

delete from hudi_cow_nonpcf_tbl where uuid = 1;

delete from hudi_mor_tbl where id % 2 = 0;

-- delete using non-PK field
delete from hudi_cow_pt_tbl where name = 'a1';

九、Insert Overwrite

-- insert overwrite non-partitioned table
insert overwrite hudi_mor_tbl select 99'a99'20.0900;
insert overwrite hudi_cow_nonpcf_tbl select 99'a99'20.0;

-- insert overwrite partitioned table with dynamic partition
insert overwrite table hudi_cow_pt_tbl select 10'a10'1100'2021-12-09''10';

-- insert overwrite partitioned table with static partition
insert overwrite hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='12'select 13'a13'1100;

十、其他操作

1. Alter Table

--rename to:
ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2;

--add column:
ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string);

--change column:
ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid bigint;

--set properties;
alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = '10');

2. Partition 操作

--show partition:
show partitions hudi_cow_pt_tbl;

--drop partition:
alter table hudi_cow_pt_tbl drop partition (dt='2021-12-09', hh='10');
 

请使用浏览器的分享功能分享到微信等