【概述】
这是hudi系列的第一篇文章,先从核心概念,存储的文件格式加深对概念的理解,后续再逐步对使用(spark/flink入hudi,hudi同步hive等)、原理(压缩机制,索引,聚族等)展开分享~
【什么是数据湖】
简单来说,数据湖技术是计算引擎和底层存储格式之间的一种数据组织格式,用来定义数据、元数据的组织方式,并实现以下的功能:
支持事务(ACID)
支持流批一体
支持schema演化和schema结束
支持多种底层数据存储HDFS、OSS、S3
从实现上来说,基于分布式文件系统之上,以传统关系型数据库的方式对外提供使用。
开源的数据湖实现有Hudi、IceBerg、Delta。
【hudi介绍】
Apache hudi代表Hadoop Upserts Deletes Incrementals。能够使HDFS数据集在分钟级的延时内支持变更,也支持下游系统对这个数据集的增量处理。
hudi数据集通过自定义的InputFormat兼容当前hadoop生态系统,包括Hive、Presto、Trino、Spark、Flink,使得终端用户可以无缝的对接。
Hudi会维护一个时间轴(这个是hudi的核心),在每次执行操作时(如写入、删除、压缩等),均会带有一个时间戳。通过时间轴,可以实现在仅查询某个时间点之后成功提交的数据,或是仅查询某个时间点之前的数据。这样可以避免扫描更大的时间范围,并非常高效地只消费更改过的文件。
上面是一些理论上的介绍,简单的使用,官网也有对应的例子,这里就不再啰嗦,下面我们介绍下hudi的一些核心概念,hudi的持久化文件及文件格式。
【相关概念】
1. 表类型
hudi中表有两种类型
MOR(Merge on Read)
在读取时进行合并处理的表。通常而言,写入时其数据以日志形式写入到行式存储到文件中,然后通过压缩将行式存储文件转为列式存储文件。读取时,则可能需要将存储在日志文件中的数据和存储在列式文件中的数据进行合并处理,得到用户期望查询的结果。
COW(Copy on Write)
在写入的时候进行拷贝合并处理的表。每次写入时,就完成数据的合并处理,并以列式存储格式存储,即没有增量的日志文件。
两者的一些对比
权衡 |
COW |
MOR |
数据延迟 |
更高 |
更低 |
更新代价(I/O) |
更高 |
更低 |
parquet文件大小 |
更小 |
更大 |
写放大 |
更高 |
更低(取决于压缩策略) |
2. 时间轴
hudi维护了在不同时间点中(instant time)在表上的所有(instant)操作的时间轴,这有助于提供表的即时视图,同时还能有效的提供顺序检索数据。
instant由以下组件组成:
instant action:对数据集(表)的操作类型(动作)。
instant time:通常是一个时间戳,它按照操作开始时间的顺序单调递增。
state:当前的状态
关键的操作类型包括:
commit
原子的将一批数据写入数据集(表)中
cleans
清除数据集(表)中不再需要的老版本文件
delta_commit
增量提交,表示将一批记录原子的写入MOR类型的表中,其中一些/所有数据可能仅被写入增量日志文件中
compaction
通常而言,是将基于行式的日志文件移动更新到列式文件中。
rollback
表明提交或增量提交不成功后的回滚,此时会删除写过程中产生的任意分区文件。
savepoint
将某些文件组标识为"已保存",这样在清理时不会进行删除。在灾备或数据恢复的场景中,有助于恢复到时间轴上的某个点。
任意给定的instant只能处于下面的其中一个状态:
REQUESTED:指明一个动作已经被调度,但还未进行执行
INFLIGHT:指明一个动作正在被执行
COMPLETED:指明时间轴上的一个已完成的动作
状态由requested->inflight->complete进行转换。
3. 视图
hudi支持三种类型的视图:
读优化视图(Read Optimized Queries)
该视图仅将最新文件切片中的基本/列文件暴露给查询,并保证与非hudi列式数据集相比,具有相同的列式查询性能。简单而言,对于MOR表来说,仅读取提交或压缩后的列式存储文件,而不读取增量提交的日志文件。
增量视图(Incremental Queries)
对该视图的查询只能看到从某个提交/压缩后写入数据集的新数据。该视图有效地提供了更改流,来支持增量数据。
实时视图(Snapshot Queries)
在此视图上的查询将某个增量提交操作中数据集的最新快照。该视图通过动态合并最新的基本文件来提供近实时的数据集。
视图类型和表的关系为:
COW | MOR |
|
实时视图 |
Y |
Y |
增量视图 |
Y |
Y |
读优化视图 |
N |
Y |
【持久化文件】
如果上面的概念还有些抽象,那么来看看写入hudi的数据是如何在hdfs上存储的,再来理解前面提到的概念。
根据官网的示例,写入表中的数据,其在hdfs上存储的文件,大概是这样的:
[root@localhost ~] hdfs dfs -ls -R /user/hncscwc/hudiedemo
drwxr-xr-x - root supergroup 0 2021-11-30 14:39 /user/hncscwc/hudidemo/.hoodie
drwxr-xr-x - root supergroup 0 2021-11-30 14:39 /user/hncscwc/hudidemo/.hoodie/.aux
drwxr-xr-x - root supergroup 0 2021-11-30 14:39 /user/hncscwc/hudidemo/.hoodie/.aux/.bootstrap
drwxr-xr-x - root supergroup 0 2021-11-30 14:39 /user/hncscwc/hudidemo/.hoodie/.aux/.bootstrap/.fileids
drwxr-xr-x - root supergroup 0 2021-11-30 14:39 /user/hncscwc/hudidemo/.hoodie/.aux/.bootstrap/.partitions
drwxr-xr-x - root supergroup 0 2021-11-30 14:39 /user/hncscwc/hudidemo/.hoodie/.temp
-rw-r--r-- 3 root supergroup 2017 2021-11-30 14:39 /user/hncscwc/hudidemo/.hoodie/20211130143947.deltacommit
-rw-r--r-- 3 root supergroup 0 2021-11-30 14:39 /user/hncscwc/hudidemo/.hoodie/20211130143947.deltacommit.inflight
-rw-r--r-- 3 root supergroup 0 2021-11-30 14:39 /user/hncscwc/hudidemo/.hoodie/20211130143947.deltacommit.requested
drwxr-xr-x - root supergroup 0 2021-11-30 14:39 /user/hncscwc/hudidemo/.hoodie/archived
-rw-r--r-- 3 root supergroup 388 2021-11-30 14:39 /user/hncscwc/hudidemo/.hoodie/hoodie.properties
drwxr-xr-x - root supergroup 0 2021-11-30 14:39 /user/hncscwc/hudidemo/par1
-rw-r--r-- 3 root supergroup 960 2021-11-30 14:39 /user/hncscwc/hudidemo/par1/.f9037b56-d84c-4b9a-87db-7cae41ab2505_20211130143947.log.1_2-4-0
-rw-r--r-- 3 root supergroup 93 2021-11-30 14:39 /user/hncscwc/hudidemo/par1/.hoodie_partition_metadata
从hdfs的存储文件中可以看出几点:
表的数据都存储在指定配置目录中(这里为/user/hncscwc)
数据大概分为多个目录存储,其中.hoodie目录下存储元数据相关的信息,本质上也就是时间轴对应的相关数据,以分区命名(这里为par1)的目录中则存放数据表在该分区中的具体数据。
先来看看.hoodie目录下元数据相关的持久化文件:这里包括:
yyyyMMddHHmmss.deltacommit
记录MOR表一次事务的执行结果,包括该事务对哪些分区的哪些数据(日志)文件进行了操作,对(日志)文件操作的类型(插入或更新),写入的长度,表的元数据信息等内容。文件内容以json格式存储。
文件中几个比较重要的字段有:
partitionToWrtieStats
以分区为key,记录每个分区的实际操作信息,包括本次事务写入的分区的ID、路径、写入/删除/更新的记录数、实际写入的字节长度等。
compacted
标记本次提交操作是否是压缩操作触发进行的
extraMetadata
最重要的是schema字段,记录了表的schema信息。
另外需要注意:文件名中yyyyMMddHHmmss为本次事务提交的时间戳,其后缀为deltacommit,并且对应文件内容非空,即表示该事务已经完成,相关的文件还有yyyyMMddHHmmss.deltacommit.inflight 和 yyyyMMddHHmmss.deltacommit.requested。恰好对应前面概念中提到的instant对应的三种状态。也就是说,通过将内容写入到不同后缀的文件中,来表示某个操作的当前状态。
一个简单示例为:
{
"partitionToWriteStats" : {
"par1" : [ {
"fileId" : "f9037b56-d84c-4b9a-87db-7cae41ab2505",
"path" : "par1/.f9037b56-d84c-4b9a-87db-7cae41ab2505_20211130143947.log.1_2-4-0",
"prevCommit" : "20211130143947",
"numWrites" : 1,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 1,
"totalWriteBytes" : 960,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : "par1",
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 960,
"minEventTime" : null,
"maxEventTime" : null,
"logVersion" : 1,
"logOffset" : 0,
"baseFile" : "",
"logFiles" : [ ".f9037b56-d84c-4b9a-87db-7cae41ab2505_20211130143947.log.1_2-4-0" ]
} ]
},
"compacted" : false,
"extraMetadata" : {
"schema" : "{"type":"record","name":"record","fields":[{"name":"uuid","type":["null","string"],"default":null},{"name":"name","type":["null","string"],"default":null},{"name":"age","type":["null","int"],"default":null},{"name":"ts","type":["null",{"type":"long","logicalType":"timestamp-millis"}],"default":null},{"name":"partition","type":["null","string"],"default":null}]}"
},
"operationType" : null,
"totalCreateTime" : 0,
"totalUpsertTime" : 6446,
"totalRecordsDeleted" : 0,
"totalLogRecordsCompacted" : 0,
"fileIdAndRelativePaths" : {
"f9037b56-d84c-4b9a-87db-7cae41ab2505" : "par1/.f9037b56-d84c-4b9a-87db-7cae41ab2505_20211130143947.log.1_2-4-0"
},
"writePartitionPaths" : [ "par1" ],
"totalScanTime" : 0,
"totalCompactedRecordsUpdated" : 0,
"totalLogFilesCompacted" : 0,
"totalLogFilesSize" : 0,
"minAndMaxEventTime" : {
"Optional.empty" : {
"val" : null,
"present" : false
}
}
}
yyyyMMddHHmmscnblogs.commit
与deltacommit类似,不过通常是COW表一次事务的执行结果,或者是压缩的执行结果。但文件内容和deltacommit基本相同,文件内容同样采用json格式存储。
hoodie.properties
该文件记录表的相关属性,例如:
#Properties saved on Tue Nov 30 14:39:29 CST 2021
#Tue Nov 30 14:39:29 CST 2021
hoodie.compaction.payload.class=org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
hoodie.table.precombine.field=ts
hoodie.table.name=t1
hoodie.archivelog.folder=archived
hoodie.table.type=MERGE_ON_READ
hoodie.table.version=2
hoodie.table.partition.fields=partition
hoodie.timeline.layout.version=1
hoodie.compaction.payload.class:数据插入/更新时对payload的处理类
hoodie.table.precombine.field:写入之前进行预合并处理的字段(数据)
hoodie.table.name:表的名称
hoodie.archivelog.folder:表的归档路径
hoodie.table.type:表的类型(MOR或COW)
hoodie.table.version:表的版本号(默认为2)
hoodie.table.partition.fields:表的分区字段,每个分区按照分区字段的值作为对应的目录名称,其数据就存储分区的目录中
hoodie.timeline.layout.version:时间轴布局的版本(默认为1)
以上几个文件应该是最常见的,除此之外,你可能还会看到如下文件:
yyyyMMddHHmmscnblogs.compaction.requested/inflight
压缩操作的具体内容,包括压缩操作的时间戳,以及对哪些分区下的哪些文件进行压缩合并。
压缩操作的文件内容是按一个标准avro格式存储的,可以通过avro-tool工具将文件内容转换为json来查看。例如:
{
"operations":{
"array":[
{
"baseInstantTime":{"string":"20220106084115"},
"deltaFilePaths":{
"array":[".97ae031c-189b-4ade-9044-781e840c7e01_20220106084115.log.1_2-4-0"]},
"dataFilePath":null,
"fileId":{"string":"97ae031c-189b-4ade-9044-781e840c7e01"},
"partitionPath":{"string":"par1"},
"metrics":{
"map":{
"TOTAL_LOG_FILES":1.0,
"TOTAL_IO_READ_MB":0.0,
"TOTAL_LOG_FILES_SIZE":3000.0,
"TOTAL_IO_WRITE_MB":