Apache Hudi
什么是 Apache Hudi
Apache Hudi 是一个事务性数据湖平台,它将数据库和数据仓库的功能引入到数据湖中。Hudi 用一个强大的全新增量处理框架取代了缓慢的传统批处理数据处理方式,从而实现低延迟的分钟级分析。
Apache Hudi 特点
? 全面支持数据湖工作负载的可变性 借助 Hudi 快速且可插拔的索引功能,您可以轻松地更新和删除数据,即使是流式工作负载也能够轻松应对。Hudi 完全支持无序数据、突发流量和数据去重。
? 增量处理新数据,提升效率 Hudi 使用增量流式处理取代了传统的数据湖批处理管道,可以更快地摄取数据并缩短分析工作负载的处理时间。
? 为数据湖提供 ACID 事务保证 Hudi 为数据湖带来了事务保证,包括一致的原子写入以及专为长时间运行的湖事务设计的并发控制。
? 通过时间旅行功能解锁历史数据 通过时间旅行功能,您可以查询历史数据并回滚到之前的表版本,从而了解数据随时间推移的变化,并通过查看提交历史记录来审计数据更改。
? 支持可互操作的多云生态系统 Hudi 拥有广泛的生态系统支持,为流行的数据源和查询引擎提供即插即用的选项,方便您构建面向未来的架构,并与您选择的供应商互操作。
? 提供用于高性能分析的全面表服务 Hudi 提供完全自动化的表服务,持续安排和协调集群、压缩、清理、文件大小调整和索引等任务,确保表始终可用。
? 丰富的平台,助您快速构建湖仓 Hudi 内置了自动从 Debezium 和 Kafka 等服务摄取数据的工具,以及用于轻松发现的自动目录同步功能,使您能够轻松构建湖仓。
? 通过多模式索引加速查询 Hudi 首创的多模式索引子系统可以显著提升大型/宽表的写入事务速度和查询性能。
? 借助模式演进和强制执行机制,打造弹性管道 通过轻松更改 Hudi 表的当前模式,您可以适应随时间推移而变化的数据,并通过快速失败和避免数据损坏来确保管道弹性。
为什么选择 Apache Hudi
Hudi 平台拥有丰富的服务和工具,利用这些服务和工具可以让你的数据湖为各种应用程序所用,例如个性化推荐、机器学习、客户 360 等!
? 可信平台 久经沙场并在全球一些最大的数据湖中得到生产验证。
? 开源 Hudi 是一个蓬勃发展、不断壮大的社区,它由来自全球各地的人们共同构建。
? 派生表 在数据湖上无缝地创建和管理 SQL 表,以构建多阶段增量管道。
? 数据流 利用内置的 CDC 源和工具进行流式数据摄取。
经过一番寻找,找到了两个方案,
apache/hudi-rs
关于hudi-rs
一个为Apache Hudi 设计的原生Rust语言库hudi-rs,并且提供了与Python的绑定。感觉这个会成为DuckDB原生支持Hudi的主要组成库。
安装
pip install hudi
示例代码
访问Hudi的代码
from hudi import HudiTable
import pyarrow as pa
import pyarrow.compute as pc
hudi_table = HudiTable("/tmp/trips_table")
records = hudi_table.read_snapshot()
arrow_table = pa.Table.from_batches(records)
result = arrow_table.select(
["rider", "ts", "fare"]).filter(
pc.field("fare") > 20.0)
result.to_pandas()
DuckDB 访问arrow
import duckdb
arrow = df.to_arrow()
duckdb.sql("select * from arrow")
Eventual-Inc/Daft
Daft 是一个分布式数据查询引擎,专为 Python 设计,实现于 Rust 语言。它是为了在云环境中处理大规模数据而设计的。以下是 Daft 的一些关键特点和功能:
Daft 特点
? Daft 是一个分布式数据框架,用于多模态数据的分布式数据查询。
? 它提供了一个熟悉的交互式 API,使用懒加载的 Python 数据框架,以便快速进行交互式迭代。
? 它专注于查询优化器的强大功能,重写查询以尽可能高效。
? 提供三大数据湖(Apache Iceberg、Apache Hudi、Delta Lake)的支持。
? 它支持丰富的多模态类型系统,例如图像、URL、张量等。
? 它基于 Apache Arrow 内存格式构建,实现了无缝交换。
? 它为云环境构建,具有与 S3 云存储集成的记录设置 I/O 性能。
安装 Daft
pip install -U "getdaft[hudi]"
示例代码
访问Hudi的代码
# Read Apache Hudi table into a Daft DataFrame.
import daft
df = daft.read_hudi("../data/trips_table/")
df = df.select("rider", "ts", "fare").where(df["fare"] > 20.0)
df.show()
import duckdb
duckdb.sql("""
select rider,driver,fare
from arrow_table""")
许可
Daft 使用 Apache 2.0 许可证。
在AWS S3上读写Hudi
? minio 这里以minio 模拟AWS S3 环境
curl --progress-bar -O https://dl.min.io/server/minio/release/darwin-arm64/minio
chmod +x minio
export MINIO_ROOT_USER=admin
export MINIO_ROOT_PASSWORD=password
./minio server /tmp/minio --console-address ":9001"
Hudi 生成 使用PySpark
? 安装PySpark
pip install pyspark
? 创建
~/.aws/credentials
写入以下内容,
# ~/.aws/credentials
[default]
aws_access_key_id = admin
aws_secret_access_key = password
aws_endpoint_url = http://127.0.0.1:9000
import os
os.environ["AWS_ENDPOINT_URL"]="http://127.0.0.1:9000"
os.environ["AWS_ACCESS_KEY_ID"]="admin"
os.environ["AWS_SECRET_ACCESS_KEY"]="password"
# Apache Hudi-PySpark Configuration
from typing import *
from pyspark import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Initialize Spark session
spark = SparkSession.builder \
.appName("Hudi Table") \
.config("spark.driver.memory", "12g") \
.config("spark.executor.memory", "12g") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.jars.packages", "org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0,org.apache.hadoop:hadoop-aws:3.2.4,com.amazonaws:aws-java-sdk:1.12.262") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
.config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
.config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.profile.ProfileCredentialsProvider") \
.config("spark.hadoop.fs.s3a.endpoint",os.getenv('AWS_ENDPOINT_URL')) \
.config("spark.hadoop.fs.s3a.access.key", os.getenv('AWS_ACCESS_KEY_ID')) \
.config("spark.hadoop.fs.s3a.secret.key", os.getenv('AWS_SECRET_ACCESS_KEY')) \
.config("spark.hadoop.fs.s3a.path.style.access", "true") \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.config("spark.hadoop.fs.s3a.connection.ssl.enabled","false") \
.getOrCreate()
print("Spark Running")
tableName="trips_table"
s3_path = f"s3a://hudi/{trips_table}/"
# pyspark
columns = ["ts","uuid","rider","driver","fare","city"]
data =[(1695159649087,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
(1695091554788,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70 ,"san_francisco"),
(1695046462179,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90 ,"san_francisco"),
(1695516137016,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo"),
(1695115999911,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai")]
inserts = spark.createDataFrame(data).toDF(*columns)
hudi_options = {
'hoodie.table.name': tableName,
'hoodie.datasource.write.partitionpath.field': 'city'
}
inserts.write.format("hudi"). \
options(**hudi_options). \
mode("overwrite"). \
save(s3_path)
使用DuckDB 读取
? hudi-rs
import os
os.environ["AWS_ENDPOINT_URL"]="http://127.0.0.1:9000"
os.environ["AWS_ACCESS_KEY_ID"]="admin"
os.environ["AWS_SECRET_ACCESS_KEY"]="password"
os.environ["AWS_ALLOW_HTTP"]="true"
import duckdb
import pyarrow as pa
import pyarrow.compute as pc
from hudi import HudiTable
def read_hudi():
table_uri ='s3a://hudi/trips_table'
hudi_table = HudiTable(table_uri)
records = hudi_table.read_snapshot()
arrow_table = pa.Table.from_batches(records)
return arrow_table
arrow_table =read_hudi()
duckdb.sql("""
select rider,ts,fare
from arrow_table
where fare>20
""").df()
? Daft
def daft_duckdb():
s3config = daft.io.S3Config(access_key="password",
endpoint_url="http://127.0.0.1:9000",
key_id='admin')
df = daft.read_hudi('s3://hudi/trips_table',
io_config=daft.io.IOConfig(s3=s3config))
arrow = df.to_arrow()
return duckdb.sql("select * from arrow")
daft_duckdb().show()