加入收藏 | 设为首页 | 会员中心 | 我要投稿 湖南网 (https://www.hunanwang.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

Apache Spark Delta Lake写数据行使及实现道理代码理会

发布时间:2019-10-02 16:25:24 所属栏目:教程 来源:明惠
导读:Delta Lake 写数据是其最根基的成果,并且其行使和现有的 Spark 写 Parquet 文件根基同等,在先容 Delta Lake 实现道理之前先来看看怎样行使它,详细行使如下: df.write.format(delta).save(/data/yangping.wyp/delta/test/) //数据凭证dt分区 df.write.f
副问题[/!--empirenews.page--]

Apache Spark Delta Lake 写数据行使及实现道理代码理会

Delta Lake 写数据是其最根基的成果,并且其行使和现有的 Spark 写 Parquet 文件根基同等,在先容 Delta Lake 实现道理之前先来看看怎样行使它,详细行使如下:

  1. df.write.format("delta").save("/data/yangping.wyp/delta/test/") 
  2.   
  3. //数据凭证 dt 分区 
  4. df.write.format("delta").partitionBy("dt").save("/data/yangping.wyp/delta/test/") 
  5.   
  6. // 包围之前的数据 
  7. df.write.format("delta").mode(SaveMode.Overwrite).save("/data/yangping.wyp/delta/test/") 

各人可以看出,行使写 Delta 数据长短常简朴的,这也是 Delte Lake 先容的 100% 兼容 Spark。

Delta Lake 写数据道理

前面简朴相识了怎样行使 Delta Lake 来写数据,本小结我们将深入先容 Delta Lake 是怎样担保写数据的根基道理以及怎样担保事宜性。

得益于 Apache Spark 强盛的数据源 API,我们可以很利便的给 Spark 添加任何数据源,Delta Lake 也不破例。Delta Lake 就是行使 DataSource V1 版本的 API 实现的一种新的数据源,我们挪用 df.write.format("delta") 着实底层挪用的是 org.apache.spark.sql.delta.sources.DeltaDataSource 类。为了简朴起见,本文先容的是 Delta Lake 批量写的实现,及时流写 Delta Lake 本文不涉及,后头有机遇再先容。 Delta Lake 批量写扩展了 org.apache.spark.sql.sources.CreatableRelationProvider 特质,并实现了个中的要领。我们挪用上面的写数据要领起首会挪用 DeltaDataSource 类的 createRelation 要领,它的详细实现如下:

  1. override def createRelation( 
  2.     sqlContext: SQLContext, 
  3.     mode: SaveMode, 
  4.     parameters: Map[String, String], 
  5.     data: DataFrame): BaseRelation = { 
  6.   
  7.   // 写数据的路径 
  8.   val path = parameters.getOrElse("path", { 
  9.     throw DeltaErrors.pathNotSpecifiedException 
  10.   }) 
  11.   
  12.   // 分区字段 
  13.   val partitionColumns = parameters.get(DeltaSourceUtils.PARTITIONING_COLUMNS_KEY) 
  14.     .map(DeltaDataSource.decodePartitioningColumns) 
  15.     .getOrElse(Nil) 
  16.   
  17.   
  18.   // 事宜日记工具 
  19.   val deltaLog = DeltaLog.forTable(sqlContext.sparkSession, path) 
  20.   
  21.   // 真正的写操纵进程 
  22.   WriteIntoDelta( 
  23.     deltaLog = deltaLog, 
  24.     mode = mode, 
  25.     new DeltaOptions(parameters, sqlContext.sparkSession.sessionState.conf), 
  26.     partitionColumns = partitionColumns, 
  27.     configuration = Map.empty, 
  28.     data = data).run(sqlContext.sparkSession) 
  29.   
  30.   deltaLog.createRelation() 

个中 mode 就是保持数据的模式,支持 Append、Overwrite、ErrorIfExists 以及 Ignore 等。parameters 这个转达的参数,好比分区字段、数据生涯路径以及 Delta 支持的一些参数(replaceWhere、mergeSchema、overwriteSchema 等,详细拜见 org.apache.spark.sql.delta.DeltaOptions);data 就是我们必要生涯的数据。

createRelation 要领紧接着就是获取数据生涯的路径,分区字段等信息。然后初始化 deltaLog,deltaLog 的初始化会做许多工作,好比会读取磁盘全部的事宜日记(_delta_log 目次下),并构建最新事宜日记的最新快照,内里可以拿到最新数据的版本。因为 deltaLog 的初始化本钱较量高,以是 deltaLog 初始化完之后会缓存到 deltaLogCache 中,这是一个行使 Guava 的 CacheBuilder 类实现的一个缓存,缓存的数据保持一小时,缓存巨细可以通过 delta.log.cacheSize 参数举办配置。只要写数据的路径是一样的,就只必要初始化一次 deltaLog,后头直接从缓存中拿即可。除非之前缓存的 deltaLog 被整理了,可能无效才会再次初始化。DeltaLog 类是 Delta Lake 中最重要的类之一,涉及的内容很是多,以是我们会单独行使一篇文章举办先容。

紧接着初始化 WriteIntoDelta,WriteIntoDelta 扩展自 RunnableCommand,Delta Lake 中的更新、删除、归并都是扩展这个类的。初始化完 WriteIntoDelta 之后,就会挪用 run 要领执行真正的写数据操纵。WriteIntoDelta 的 run 要领实现如下:

  1. override def run(sparkSession: SparkSession): Seq[Row] = { 
  2.     deltaLog.withNewTransaction { txn => 
  3.       val actions = write(txn, sparkSession) 
  4.       val operation = DeltaOperations.Write(mode, Option(partitionColumns), options.replaceWhere) 
  5.       txn.commit(actions, operation) 
  6.     } 
  7.     Seq.empty 

(编辑:湖南网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读