天天减肥网,内容丰富有趣,生活中的好帮手!
天天减肥网 > 【Hudi】Apache Hudi:不一样的存储 不一样的计算

【Hudi】Apache Hudi:不一样的存储 不一样的计算

时间:2019-01-20 09:40:39

相关推荐

【Hudi】Apache Hudi:不一样的存储 不一样的计算

本篇是来自好友孟尧总的一篇文章。我发现,真正的纯粹是自发的,没有任何外部的干扰,因为我们都不是为了钱、为了生活做技术、写作。只有这样,我们才能真正的洒脱。通过文章能够看到孟总对待技术的态度、思维方式。

一切,是因为我们都有所期待。

一切,是因为我们都有理想。

一切,是因为我们敬畏技术。

一切,是因为我们不光做共同的技术,更有共同技术文化信仰。

虽然相距600公里,技术会让我们重新聚在一起。

期待。

目录

Hudi是什么

Hudi的应用场景

Hudi的核心概念

Hudi支持的存储类型

Hudi的SparkSQL使用

Hudi的WriteClient使用

如何使用索引

生产环境下的推荐配置

1

Hudi是什么

Apache Hudi(Hadoop Upserts Deletes and Incrementals,简称Hudi,发音为Hoodie)由UBer开源,它可以以极低的延迟将数据快速摄取到HDFS或云存储(S3)的工具,最主要的特点支持记录级别的插入更新(Upsert)和删除,同时还支持增量查询。

本质上,Hudi并不是一种新的文件格式,相反,它仅仅是充分利用了开源的列格式/行格式的文件作为数据的存储形式,并在数据写入的同时生成特定的索引,以便于在读取时提供更加快速的查询性能。

Hudi自身无法完成对数据的读写操作,它强依赖于外部的Spark、Flink、Presto和Impala等计算引擎才可以使用,目前尤其对Spark依赖严重(在0.7.0中新增了Flink支持)。

2

Hudi的应用场景

1

近实时摄取

将外部源(点击流日志、数据库BinLog、API)的数据摄取到Hadoop数据湖是一种必要的数据迁移过程,但现有的大多数迁移方案都是通过多种摄取工具来解决的。而Hudi就是一种通用的增量数据处理框架,目标是集成在各种现有的计算引擎中,从而缩短以往冗长的数据摄取链路(各种组件相互配合使用),更加稳定且有效的完成对多种数据源的摄取,如下:

2

近实时分析

SQL on Hadoop解决方案(如Presto和Spark SQL)表现出色,一般可以在几秒钟内完成查询。而Hudi可以提供一个面向实时分析更有效的替代方案,并支持对存储在HDFS中更大规模数据集的实时分析。此外,它是一个非常轻量级的库,没有额外的组件依赖(如专用于实时分析的HBase集群),使用它并不会增加操作开销。

3

增量处理管道

过去的增量处理往往划分成小时的分区为单位,落在此分区内的数据写入完成时,这使得数据的新鲜程度可以有效提高。如果有些数据迟到时,唯一的补救措施是通过重跑来保证正确性,但这又会导致增加整个系统的开销。Hudi支持Record级别的方式从上游消费新数据,从而可以仅处理发生变更的数据到相应的表分区,同时还可以将分区的粒度缩短到分钟级,从而不会导致额外的系统资源开销。

4

HDFS数据分发

一个常见的用例是先在Hadoop体系中进行处理数据,然后再分发回在线服务存储系统,以提供应用程序使用。在这种用例中一般都会引入诸如Kafka之类的队列系统,以防止目标存储系统被压垮。但如果不使用Kafka的情况下,仅将每次运行的Spark管道更新插入的输出转换为Hudi数据集,也可以有效地解决这个问题,然后以增量方式获取数据(就像读取Kafka topic一样)写入服务存储层。

3

Hudi的核心概念

1

时间轴(Timeline)

Hudi的核心是在所有的表中维护了一个包含在不同的即时(Instant)时间对数据集操作(比如新增、修改或删除)的时间轴(Timeline),在每一次对Hudi表的数据集操作时都会在该表的Timeline上生成一个Instant,从而可以实现在仅查询某个时间点之后成功提交的数据,或是仅查询某个时间点之前的数据,有效避免了扫描更大时间范围的数据。同时,还可以高效地只查询更改前的文件(例如在某个Instant提交了更改操作后,仅query某个时间点之前的数据,则仍可以query修改前的数据)。

时间轴(Timeline)的实现类(位于hudi-common-0.6.0.jar中):

注意:

由于hudi-spark-bundle.jar和hudi-hadoop-mr-bundle.jar属于Uber类型的jar包,已经将hudi-common-0.6.0.jar的所有class打包进去了。

时间轴相关的实现类位于org.mon.table.timeline包下。

最顶层的接口约定类为:

HoodieTimeline。

默认使用的时间轴类:

HoodieDefaultTimeline继承自HoodieTimeline。

活动时间轴类为:

HoodieActiveTimeline(此类维护最近12小时内的时间,可配置)。

存档时间轴类为:

HoodieArchivedTimeline(超出12小时的时间在此类中维护,可配置)。

时间轴(Timeline)的核心组件:

2

文件组织形式

Hudi将DFS上的数据集组织到基本路径(HoodieWriteConfig.BASEPATHPROP)下的目录结构中。数据集分为多个分区(DataSourceOptions.PARTITIONPATHFIELDOPT_KEY),这些分区与Hive表非常相似,是包含该分区的数据文件的文件夹。

在每个分区内,文件被组织为文件组,由文件id充当唯一标识。每个文件组包含多个文件切片,其中每个切片包含在某个即时时间的提交/压缩生成的基本列文件(.parquet)以及一组日志文件(.log),该文件包含自生成基本文件以来对基本文件的插入/更新。Hudi采用MVCC设计,其中压缩操作将日志和基本文件合并以产生新的文件切片,而清理操作则将未使用的/较旧的文件片删除以回收DFS上的空间。

3

索引机制(4类6种)

Hudi通过索引机制提供高效的Upsert操作,该机制会将一个RecordKey+PartitionPath组合的方式作为唯一标识映射到一个文件ID,而且,这个唯一标识和文件组/文件ID之间的映射自记录被写入文件组开始就不会再改变。Hudi内置了4类(6个)索引实现,均是继承自顶层的抽象类HoodieIndex而来,如下:

注意:

全局索引:

指在全表的所有分区范围下强制要求键保持唯一,即确保对给定的键有且只有一个对应的记录。

全局索引提供了更强的保证,也使得更删的消耗随着表的大小增加而增加(O(表的大小)),更适用于是小表。

非全局索引:仅在表的某一个分区内强制要求键保持唯一,它依靠写入器为同一个记录的更删提供一致的分区路径,但由此同时大幅提高了效率,因为索引查询复杂度成了O(更删的记录数量)且可以很好地应对写入量的扩展。

4

查询视图(3类)

读优化视图 : 直接查询基本文件(数据集的最新快照),其实就是列式文件(Parquet)。并保证与非Hudi列式数据集相比,具有相同的列式查询性能。

增量视图 : 仅查询新写入数据集的文件,需要指定一个Commit/Compaction的即时时间(位于Timeline上的某个Instant)作为条件,来查询此条件之后的新数据。

实时快照视图 : 查询某个增量提交操作中数据集的最新快照,会先进行动态合并最新的基本文件(Parquet)和增量文件(Avro)来提供近实时数据集(通常会存在几分钟的延迟)。

4

Hudi支持的存储类型

1

写时复制(Copy on Write,COW)表

COW表主要使用列式文件格式(Parquet)存储数据,在写入数据过程中,执行同步合并,更新数据版本并重写数据文件,类似RDBMS中的B-Tree更新。

更新:在更新记录时,Hudi会先找到包含更新数据的文件,然后再使用更新值(最新的数据)重写该文件,包含其他记录的文件保持不变。当突然有大量写操作时会导致重写大量文件,从而导致极大的I/O开销。

2)读取:在读取数据集时,通过读取最新的数据文件来获取最新的更新,此存储类型适用于少量写入和大量读取的场景。

2

读时合并(Merge On Read,MOR)表

MOR表是COW表的升级版,它使用列式(parquet)与行式(avro)文件混合的方式存储数据。在更新记录时,类似NoSQL中的LSM-Tree更新。

更新:在更新记录时,仅更新到增量文件(Avro)中,然后进行异步(或同步)的compaction,最后创建列式文件(parquet)的新版本。此存储类型适合频繁写的工作负载,因为新记录是以追加的模式写入增量文件中。

读取:在读取数据集时,需要先将增量文件与旧文件进行合并,然后生成列式文件成功后,再进行查询。

3

COW和MOR的对比

4

COW和MOR支持的视图

5

Hudi的SparkSQL使用

Hudi支持对文件系统(HDFS、LocalFS)和Hive的读写操作,以下分别使用COW和MOR存储类型来操作文件系统和Hive的案例。

1

文件系统操作

基于COW表的LocalFS/HDFS使用

package com.mengyao.hudiimport com.mengyao.Configuredimport org.apache.spark.sql.SaveMode._import org.apache.hudi.DataSourceReadOptions._import org.apache.hudi.DataSourceWriteOptions._import org.mon.model.EmptyHoodieRecordPayloadimport org.apache.hudi.config.HoodieIndexConfig._import org.apache.hudi.config.HoodieWriteConfig._import org.apache.hudi.index.HoodieIndeximport org.apache.spark.SparkConfimport org.apache.spark.sql.functions._import org.apache.spark.sql.{DataFrame, SparkSession}import scala.collection.JavaConverters._/*** Spark on Hudi(COW表) to HDFS/LocalFS* @ClassName Demo1* @Description* @Created by: MengYao* @Date: -01-11 10:10:53* @Version V1.0*/object Demo1 {private val APP_NAME = Demo1.getClass.getSimpleNameprivate val MASTER = "local[2]"val SOURCE = "hudi"val insertData = Array[TemperatureBean](new TemperatureBean("4301",1,28.6,"-12-07 12:35:33"),new TemperatureBean("4312",0,31.4,"-12-07 12:25:03"),new TemperatureBean("4302",1,30.1,"-12-07 12:32:17"),new TemperatureBean("4305",3,31.5,"-12-07 12:33:11"),new TemperatureBean("4310",2,29.9,"-12-07 12:34:42"))val updateData = Array[TemperatureBean](new TemperatureBean("4310",2,30.4,"-12-07 12:35:42")// 设备ID为4310的传感器发生修改,温度值由29.9->30.4,时间由12:34:42->12:35:42)val deleteData = Array[TemperatureBean](new TemperatureBean("4310",2,30.4,"-12-07 12:35:42")// 设备ID为4310的传感器要被删除,必须与最新的数据保持一致(如果字段值不同时无法删除))def main(args: Array[String]): Unit = {System.setProperty(Configured.HADOOP_HOME_DIR, Configured.getHadoopHome)// 创建SparkConfval conf = new SparkConf().set("spark.master", MASTER).set("spark.app.name", APP_NAME).setAll(Configured.sparkConf().asScala)// 创建SparkSessionval spark = SparkSession.builder().config(conf).getOrCreate()// 关闭日志spark.sparkContext.setLogLevel("OFF")// 导入隐式转换import spark.implicits._import DemoUtils._// 类似Hive中的DB(basePath的schema决定了最终要操作的文件系统,如果是file:则为LocalFS,如果是hdfs:则为HDFS)val basePath = "file:/D:/tmp"// 类似Hive中的Tableval tableName = "tbl_temperature_cow"// 数据所在的路径val path = s"$basePath/$tableName"// 插入数据insert(spark.createDataFrame(insertData.toBuffer.asJava, classOf[TemperatureBean]), tableName, "deviceId", "deviceType", "cdt", path)// 修改数据// update(spark.createDataFrame(updateData.toBuffer.asJava, classOf[TemperatureBean]), tableName, "deviceId", "deviceType", "cdt", path)// 删除数据// delete(spark.createDataFrame(deleteData.toBuffer.asJava, classOf[TemperatureBean]), tableName, "deviceId", "deviceType", "cdt", path)// 【查询方式1:默认为快照(基于行或列获取最新视图)查询// query(spark.read.format(SOURCE).load(s"$path/*/*").orderBy($"deviceId".asc))// query(spark.read.format(SOURCE).options(buildQuery(QUERY_TYPE_SNAPSHOT_OPT_VAL)).load(s"$path/*/*").withColumn("queryType", lit("查询方式为:快照(默认)")).orderBy($"deviceId".asc))// 【查询方式2:读时优化// query(spark.read.format(SOURCE).options(buildQuery(QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)).load(s"$path/*/*").withColumn("queryType", lit("查询方式为:读时优化")).orderBy($"deviceId".asc))// 【查询方式3:增量查询// 先取出最近一次提交的时间val commitTime = spark.read.format(SOURCE).load(s"$path/*/*").dropDuplicates("_hoodie_commit_time").select($"_hoodie_commit_time".as("commitTime")).orderBy($"commitTime".desc).first().getAs(0)// 再查询最近提交时间之后的数据query(spark.read.format(SOURCE).options(buildQuery(QUERY_TYPE_INCREMENTAL_OPT_VAL,Option((commitTime.toLong-2).toString))).load(s"$path/*/*").withColumn("queryType", lit("查询方式为:增量查询")).orderBy($"deviceId".asc).toDF())spark.close()}/*** 新增数据* @param df 数据集* @param tableName Hudi表* @param primaryKey主键列名* @param partitionField分区列名* @param changeDateField变更时间列名* @param path数据的存储路径*/def insert(df: DataFrame, tableName: String, primaryKey: String, partitionField: String, changeDateField: String, path: String): Unit = {df.write.format(SOURCE).options(Map(// 要操作的表TABLE_NAME->tableName,// 操作的表类型(默认COW)TABLE_TYPE_OPT_KEY -> COW_TABLE_TYPE_OPT_VAL,// 执行insert操作OPERATION_OPT_KEY->INSERT_OPERATION_OPT_VAL,// 设置主键列RECORDKEY_FIELD_OPT_KEY->primaryKey,// 设置分区列,类似Hive的表分区概念PARTITIONPATH_FIELD_OPT_KEY->partitionField,// 设置数据更新时间列,该字段数值大的数据会覆盖小的,必须是数值类型PRECOMBINE_FIELD_OPT_KEY->changeDateField,// 要使用的索引类型,可用的选项是[SIMPLE|BLOOM|HBASE|INMEMORY],默认为布隆过滤器INDEX_TYPE_PROP-> HoodieIndex.IndexType.BLOOM.name,// 执行insert操作的shuffle并行度INSERT_PARALLELISM->"2"))// 如果数据存在会覆盖.mode(Overwrite).save(path)}/*** 修改数据* @param df 数据集* @param tableName Hudi表* @param primaryKey主键列名* @param partitionField分区列名* @param changeDateField变更时间列名* @param path数据的存储路径*/def update(df: DataFrame, tableName: String, primaryKey: String, partitionField: String, changeDateField: String, path: String): Unit = {df.write.format(SOURCE).options(Map(// 要操作的表TABLE_NAME->tableName,// 操作的表类型(默认COW)TABLE_TYPE_OPT_KEY -> COW_TABLE_TYPE_OPT_VAL,// 执行upsert操作OPERATION_OPT_KEY->UPSERT_OPERATION_OPT_VAL,// 设置主键列RECORDKEY_FIELD_OPT_KEY->primaryKey,// 设置分区列,类似Hive的表分区概念PARTITIONPATH_FIELD_OPT_KEY->partitionField,// 设置数据更新时间列,该字段数值大的数据会覆盖小的PRECOMBINE_FIELD_OPT_KEY->changeDateField,// 要使用的索引类型,可用的选项是[SIMPLE|BLOOM|HBASE|INMEMORY],默认为布隆过滤器INDEX_TYPE_PROP-> HoodieIndex.IndexType.BLOOM.name,// 执行upsert操作的shuffle并行度UPSERT_PARALLELISM-> "2"))// 如果数据存在会覆盖.mode(Append).save(path)}/*** 删除数据* @param df 数据集* @param tableName Hudi表* @param primaryKey主键列名* @param partitionField分区列名* @param changeDateField变更时间列名* @param path数据的存储路径*/def delete(df: DataFrame, tableName: String, primaryKey: String, partitionField: String, changeDateField: String, path: String): Unit = {df.write.format(SOURCE).options(Map(// 要操作的表TABLE_NAME->tableName,// 操作的表类型(默认COW)TABLE_TYPE_OPT_KEY -> COW_TABLE_TYPE_OPT_VAL,// 执行delete操作OPERATION_OPT_KEY->DELETE_OPERATION_OPT_VAL,// 设置主键列RECORDKEY_FIELD_OPT_KEY->primaryKey,// 设置分区列,类似Hive的表分区概念PARTITIONPATH_FIELD_OPT_KEY->partitionField,// 设置数据更新时间列,该字段数值大的数据会覆盖小的PRECOMBINE_FIELD_OPT_KEY->changeDateField,// 要使用的索引类型,可用的选项是[SIMPLE|BLOOM|HBASE|INMEMORY],默认为布隆过滤器INDEX_TYPE_PROP-> HoodieIndex.IndexType.BLOOM.name,// 执行delete操作的shuffle并行度DELETE_PARALLELISM->"2",// 删除策略有软删除(保留主键且其余字段为null)和硬删除(从数据集中彻底删除)两种,此处为硬删除PAYLOAD_CLASS_OPT_KEY->classOf[EmptyHoodieRecordPayload].getName))// 如果数据存在会覆盖.mode(Append).save(path)}/*** 查询类型* <br>Hoodie具有3种查询模式:</br>*<br>1、默认是快照模式(Snapshot mode,根据行和列数据获取最新视图)</br>*<br>2、增量模式(incremental mode,查询从某个commit时间片之后的数据)</br>*<br>3、读时优化模式(Read Optimized mode,根据列数据获取最新视图)</br>* @param queryType* @param queryTime* @return*/def buildQuery(queryType: String, queryTime: Option[String]=Option.empty) = Map(queryType match {// 如果是读时优化模式(read_optimized,根据列数据获取最新视图)case QUERY_TYPE_READ_OPTIMIZED_OPT_VAL => QUERY_TYPE_OPT_KEY->QUERY_TYPE_READ_OPTIMIZED_OPT_VAL// 如果是增量模式(incremental mode,查询从某个时间片之后的新数据)case QUERY_TYPE_INCREMENTAL_OPT_VAL => QUERY_TYPE_OPT_KEY->QUERY_TYPE_INCREMENTAL_OPT_VAL// 默认使用快照模式查询(snapshot mode,根据行和列数据获取最新视图)case _ => QUERY_TYPE_OPT_KEY->QUERY_TYPE_SNAPSHOT_OPT_VAL},if(queryTime.nonEmpty) BEGIN_INSTANTTIME_OPT_KEY->queryTime.get else BEGIN_INSTANTTIME_OPT_KEY->"0")}```基于MOR表的LocalFS/HDFS使用```scalapackage com.mengyao.hudiimport com.mengyao.Configuredimport org.apache.hudi.DataSourceReadOptions._import org.apache.hudi.DataSourceWriteOptions._import org.mon.model.EmptyHoodieRecordPayloadimport org.apache.hudi.config.HoodieIndexConfig.INDEX_TYPE_PROPimport org.apache.hudi.config.HoodieWriteConfig._import org.apache.hudi.index.HoodieIndeximport org.apache.spark.sql.{DataFrame, SparkSession}import org.apache.spark.sql.SaveMode._import org.apache.spark.SparkConfimport org.apache.spark.sql.functions.litimport scala.collection.JavaConverters._/*** Spark on Hudi(MOR表) to HDFS/LocalFS* @ClassName Demo1* @Description* @Created by: MengYao* @Date: -01-11 10:10:53* @Version V1.0*/object Demo2 {private val APP_NAME: String = Demo1.getClass.getSimpleNameprivate val MASTER: String = "local[2]"val SOURCE: String = "hudi"val insertData = Array[TemperatureBean](new TemperatureBean("4301",1,28.6,"-12-07 12:35:33"),new TemperatureBean("4312",0,31.4,"-12-07 12:25:03"),new TemperatureBean("4302",1,30.1,"-12-07 12:32:17"),new TemperatureBean("4305",3,31.5,"-12-07 12:33:11"),new TemperatureBean("4310",2,29.9,"-12-07 12:34:42"))val updateData = Array[TemperatureBean](new TemperatureBean("4310",2,30.4,"-12-07 12:35:42")// 设备ID为4310的传感器发生修改,温度值由29.9->30.4,时间由12:34:42->12:35:42)val deleteData = Array[TemperatureBean](new TemperatureBean("4310",2,30.4,"-12-07 12:35:42")// 设备ID为4310的传感器要被删除,必须与最新的数据保持一致(如果字段值不同时无法删除))def main(args: Array[String]): Unit = {System.setProperty(Configured.HADOOP_HOME_DIR, Configured.getHadoopHome)// 创建SparkConfval conf = new SparkConf().set("spark.master", MASTER).set("spark.app.name", APP_NAME).setAll(Configured.sparkConf().asScala)// 创建SparkSessionval spark = SparkSession.builder().config(conf).getOrCreate()// 关闭日志spark.sparkContext.setLogLevel("OFF")// 导入隐式转换import spark.implicits._import DemoUtils._// 类似Hive中的DB(basePath的schema决定了最终要操作的文件系统,如果是file:则为LocalFS,如果是hdfs:则为HDFS)val basePath = "file:/D:/tmp"// 类似Hive中的Tableval tableName = "tbl_temperature_mor"// 数据所在的路径val path = s"$basePath/$tableName"// 插入数据insert(spark.createDataFrame(insertData.toBuffer.asJava, classOf[TemperatureBean]), tableName, "deviceId", "deviceType", "cdt", path)// 修改数据// update(spark.createDataFrame(updateData.toBuffer.asJava, classOf[TemperatureBean]), tableName, "deviceId", "deviceType", "cdt", path)// 删除数据// delete(spark.createDataFrame(deleteData.toBuffer.asJava, classOf[TemperatureBean]), tableName, "deviceId", "deviceType", "cdt", path)// 【查询方式1:默认为快照(基于行或列获取最新视图)查询// query(spark.read.format(SOURCE).load(s"$path/*/*").orderBy($"deviceId".asc))// query(spark.read.format(SOURCE).options(buildQuery(QUERY_TYPE_SNAPSHOT_OPT_VAL)).load(s"$path/*/*").withColumn("queryType", lit("查询方式为:快照(默认)")).orderBy($"deviceId".asc))// 【查询方式2:读时优化// query(spark.read.format(SOURCE).options(buildQuery(QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)).load(s"$path/*/*").withColumn("queryType", lit("查询方式为:读时优化")).orderBy($"deviceId".asc))// 【查询方式3:增量查询// 先取出最近一次提交的时间val commitTime:String = spark.read.format(SOURCE).load(s"$path/*/*").dropDuplicates("_hoodie_commit_time").select($"_hoodie_commit_time".as("commitTime")).orderBy($"commitTime".desc).first().getAs(0)// 再查询最近提交时间之后的数据query(spark.read.format(SOURCE).options(buildQuery(QUERY_TYPE_INCREMENTAL_OPT_VAL,Option((commitTime.toLong-2).toString))).load(s"$path/*/*").withColumn("queryType", lit("查询方式为:增量查询")).orderBy($"deviceId".asc).toDF())spark.close()}/*** 新增数据* @param df 数据集* @param tableName Hudi表* @param primaryKey主键列名* @param partitionField分区列名* @param changeDateField变更时间列名* @param path数据的存储路径*/def insert(df: DataFrame, tableName: String, primaryKey: String, partitionField: String, changeDateField: String, path: String): Unit = {df.write.format(SOURCE).options(Map(// 要操作的表TABLE_NAME->tableName,// 操作的表类型(使用MOR)TABLE_TYPE_OPT_KEY -> MOR_TABLE_TYPE_OPT_VAL,// 执行insert操作OPERATION_OPT_KEY->INSERT_OPERATION_OPT_VAL,// 设置主键列RECORDKEY_FIELD_OPT_KEY->primaryKey,// 设置分区列,类似Hive的表分区概念PARTITIONPATH_FIELD_OPT_KEY->partitionField,// 设置数据更新时间列,该字段数值大的数据会覆盖小的,必须是数值类型PRECOMBINE_FIELD_OPT_KEY->changeDateField,// 要使用的索引类型,可用的选项是[SIMPLE|BLOOM|HBASE|INMEMORY],默认为布隆过滤器INDEX_TYPE_PROP-> HoodieIndex.IndexType.BLOOM.name,// 执行insert操作的shuffle并行度INSERT_PARALLELISM->"2"))// 如果数据存在会覆盖.mode(Overwrite).save(path)}/*** 修改数据* @param df 数据集* @param tableName Hudi表* @param primaryKey主键列名* @param partitionField分区列名* @param changeDateField变更时间列名* @param path数据的存储路径*/def update(df: DataFrame, tableName: String, primaryKey: String, partitionField: String, changeDateField: String, path: String): Unit = {df.write.format(SOURCE).options(Map(// 要操作的表TABLE_NAME->tableName,// 操作的表类型(使用MOR)TABLE_TYPE_OPT_KEY -> MOR_TABLE_TYPE_OPT_VAL,// 执行upsert操作OPERATION_OPT_KEY->UPSERT_OPERATION_OPT_VAL,// 设置主键列RECORDKEY_FIELD_OPT_KEY->primaryKey,// 设置分区列,类似Hive的表分区概念PARTITIONPATH_FIELD_OPT_KEY->partitionField,// 设置数据更新时间列,该字段数值大的数据会覆盖小的PRECOMBINE_FIELD_OPT_KEY->changeDateField,// 要使用的索引类型,可用的选项是[SIMPLE|BLOOM|HBASE|INMEMORY],默认为布隆过滤器INDEX_TYPE_PROP-> HoodieIndex.IndexType.BLOOM.name,// 执行upsert操作的shuffle并行度UPSERT_PARALLELISM-> "2"))// 如果数据存在会覆盖.mode(Append).save(path)}/*** 删除数据* @param df 数据集* @param tableName Hudi表* @param primaryKey主键列名* @param partitionField分区列名* @param changeDateField变更时间列名* @param path数据的存储路径*/def delete(df: DataFrame, tableName: String, primaryKey: String, partitionField: String, changeDateField: String, path: String): Unit = {df.write.format(SOURCE).options(Map(// 要操作的表TABLE_NAME->tableName,// 操作的表类型(使用MOR)TABLE_TYPE_OPT_KEY -> MOR_TABLE_TYPE_OPT_VAL,// 执行delete操作OPERATION_OPT_KEY->DELETE_OPERATION_OPT_VAL,// 设置主键列RECORDKEY_FIELD_OPT_KEY->primaryKey,// 设置分区列,类似Hive的表分区概念PARTITIONPATH_FIELD_OPT_KEY->partitionField,// 设置数据更新时间列,该字段数值大的数据会覆盖小的PRECOMBINE_FIELD_OPT_KEY->changeDateField,// 要使用的索引类型,可用的选项是[SIMPLE|BLOOM|HBASE|INMEMORY],默认为布隆过滤器INDEX_TYPE_PROP-> HoodieIndex.IndexType.BLOOM.name,// 执行delete操作的shuffle并行度DELETE_PARALLELISM->"2",// 删除策略有软删除(保留主键且其余字段为null)和硬删除(从数据集中彻底删除)两种,此处为硬删除PAYLOAD_CLASS_OPT_KEY->classOf[EmptyHoodieRecordPayload].getName))// 如果数据存在会覆盖.mode(Append).save(path)}/*** 查询类型* <br>Hoodie具有3种查询模式:</br>*<br>1、默认是快照模式(Snapshot mode,根据行和列数据获取最新视图)</br>*<br>2、增量模式(incremental mode,查询从某个commit时间片之后的数据)</br>*<br>3、读时优化模式(Read Optimized mode,根据列数据获取最新视图)</br>* @param queryType* @param queryTime* @return*/def buildQuery(queryType: String, queryTime: Option[String]=Option.empty): Map[String, String] = {Map(queryType match {// 如果是读时优化模式(read_optimized,根据列数据获取最新视图)case QUERY_TYPE_READ_OPTIMIZED_OPT_VAL => QUERY_TYPE_OPT_KEY->QUERY_TYPE_READ_OPTIMIZED_OPT_VAL// 如果是增量模式(incremental mode,查询从某个时间片之后的新数据)case QUERY_TYPE_INCREMENTAL_OPT_VAL => QUERY_TYPE_OPT_KEY->QUERY_TYPE_INCREMENTAL_OPT_VAL// 默认使用快照模式查询(snapshot mode,根据行和列数据获取最新视图)case _ => QUERY_TYPE_OPT_KEY->QUERY_TYPE_SNAPSHOT_OPT_VAL},if(queryTime.nonEmpty) BEGIN_INSTANTTIME_OPT_KEY->queryTime.get else BEGIN_INSTANTTIME_OPT_KEY->"0")}}

6

Hudi的WriteClient使用

package com.mengyao.hudi;import com.mengyao.Configured;import org.apache.hudi.client.HoodieWriteClient;import org.apache.hudi.client.WriteStatus;import org.mon.model.HoodieRecord;import org.mon.model.OverwriteWithLatestAvroPayload;import org.apache.hudi.config.HoodieIndexConfig;import org.apache.hudi.config.HoodieWriteConfig;import org.apache.hudi.index.HoodieIndex;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import scala.collection.JavaConverters;import java.text.SimpleDateFormat;import java.util.Arrays;import java.util.Date;import java.util.HashMap;/*** WriteClient模式是直接使用RDD级别api进行Hudi编程*Application需要使用HoodieWriteConfig对象,并将其传递给HoodieWriteClient构造函数。HoodieWriteConfig可以使用以下构建器模式构建。* @ClassName WriteClientMain* @Description* @Created by: MengYao* @Date: -01-26 10:40:29* @Version V1.0*/public class WriteClientMain {private static final String APP_NAME = WriteClientMain.class.getSimpleName();private static final String MASTER = "local[2]";private static final String SOURCE = "hudi";public static void main(String[] args) {// 创建SparkConfSparkConf conf = new SparkConf().setAll(JavaConverters.mapAsScalaMapConverter(Configured.sparkConf()).asScala());// 创建SparkContextJavaSparkContext jsc = new JavaSparkContext(MASTER, APP_NAME, conf);// 创建Hudi的WriteConfigHoodieWriteConfig hudiCfg = HoodieWriteConfig.newBuilder().forTable("tableName").withSchema("avroSchema").withPath("basePath").withProps(new HashMap()).withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();// 创建Hudi的WriteClientHoodieWriteClient hudiWriteCli = new HoodieWriteClient<OverwriteWithLatestAvroPayload>(jsc, hudiCfg);// 1、执行新增操作JavaRDD<HoodieRecord> insertData = jsc.parallelize(Arrays.asList());String insertInstantTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());JavaRDD<WriteStatus> insertStatus = hudiWriteCli.insert(insertData, insertInstantTime);// 【注意:为了便于理解,以下所有判断Hudi操作数据的状态不进行额外的方法封装】if (insertStatus.filter(ws->ws.hasErrors()).count()>0) {// 当提交后返回的状态中包含error时hudiWriteCli.rollback(insertInstantTime);// 从时间线(insertInstantTime)中回滚,插入失败} else {mit(insertInstantTime, insertStatus);// 否则提交时间线(insertInstantTime)中的数据,到此,插入完成}// 2、也可以使用批量加载的方式新增数据String builkInsertInstantTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());JavaRDD<WriteStatus> bulkInsertStatus = hudiWriteCli.bulkInsert(insertData, builkInsertInstantTime);if (bulkInsertStatus.filter(ws->ws.hasErrors()).count()>0) {// 当提交后返回的状态中包含error时hudiWriteCli.rollback(builkInsertInstantTime);// 从时间线(builkInsertInstantTime)中回滚,批量插入失败} else {mit(builkInsertInstantTime, bulkInsertStatus);// 否则提交时间线(builkInsertInstantTime)中的数据,到此,批量插入完成}// 3、执行修改or新增操作JavaRDD<HoodieRecord> updateData = jsc.parallelize(Arrays.asList());String updateInstantTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());JavaRDD<WriteStatus> updateStatus = hudiWriteCli.upsert(updateData, updateInstantTime);if (updateStatus.filter(ws->ws.hasErrors()).count()>0) {// 当提交后返回的状态中包含error时hudiWriteCli.rollback(updateInstantTime);// 从时间线(updateInstantTime)中回滚,修改失败} else {mit(updateInstantTime, updateStatus);// 否则提交时间线(updateInstantTime)中的数据,到此,修改完成}// 4、执行删除操作JavaRDD<HoodieRecord> deleteData = jsc.parallelize(Arrays.asList());String deleteInstantTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());JavaRDD<WriteStatus> deleteStatus = hudiWriteCli.delete(deleteData, deleteInstantTime);if (deleteStatus.filter(ws->ws.hasErrors()).count()>0) {// 当提交后返回的状态中包含error时hudiWriteCli.rollback(deleteInstantTime);// 从时间线(deleteInstantTime)中回滚,删除失败} else {mit(deleteInstantTime, deleteStatus);// 否则提交时间线(deleteInstantTime)中的数据,到此,删除完成}// 退出WriteClienthudiWriteCli.close();// 退出SparkContextjsc.stop();}}

7

如何使用索引

1

使用SparkSQL的数据源配置

在SparkSQL的数据源配置中,面向读写的通用配置参数均通过options或option来指定,可用的功能包括:定义键和分区、选择写操作、指定如何合并记录或选择要读取的视图类型。

df.write.format("hudi").options(Map(// 要操作的表TABLE_NAME->tableName,// 操作的表类型(默认COW)TABLE_TYPE_OPT_KEY -> COW_TABLE_TYPE_OPT_VAL,/*** 执行insert/upsert/delete操作,默认是upsert* OPERATION_OPT_KEY->INSERT_OPERATION_OPT_VAL,* BULK_INSERT_OPERATION_OPT_VAL,* UPSERT_OPERATION_OPT_VAL,* DELETE_OPERATION_OPT_VAL,*/OPERATION_OPT_KEY->INSERT_OPERATION_OPT_VAL,// 设置主键列RECORDKEY_FIELD_OPT_KEY->primaryKey,// 设置分区列,类似Hive的表分区概念PARTITIONPATH_FIELD_OPT_KEY->partitionField,// 设置数据更新时间列,该字段数值大的数据会覆盖小的,必须是数值类型PRECOMBINE_FIELD_OPT_KEY->changeDateField,/*** 要使用的索引类型,可用的选项是[SIMPLE|BLOOM|HBASE|INMEMORY],默认为布隆过滤器* INDEX_TYPE_PROP -> HoodieIndex.IndexType.SIMPLE.name,*HoodieIndex.IndexType.GLOBAL_SIMPLE.name,*HoodieIndex.IndexType.INMEMORY.name,*HoodieIndex.IndexType.HBASE.name,*HoodieIndex.IndexType.BLOOM.name,*HoodieIndex.IndexType.GLOBAL_SIMPLE.name,*/ INDEX_TYPE_PROP -> HoodieIndex.IndexType.BLOOM.name,// 执行insert操作的shuffle并行度INSERT_PARALLELISM->"2"))// 如果数据存在会覆盖.mode(Overwrite).save(path)

2

使用WriteClient方式配置

WriteClient是使用基于Java的RDD级别API进行编程的的一种方式,需要先构建HoodieWriteConfig对象,然后再作为参数传递给HoodieWriteClient构造函数。

// 创建Hudi的WriteConfigHoodieWriteConfig hudiCfg = HoodieWriteConfig.newBuilder().forTable("tableName").withSchema("avroSchema").withPath("basePath").withProps(new HashMap()).withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM// HoodieIndex.IndexType.GLOBAL_BLOOM// HoodieIndex.IndexType.INMEMORY// HoodieIndex.IndexType.HBASE// HoodieIndex.IndexType.SIMPLE// HoodieIndex.IndexType.GLOBAL_SIMPLE).build()).build();

3

声明索引的关键参数

在Hudi中,使用索引的关键参数主要有2个,即hoodie.index.type和hoodie.index.class两个。这两个参数只需要配置其中一个即可,原因如下:

4

索引参数在源码中的实现

可以在Hudi索引超类HoodieIndex的源码中看到createIndex方法的定义和实现:

public abstract class HoodieIndex<T extends HoodieRecordPayload> implements Serializable {protected final HoodieWriteConfig config;protected HoodieIndex(HoodieWriteConfig config) {this.config = config;}public static <T extends HoodieRecordPayload> HoodieIndex<T> createIndex(HoodieWriteConfig config) throws HoodieIndexException {if (!StringUtils.isNullOrEmpty(config.getIndexClass())) {Object instance = ReflectionUtils.loadClass(config.getIndexClass(), new Object[]{config});if (!(instance instanceof HoodieIndex)) {throw new HoodieIndexException(config.getIndexClass() + " is not a subclass of HoodieIndex");} else {return (HoodieIndex)instance;}} else {switch(config.getIndexType()) {case HBASE:return new HBaseIndex(config);case INMEMORY:return new InMemoryHashIndex(config);case BLOOM:return new HoodieBloomIndex(config);case GLOBAL_BLOOM:return new HoodieGlobalBloomIndex(config);case SIMPLE:return new HoodieSimpleIndex(config);case GLOBAL_SIMPLE:return new HoodieGlobalSimpleIndex(config);default:throw new HoodieIndexException("Index type unspecified, set " + config.getIndexType());}}}@PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)public abstract JavaPairRDD<HoodieKey, Option<Pair<String, String>>> fetchRecordLocation(JavaRDD<HoodieKey> var1, JavaSparkContext var2, HoodieTable<T> var3);@PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)public abstract JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> var1, JavaSparkContext var2, HoodieTable<T> var3) throws HoodieIndexException;@PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)public abstract JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> var1, JavaSparkContext var2, HoodieTable<T> var3) throws HoodieIndexException;@PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)public abstract boolean rollbackCommit(String var1);@PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)public abstract boolean isGlobal();@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)public abstract boolean canIndexLogFiles();@PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)public abstract boolean isImplicitWithStorage();public void close() {}public static enum IndexType {HBASE,INMEMORY,BLOOM,GLOBAL_BLOOM,SIMPLE,GLOBAL_SIMPLE;private IndexType() {}}}

8

生产环境下的推荐配置

spark.driver.extraClassPath=/etc/hive/confspark.driver.extraJavaOptions=-XX:+PrintTenuringDistribution -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprofspark.driver.maxResultSize=2gspark.driver.memory=4gspark.executor.cores=1spark.executor.extraJavaOptions=-XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprofspark.executor.id=driverspark.executor.instances=300spark.executor.memory=press=truespark.kryoserializer.buffer.max=512mspark.serializer=org.apache.spark.serializer.KryoSerializerspark.shuffle.service.enabled=truespark.sql.hive.convertMetastoreParquet=falsespark.submit.deployMode=clusterspark.task.cpus=1spark.task.maxFailures=4spark.yarn.driver.memoryOverhead=1024spark.yarn.executor.memoryOverhead=3072spark.yarn.max.executor.failures=100```

如果觉得《【Hudi】Apache Hudi:不一样的存储 不一样的计算》对你有帮助,请点赞、收藏,并留下你的观点哦!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。