加入收藏 | 设为首页 | 会员中心 | 我要投稿 湖南网 (https://www.hunanwang.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

Apache Spark Delta Lake写数据行使及实现道理代码理会

发布时间:2019-10-02 16:25:24 所属栏目:教程 来源:明惠
导读:Delta Lake 写数据是其最根基的成果,并且其行使和现有的 Spark 写 Parquet 文件根基同等,在先容 Delta Lake 实现道理之前先来看看怎样行使它,详细行使如下: df.write.format(delta).save(/data/yangping.wyp/delta/test/) //数据凭证dt分区 df.write.f

Delta Lake 全部的更新操纵都是在事宜中举办的,deltaLog.withNewTransaction 就是一个事宜,withNewTransaction 的实现如下:

  1. def withNewTransaction[T](thunk: OptimisticTransaction => T): T = { 
  2.   try { 
  3.     // 更新当前表事宜日记的快照 
  4.     update() 
  5.     // 初始化乐观事宜锁工具 
  6.     val txn = new OptimisticTransaction(this) 
  7.     // 开启事宜 
  8.     OptimisticTransaction.setActive(txn) 
  9.     // 执行写数据操纵 
  10.     thunk(txn) 
  11.   } finally { 
  12.     // 封锁事宜 
  13.     OptimisticTransaction.clearActive() 
  14.   } 

在开启事宜之前,必要更新当前表事宜的快照,由于在执行写数据之前,这张表也许已经被修改了,执行 update 操纵之后,就可以拿到当前表的最新版本,紧接着开启乐观事宜锁。thunk(txn) 就是必要执行的事宜操纵,对应 deltaLog.withNewTransaction 内里的全部代码。

我们回到上面的 run 要领。val actions = write(txn, sparkSession) 就是执行写数据的操纵,它的实现如下:

  1.   def write(txn: OptimisticTransaction, sparkSession: SparkSession): Seq[Action] = { 
  2.     import sparkSession.implicits._ 
  3.     // 假如不是第一次往表内里写数据,必要判定写数据的模式是否切合前提 
  4.     if (txn.readVersion > -1) { 
  5.       // This table already exists, check if the insert is valid. 
  6.       if (mode == SaveMode.ErrorIfExists) { 
  7.         throw DeltaErrors.pathAlreadyExistsException(deltaLog.dataPath) 
  8.       } else if (mode == SaveMode.Ignore) { 
  9.         return Nil 
  10.       } else if (mode == SaveMode.Overwrite) { 
  11.         deltaLog.assertRemovable() 
  12.       } 
  13.     } 
  14.   
  15.     // 更新表的模式,好比是否包围现有的模式,是否和现有的模式举办 merge 
  16.     updateMetadata(txn, data, partitionColumns, configuration, isOverwriteOperation) 
  17.   
  18.     // 是否界说分区过滤前提 
  19.     val replaceWhere = options.replaceWhere 
  20.     val partitionFilters = if (replaceWhere.isDefined) { 
  21.       val predicates = parsePartitionPredicates(sparkSession, replaceWhere.get) 
  22.       if (mode == SaveMode.Overwrite) { 
  23.         verifyPartitionPredicates( 
  24.           sparkSession, txn.metadata.partitionColumns, predicates) 
  25.       } 
  26.       Some(predicates) 
  27.     } else { 
  28.       None 
  29.     } 
  30.   
  31.     // 第一次写数据初始化事宜日记的目次 
  32.     if (txn.readVersion < 0) { 
  33.       // Initialize the log path 
  34.       deltaLog.fs.mkdirs(deltaLog.logPath) 
  35.     } 
  36.   
  37.     // 写数据到文件体系中 
  38.     val newFiles = txn.writeFiles(data, Some(options)) 
  39.       
  40.     val deletedFiles = (mode, partitionFilters) match { 
  41.        // 全量包围,直接拿出缓存在内存中最新事宜日记快照内里的全部 AddFile 文件 
  42.       case (SaveMode.Overwrite, None) => 
  43.         txn.filterFiles().map(_.remove) 
  44.       // 从事宜日记快照中获取对应分区内里的全部 AddFile 文件 
  45.       case (SaveMode.Overwrite, Some(predicates)) => 
  46.         // Check to make sure the files we wrote out were actually valid. 
  47.         val matchingFiles = DeltaLog.filterFileList( 
  48.           txn.metadata.partitionColumns, newFiles.toDF(), predicates).as[AddFile].collect() 
  49.         val invalidFiles = newFiles.toSet -- matchingFiles 
  50.         if (invalidFiles.nonEmpty) { 
  51.           val badPartitions = invalidFiles 
  52.             .map(_.partitionValues) 
  53.             .map { _.map { case (k, v) => s"$k=$v" }.mkString("/") } 
  54.             .mkString(", ") 
  55.           throw DeltaErrors.replaceWhereMismatchException(replaceWhere.get, badPartitions) 
  56.         } 
  57.   
  58.         txn.filterFiles(predicates).map(_.remove) 
  59.       case _ => Nil 
  60.     } 
  61.   
  62.     newFiles ++ deletedFiles 
  63.   } 

假如 txn.readVersion == -1,声名是第一次写数据到 Delta Lake 表,以是当这个值大于 -1 的时辰,必要判定一下写数据的操纵是否正当。

因为 Delta Lake 底层行使的是 Parquet 名目,以是 Delta Lake 表也支持模式的增进归并等,这就是 updateMetadata 函数对应的操纵。

由于 Delta Lake 表支持分区,以是我们也许在写数据的时辰指定某个分区举办包围。

(编辑:湖南网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读