聊聊Delta Lake的ACID事务特性

对于搞传统数据的同学来说,ACID这个词再熟悉不过了,它是一个缩写词,指的是定义事务的 4 个关键属性集:原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)和持久性(Durability)。 如果数据库操作具有这些 ACID 属性,则可以称为 ACID 事务,应用这些操作的数据存储系统称为事务系统。 ACID 事务保证表的每次读、写或修改都具有以下属性:

原子性(Atomicity)

事务中的每个语句(读取、写入、更新或删除数据)都被视为单个单元。要么执行整个语句,要么不执行任何语句。例如,如果你的流数据源在中途发生故障,此属性可以防止发生数据丢失和损坏。

一致性(Consistency)

确保事务仅以预定义的、可预测的方式对表进行更改。事务一致性可确保数据中的损坏或错误不会对表的完整性造成意外后果。

隔离性(Isolation)

当多个用户同时从同一个表中读取和写入时,其事务的隔离可确保并发事务不会相互干扰或影响。每个请求都互不干扰,即使它们实际上是同时发生的。

持久性(Durability)

确保成功执行的事务对数据所做的更改将被保存,即使在系统出现故障的情况下也是如此。

ACID 事务长期以来一直是数据仓库最令人羡慕的特性之一,也正因为如此,Delta Lake 将它引入到了数据湖。即使新数据实时写入Delta Lake表中,也允许用户看到一致的数据视图,因为每次写入都是一个孤立的事务,记录在有序的事务日志中。

Delta Lake 采用尽可能最高级别的隔离(可串行化隔离),确保对单个表的读写一致且可靠。通过实现 ACID 事务,Delta Lake 有效解决了传统大数据平台Lambda 架构的几个不足之处:它的复杂性、不一致的数据视图以及 Lambda 管道不可避免地崩溃后所需的返工和重新处理。用户可以对其数据执行多个并发事务,如果数据源或流出现错误,Delta Lake 会取消事务的执行,以确保数据保持干净和完整。ACID 事务的美妙之处在于用户可以信任存储在 Delta Lake 中的数据。数据分析师使用 Delta Lake 表对其数据执行 ETL,为仪表板做好准备,可以保证看到的 KPI 代表了数据的真实状态。使用 Delta Lake 表执行特征工程的机器学习工程师可以 100% 确信所有转换和聚合操作要么完全按照预期执行,要么根本没有执行(在这种情况下,通过设置警报,然后用户就会收到通知)。

扯了这么多,还是停留在理论层面上,接下来结合实际例子来理解一下。

复习一下前面讲到的知识,传统的Data Lake和Delta Lake在物理表现形式上有什么不同?对的,在往传统的Data Lake里写数据时,你只要声明格式为delta就可以了,你的Data Lake就是Delta Lake了。

另外一个,Delta Lake的物理表现形式就是delta table。

如果忘记了,记得回去看看:聊聊Delta Lake - 1

在正式开始之前,我们需要做一些准备工作:

一个是对象存储服务,我们这里用Azure Data Lake Gen2,其实就是Azure Storage account Container这个服务。

Storage account名称:deltalakestore

Container名称:deltademo。而且该container下面没有任何数据。

接下来我们来模拟抽取原始数据,然后写入delta lake表中。

这里我们使用Databricks自带的people数据,是以parquet格式存储的。

# 数据路径"dbfs:/databricks-datasets/learning-spark-v2/people/people-10m.parquet/"

可以看到,一共有8个parquet文件,我们先读下面这个parquet文件:

part-00000-04bbf354-608c-43f2-9a81-7df139d1af69.snappy.parquet

people_data_path = "/databricks-datasets/learning-spark-v2/people/people-10m.parquet/part-00000-04bbf354-608c-43f2-9a81-7df139d1af69.snappy.parquet"df = spark.read.format('parquet').load(people_data_path)display(df)

可以看到,数据是超过1w条的,为了测试,我们只取少量数据:

from pyspark.sql.functions import col# 只取2条数据df0 = df.filter(col('id') <= 5)display(df0)

现在df0里有5条数据,在写数据之前,我们配置一下写入目标存储账户容器的权限,否则没权限写入到目标容器。

storeage_account = "deltalakestore"sas_token = "?xxxxxxxxxxxxxxxxxxxxxxxxxxxx"spark.conf.set(f"fs.azure.account.auth.type.{storeage_account}.dfs.core.chinacloudapi.cn", "SAS")spark.conf.set(f"fs.azure.sas.token.provider.type.{storeage_account}.dfs.core.chinacloudapi.cn", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")spark.conf.set(f"fs.azure.sas.fixed.token.{storeage_account}.dfs.core.chinacloudapi.cn", sas_token)

开始往目标容器写入数据,并且是以delta格式写入数据:

写数据之前,我们先要检查目标容器内是否已经存在delta table

from delta.tables import *from pyspark.sql.functions import *
container_name = "deltademo"delta_table_location = f"abfss://{container_name}@{storeage_account}.dfs.core.chinacloudapi.cn/createbydeltatableapi"DeltaTable.isDeltaTable(spark, delta_table_location)

返回的是False,也就是没有delta table,那我们需要先创建delta table,这里我们使用delta table Python API来创建:

# 如果目标delta表不存在,就创建新表,否则什么也不做DeltaTable.createIfNotExists(spark) \    .addColumn("id", "INT") \    .addColumn("firstName", "STRING") \    .addColumn("middleName", "STRING") \    .addColumn("lastName", "STRING", comment = "surname") \    .addColumn("gender", "STRING") \    .addColumn("birthDate", "TIMESTAMP") \    .addColumn("ssn", "STRING") \    .addColumn("salary", "INT") \    .property("description", "table with people data and created by delta table api") \    .location(delta_table_location) \    .execute()

执行上面代码之后,我们去目标容器的文件夹下面观察一下:

可以看到只有一个文件夹:_delta_log,因为我们只是创建了一个空表,没有写数据。

点开_delta_log文件夹看看:

这里回答后台粉丝问的一个问题,在容器文件夹里如何判断是data lake还是delta lake?就看数据文件夹下是否有_delta_log这个文件夹。

可以看到一个以一串0为文件名的json文件,先记住这个文件。

我们回到Databricks Notebook,开始把上面筛选出的5条数据写到delta表中:

df0.write.format('delta').mode('append').option('path', delta_table_location).save()

执行上面写数据的代码之后,返回容器查看一下:

可以看到,已经有一个parquet文件了,这个就是我们刚刚写入的数据。

我们确认一下:

delta_table_data = spark.read.load(delta_table_location)display(delta_table_data)

确实是我们写入的数据。

然后再进_delta_log文件夹查看一下:

可以发现,新增了一个json文件,而且文件名的数字是递增的。

我们再查看一下这张delta表的历史记录:

display(spark.sql("DESCRIBE HISTORY delta.`{}`".format(delta_table_location)))

可以看到,当前有两个版本,version 0的operation是CREATE TABLE,version 1的operation是WRITE,这个与我们上面的操作一致。

我们接下来多做几次写入操作,依次写入id从6到15的数据。

检查一下数据:

再看一下delta表的历史

当前11个版本。

再回去看_delta_log文件夹:

可以看到,json文件名最大也是10,而且多了一个1到6的压缩json文件。

好了,这次就先分享到这里!

点击关注本公众号,解锁更多Databricks知识!


请使用浏览器的分享功能分享到微信等