导读 本文介绍了 Apache Hudi 从零到一:理解写入流程和操作(三)。本文翻译自原英文博客
https://blog.datumagic.com/p/apache-hudi-from-zero-to-one-310。
主要内容包括以下几个部分:
1. 整体写入流程
2. 写入操作
3. 回顾
分享嘉宾|许世彦 Onehouse 开源项目负责人
编辑整理|刘金辉
出品社区|DataFun
在上一篇文章中,我们讨论了 Hudi 查询类型及其与 Spark 的集成。在这篇文章中,我们将深入研究另一个方面——写入流,以 Spark 作为示例引擎。在写入数据时可以调整许多配置和设置。因此,这篇文章的目的不是作为一个完整的使用指南。相反,我的主要目标是展示内部数据流并分解所涉及的步骤。这将使读者更深入地了解运行和微调 Hudi 应用程序。各种实际使用示例请查阅 Hudi 的官方文档页面。
01
整体写入流程
下图说明了在执行引擎的上下文中 Hudi 写入操作中涉及的典型高级步骤。我将对本节中的每个步骤进行简要介绍。
1. 创建写入客户端
Hudi 写入客户端作为写入操作的入口点,其写入能力是通过创建与引擎兼容的写入客户端实例来实现的。例如,Spark 使用 SparkRDDWriteClient ,Flink 使用 HoodieFlinkWriteClient,Kafka Connect 生成 HoodieJavaWriteClient。通常,此步骤涉及将用户提供的配置与现有 Hudi 表属性进行适配,然后将最终配置集传递给客户端。
2. 转换输入
在写入客户端处理输入数据之前,会进行多次转换,包括 HoodieRecord 的构造和结构适配。让我们更深入地研究 HoodieRecord,因为它是写入路径中的基础模型。
Hudi 使用由 “recordKey” 和 “partitionPath” 组成的 HoodieKey 模型标识唯一记录。这些值通过实现 KeyGenerator API 进行填充。此 API 提供了灵活的输入架构,能够实现将自定义字段提取和转换为指定的键。有关使用示例,请参阅文档页面。
“currentLocation” 和 “newLocation” 都由一个 Hudi Timeline 的操作时间戳和 FileGroup 的 ID 组成。回想一下博客一中的逻辑 FileGroup 和 FileSlice 概念,时间戳指向特定 FileGroup 中的 FileSlice。“location”属性用于使用逻辑信息查找物理文件。如果“currentLocation”不为 null,则表示表中具有相同键的记录所在位置,而“newLocation”则指定传入记录的写入位置。
“data” 字段是一种通用类型,包含记录的实际字节内容,也被称为有效载荷。通常,此属性实现 了 HoodieRecordPayload,它指导引擎如何将旧记录与新记录合并。从 0.13.0 版本开始,引入了一个新的实验接口 HoodieRecordMerger,旨在替换 HoodieRecordPayload 并作为统一的合并 API。
3. 开始提交
在此步骤中,写入客户端始终检查表的时间线上是否剩余任何失败的操作,并在启动写入操作之前通过在时间线上创建“已请求”的提交操作来相应地执行回滚。
4. 准备记录
提供的 HoodieRecord 可以选择性地根据用户配置和操作类型进行去重和索引。如果需要去重,则具有相同键的记录将会被合并。如果需要索引,则如果记录存在,将填充“currentLocation”。使用各种索引类型进行索引逻辑的主题至关重要,需要专门撰写一篇文章。为了了解写入流,只需要记住一个关键点:索引负责查找给定记录的物理文件。
5. 分区记录
这是一个必不可少的预写入步骤,用于确定哪些记录进入哪个文件组,并最终进入哪个物理文件。传入的记录将被分配到更新桶和插入桶,这意味着后续文件写入的策略不同。每个桶代表一个用于分布式处理的 RDD 分区,就像 Spark 一样。
6. 写入存储
这是实际 I/O 操作发生的时机。物理数据文件将会使用写入句柄被创建或追加。在此之前,标记文件也可能会在.hoodie/.temp/ 目录被创建,表示对相应数据文件执行的写入操作类型。这在高效的回滚和冲突解决场景中很有价值。
7. 更新索引
将数据写入磁盘后,可能需要立即更新索引数据,以确保读写正确性。这尤其适用于在写入过程中未同步更新的索引类型,例如HBase服务中使用的 HBase 索引。
8. 提交更改
在最后一步中,写入客户端将执行多个任务以确保正确完成了事务写入。例如,执行预提交验证,检查是否与并发写入操作发生冲突、将提交元数据保存到时间线、与标记文件进行 WriteStatus 协调等。
02
写入操作
更新插入数据是湖仓管道中的常见方案。在本节中,我们将详细探讨 CoW 表的 Upsert 流,然后简要概述所有其他支持的写入操作。
1. Upsert 更新插入
(1)写入客户端启动提交,并在 Timeline 上创建“请求”操作。
(2)输入记录经过准备步骤:合并重复项,并由索引填充目标文件位置。在此过程中,我们拥有要写入的确切记录,并知道表中存在哪些记录,以及它们各自的位置 (FileGroups)。
(3)准备好的记录被分到“更新”和“插入”桶。最初,构造 WorkloadProfile 来收集有关相关物理分区中更新和插入次数的信息。然后,此数据被序列化为时间轴上“飞行中”的动作。随后,根据 WorkloadProfile 生成存储桶来保存记录。对于更新,每个更新文件组都分配为一个更新存储桶。在插入的情况下,小文件处理逻辑开始发挥作用:任何小于指定阈值(由
hoodie.parquet.small.file.limit 确定)的 BaseFile 都将成为插入的候选文件,其封闭的 FileGroup 被指定为更新桶。如果不存在此类 BaseFile,则将分配插入桶,并将为它们创建新的文件组。
(4)然后,存储桶中的记录通过文件写入句柄处理,以便实现持久化存储。对于更新桶中的记录,将使用“合并”句柄,从而在现有文件组中创建新的 FileSlices(通过与旧 FileSlice 中的数据合并来实现)。对于插入桶中的记录,将使用“创建”句柄,从而创建全新的文件组。这个过程是由 HoodieExecutor 完成的,它采用生产者-消费者模式来读取和写入记录。
(5)一旦所有数据被写入,文件写入处理返回 WriteStatus 的集合,其中包含有关写入的元数据,包括错误数、执行的插入数、总写入大小(以字节为单位)等。此信息将发送回 Spark 驱动程序进行聚合。如果未发生任何错误,则写入客户端将生成提交元数据,并将其作为已完成的操作保留在时间线上。
更新插入到 MoR 表遵循非常相似的流程,使用一组不同的条件来确定用于更新和插入的文件写入句柄的类型。
2. Insert & Bulk Insert 插入与批量插入
Insert 流程与 Upsert 非常相似,主要区别在于没有索引步骤。这意味着整个写入过程更快(如果关闭去重,则速度会更快),但可能会导致表中出现重复项。
批量插入遵循与普通插入相同的语义,这意味着由于缺少索引,它也可能导致重复。但是,区别在于没有对批量插入进行小文件处理。记录分区策略由 BulkInsertSortMode 设置确定,也可以通过实现自定义 BulkInsertPartitioner。默认情况下,大容量插入还为 Spark 启用行写入模式,在“转换输入”步骤中绕过 Avro 数据模型转换,并直接使用引擎原生的数据模型行。该模式可提供更高效的写入。
总体而言,批量插入通常比插入性能更高,但可能需要额外的配置调整来解决小文件问题。
3. Delete 删除
Delete 流程可以看作是 Upsert 流程的一个特例。主要区别在于,在“转换输入”步骤中,输入记录将转换为 HoodieKeys 并传递到后续阶段,因为这些是识别要删除的记录所需的最少数据。需要注意的是,此过程会导致硬删除,这意味着目标记录将不存在于相应 FileGroup 的新 FileSlices 中。
4. Delete Partition 删除分区
与上面介绍的流程相比,删除分区遵循完全不同的流程。它不是输入记录,而是采用物理分区路径列表,该列表通过
hoodie.datasource.write.partitions.to.delete 进行配置。由于没有输入记录,因此索引、分区和写入存储等过程不适用。“删除分区”将目标分区路径的所有文件组ID 保存在时间线上的 .replacecommit操作中,确保后续写入器和读取器将其视为已删除。
5. Insert Overwrite & Insert Overwrite Table 插入覆盖与插入覆盖表
插入覆盖完全重写使用者提供的记录重写分区。此流程可以有效地看作是“删除分区”和“批量插入”的组合:它从输入记录中提取受影响的分区路径,将这些分区中的所有现有文件组标记为已删除,并创建新的文件组来存储传入的记录。
“插入覆盖表”是“插入覆盖”的变体。它不是从输入记录中提取受影响的分区路径,而是获取表的所有分区路径以进行覆盖。
03
回顾
在这篇文章中,我们探讨了 Hudi 写入路径中常见的主要步骤,深入研究了 CoW Upsert 流程,并详细介绍了记录分区逻辑,并介绍了所有其他写入操作。
以上就是本次分享的内容,谢谢大家。