欢迎关注Hadoop、Spark、Flink、Hive、Hbase、Flume等大数据资料分享微信公共账号:iteblog_hadoop
  1. 文章总数:1060
  2. 浏览总数:14,243,231
  3. 评论:4167
  4. 分类目录:111 个
  5. 注册用户数:7028
  6. 最后更新:2019年9月21日
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
iteblog_hadoop
大数据技术博客公众号bigdata_ai
开发爱好者社区:
bigdata_ai

深入理解 Apache Spark Delta Lake 的事务日志

事务日志是理解 Delta Lake 的关键,因为它是贯穿许多最重要功能的通用模块,包括 ACID 事务、可扩展的元数据处理、时间旅行(time travel)等。本文我们将探讨事务日志(Transaction Log)是什么,它在文件级别是如何工作的,以及它如何为多个并发读取和写入问题提供优雅的解决方案。

事务日志(Transaction Log)是什么

Delta Lake 事务日志(也称为 DeltaLog)是 Delta Lake 表上执行每次事务的有序记录。具体形式如下:

iteblog@iteblog:/tmp/delta-table/_delta_log|
⇒  ll
total 280
-rw-r--r--  1 iteblog  wheel   1.5K  8 21 14:19 00000000000000000000.json
-rw-r--r--  1 iteblog  wheel   1.1K  8 21 14:31 00000000000000000001.json
-rw-r--r--  1 iteblog  wheel   791B  8 21 14:31 00000000000000000002.json
-rw-r--r--  1 iteblog  wheel   3.9K  8 21 14:31 00000000000000000003.json
-rw-r--r--  1 iteblog  wheel   1.2K  8 21 19:38 00000000000000000004.json
-rw-r--r--  1 iteblog  wheel   1.2K  8 21 19:39 00000000000000000005.json
-rw-r--r--  1 iteblog  wheel   1.2K  8 21 19:40 00000000000000000006.json
-rw-r--r--  1 iteblog  wheel   1.2K  8 21 19:40 00000000000000000007.json
-rw-r--r--  1 iteblog  wheel   1.2K  8 21 19:40 00000000000000000008.json
-rw-r--r--  1 iteblog  wheel   1.2K  8 21 19:40 00000000000000000009.json
-rw-r--r--  1 iteblog  wheel    15K  8 21 19:40 00000000000000000010.checkpoint.parquet
-rw-r--r--  1 iteblog  wheel   1.2K  8 21 19:40 00000000000000000010.json
-rw-r--r--  1 iteblog  wheel   1.2K  8 21 19:40 00000000000000000011.json
-rw-r--r--  1 iteblog  wheel   1.2K  8 21 19:40 00000000000000000012.json
-rw-r--r--  1 iteblog  wheel   1.2K  8 21 19:40 00000000000000000013.json
-rw-r--r--  1 iteblog  wheel   1.2K  8 21 19:40 00000000000000000014.json
-rw-r--r--  1 iteblog  wheel   1.2K  8 21 19:40 00000000000000000015.json
-rw-r--r--  1 iteblog  wheel   1.2K  8 21 19:40 00000000000000000016.json

事务日志主要用途是什么?

单一事实来源

Delta Lake 构建于 Apache Spark™ 之上,允许多个写和读操作同时对给定表进行操作。为了始终向用户显示正确的数据视图,事务日志可作为单一事实来源(single source of truth) - 中央存储库,用于跟踪用户对表所做的所有更改。

当用户第一次读取 Delta Lake 表或在打开的表上运行一个新查询,该表自上次读取以来已被修改,Spark 会检查事务日志来查看已向表写入的新事务,然后使用这些新更改更新最终用户的表。这可确保用户表的版本始终与最新查询中的主记录同步,并且用户无法对表进行不同的,冲突的更改。

Delta Lake 上的原子性实现

原子性是 ACID 事务的四个属性之一,它可以保证在 Delta Lake 上执行的操作(如 INSERT 或 UPDATE )要么全部成功要么全部不成功。 如果没有此属性,硬件故障或软件错误很容易导致数据仅部分写入表中,从而导致数据混乱或损坏。

事务日志是 Delta Lake 能够提供原子性保证的机制。无论如何,如果它没有记录在事务日志中,它就不会发生。通过只记录完全执行的事务,并使用该记录作为唯一的真相来源,事务日志允许用户对其数据进行推理;并且即使数据在 PB 级别上,我们也可以对这些数据的准确性高枕无忧。

事务日志是如何工作的

将事务分解为原子提交

每当用户执行修改表的操作(例如插入、更新或删除)时,Delta Lake 将该操作分解为一系列由以下一个或多个操作组成的离散步骤:

  • Add file:添加一个数据文件;
  • Remove file:删除一个数据文件;
  • Update metadata:更新表的元数据(例如更改表的名称,模式或分区);
  • Set transaction:Structured Streaming 作业已经提交的具有给定 ID 的微批次记录;
  • Change protocol:通过将事务日志切换到最新的软件协议来启用新特性;
  • Commit info:包含有关提交的信息以及该操作是在何时何地进行的。

然后这些操作将按照有序的原子单位记录在事务日志中,称为提交。

例如,假设用户创建一个事务以向表中添加新列,并向其中添加更多数据。Delta Lake 会将该事务分解为多个部分,一旦事务完成,就将它们添加到事务日志中,如下所示:

  • Update metadata:更改模式以包含新列;
  • Add file:每个添加的新文件。

文件级别的事务日志

当用户创建 Delta Lake 表时,将在 _delta_log 子目录中自动创建该表的事务日志。 当他或她对该表进行更改时,这些更改将作为有序的原子提交记录在事务日志中。 每个提交都以 JSON 文件的形式写出,从 000000.json 开始。对表的其他更改按升序数字顺序生成后续 JSON 文件,所以下一次提交被写入到 000001.json 文件,下下次修改写入到 000002.json 文件,依此类推。

Diving Into Delta Lake: Unpacking The Transaction Log
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop

因此,如果我们通过从数据文件 1.parquet 和 2.parquet 向表中添加记录。该事务将自动添加到事务日志中,并以 000000.json 的形式保存到磁盘。然后,我们改变主意并决定删除这些文件并添加一个新文件(3.parquet)。 这些操作将记录为事务日志中的下一个提交,也就是 000001.json,如下所示。

Diving Into Delta Lake: Unpacking The Transaction Log
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop

尽管 1.parquet 和 2.parquet 不再是我们 Delta Lake 表的一部分,但它们的添加和删除仍记录在事务日志中,因为这些操作是在我们的表上执行的 - 尽管它们最终相互抵消了。 Delta Lake 仍然保留这样的原子提交,以确保在需要审计表或使用“时间旅行”来查看表在给定时间点的样子时,我们能够准确地做到这一点。

此外,即使我们从表中删除了基础数据文件,Spark 也不会立刻从磁盘中删除文件。用户可以使用 VACUUM 命令删除不再需要的文件。如下:

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, pathToTable)

deltaTable.vacuum()        // vacuum files not required by versions older than the default retention period

deltaTable.vacuum(100)     // vacuum files not required by versions more than 100 hours old

使用检查点文件(Checkpoint Files)快速重新计算状态

一旦我们提交了10次事务日志,Delta Lake 就会在相同的 _delta_log 子目录中以 Parquet 格式保存一个检查点文件(如上面的 00000000000000000010.checkpoint.parquet 文件)。每 10 次提交 Delta Lake 会自动生成检查点文件,这个是通过参数 checkpointInterval 参数设置。

Diving Into Delta Lake: Unpacking The Transaction Log
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop

这些检查点文件在某个时间点保存表的整个状态 - 以原生的 Parquet 格式保存,Spark 可以快速轻松地读取。换句话说,它们为 Spark reader 提供了一种“快捷方式”来完全复制表的状态,从而允许 Spark 避免重新处理可能存在的数千个低效的小 JSON 文件。

为了提高速度,Spark可以运行一个 listFrom 操作来查看事务日志中的所有文件,快速跳转到最新的检查点文件,并且只处理自保存了最新的检查点文件以来提交的JSON。

为了演示这是如何工作的,假设我们已经创建了提交,并且事务日志已经记录到 000007.json。Spark 加快了提交的速度,并在内存中自动缓存了表的最新版本。与此同时,其他一些写入者(可能是您过于热心的队友)已经向表中写入了新数据,并事务日志已经记录到 0000012.json 了。

为了合并这些新事务并更新表的状态,Spark 将运行 listFrom 方法来查看版本7之后对表的新更改。

Diving Into Delta Lake: Unpacking The Transaction Log
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop

Spark可以直接跳到最近的检查点文件(上图中的 0000010.checkpoint.parquet 文件),而不需要处理所有中间 JSON 文件,因为这个检查点文件包含 commit #10 中表的整个状态。现在,Spark 只需执行 0000011.json 和 0000012.json 的增量处理即可获得表的当前状态。然后 Spark 将表的版本12的状态缓存到内存中。通过遵循此工作流程,Delta Lake 能够使用 Spark 以高效的方式始终更新表的状态。

处理多个并发的读取和写入

现在我们已经在高层次上了解了事务日志如何工作的,让我们来谈谈并发性。到目前为止,我们的示例主要涵盖了用户线性提交事务或至少没有冲突的情况。 但是当 Delta Lake 处理多个并发读写时会发生什么?

答案很简单,由于 Delta Lake 由 Apache Spark 提供支持,因此不仅可以让多个用户同时修改表 - 这是预期的。 为了处理这些情况,Delta Lake 采用了乐观的并发控制。

什么是乐观并发控制?

乐观并发控制是一种处理并发事务的方法,它假定不同用户对表所做的事务(更改)可以在不相互冲突的情况下完成。它的速度快得令人难以置信,因为当处理 PB 级的数据时,用户很可能同时处理数据的不同部分,从而允许他们同时完成不冲突的事务。

例如,假设你和我正在一起玩拼图游戏。只要我们都在做拼图的不同部分——比如你在角落里,我在边缘上——我们没有理由不能同时做更大拼图的那一部分,并且以两倍的速度完成拼图。只有当我们同时需要相同的部件时,才会产生冲突。这就是乐观并发控制。

相反,一些数据库系统使用悲观锁定的概念,这是假设最坏的情况——即使我们有10,000块拼图,在某个时候我们肯定需要相同的拼图——这导致了太多的冲突。为了解决这个问题,它的理由是,应该只允许一个人同时做拼图,并把其他人都锁在房间外面。这不是一个快速(或友好)解决难题的方法!

当然,即使使用乐观并发控制,有时用户也会尝试同时修改数据的相同部分。幸运的是,Delta Lake 有相应的协议处理它。

乐观地解决冲突

为了提供ACID事务,Delta Lake 有一个协议,用于确定提交应该如何排序(在数据库中称为 serializability),并确定在同时执行两个或多个提交时应该做什么。Delta Lake通过实现互斥(mutual exclusion)规则来处理这些情况,然后尝试乐观地解决任何冲突。该协议允许Delta Lake遵循ACID隔离原则,该原则确保多个并发写操作之后的表的结果状态与那些连续发生的写操作相同,并且是彼此隔离的。

一般来说,这个过程是这样进行的

  • 记录起始表的版本;
  • 记录读和写操作;
  • 尝试提交;
  • 如果有人已经提交了,检查一下你读到的内容是否有变化;
  • 重复上面的步骤。

为了了解这一切是如何实时进行的,让我们看一下下面的图表,看看 Delta Lake 在冲突突然出现时是如何管理冲突的。假设两个用户从同一个表中读取数据,然后每个用户都尝试向表中添加一些数据。

Diving Into Delta Lake: Unpacking The Transaction Log
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop
  • Delta Lake 记录在进行任何更改之前读取的表的起始表版本(版本0);
  • 用户1和2都试图同时向表添加一些数据。在这里,我们遇到了一个冲突,因为接下来只有一个提交可以被记录为 000001.json;
  • Delta Lake使用“互斥”概念处理这种冲突,这意味着只有一个用户能够成功提交 000001.json。用户1的提交被接受,而用户2的提交被拒绝;
  • Delta Lake 更倾向于乐观地处理这种冲突,而不是为用户2抛出错误。 它检查是否对表进行了任何新的提交,并悄悄地更新表以反映这些更改,然后在新更新的表上重试用户2的提交(不进行任何数据处理),最后成功提交 000002.json。

在绝大多数情况下,这种和解是悄无声息地、天衣无缝地、成功地进行的。但是,如果 Delta Lake 无法乐观地解决不可调和的问题(例如,如果用户1删除了用户2也删除的文件),那么惟一的选择就是抛出一个错误。

最后要注意的是,由于在 Delta Lake 表上进行的所有事务都直接存储到磁盘中,因此这个过程满足 ACID 持久性的特性,这意味着即使在系统发生故障时,它也会保持。

其他用户案例

时间旅行(Time Travel)

每个表都是事务日志中记录的所有提交的总和的结果—不多也不少。事务日志提供了一步一步的指导,详细描述了如何从表的原始状态转换到当前状态。

因此,我们可以通过从原始表开始重新创建表在任何时间点的状态,并且只处理在该点之前提交的数据。这种强大的功能被称为“时间旅行”,或数据版本控制,在任何情况下都是救星。有关更多信息,请参考介绍大型数据湖的 Delta 时间旅行

数据血统(Data Lineage)和调试

作为对 Delta Lake 表所做的每个更改的最终记录,事务日志为用户提供了可验证的数据血统,这对于治理、审计和合规性目的非常有用。它还可以用于跟踪一个意外更改或管道中的一个 bug 的起源,以追溯到导致该更改的确切操作。用户可以运行 DESCRIBE HISTORY 来查看所做更改的元数据。

总结

在本博客中,我们深入研究 Delta Lake 事务日志的工作原理。我们讨论了:

  • 事务日志是什么,它是如何构造的,以及提交如何作为文件存储在磁盘上;
  • 事务日志如何作为一个单一的事实来源,允许 Delta Lake 实现原子性原则;
  • Delta Lake 如何计算每个表的状态——包括它如何使用事务日志来跟踪最近的检查点,以及它如何解决“小文件”问题;
  • 通过使用 Apache Spark 的强大功能来大规模处理元数据;
  • 使用乐观并发控制允许多个并发读和写,即使在表发生更改时也是如此;
  • Delta Lake 如何使用互斥来确保正确地线性(serialized)提交,以及在发生冲突时如何默默地重试提交。

本文翻译自:Diving Into Delta Lake: Unpacking The Transaction Log

本博客文章除特别声明,全部都是原创!
转载本文请加上:转载自过往记忆(https://www.iteblog.com/)
本文链接: 【深入理解 Apache Spark Delta Lake 的事务日志】(https://www.iteblog.com/archives/2584.html)
喜欢 (2)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!