Delta Lake 全部的更新操纵都是在事宜中举办的,deltaLog.withNewTransaction 就是一个事宜,withNewTransaction 的实现如下:
- def withNewTransaction[T](thunk: OptimisticTransaction => T): T = {
- try {
- // 更新当前表事宜日记的快照
- update()
- // 初始化乐观事宜锁工具
- val txn = new OptimisticTransaction(this)
- // 开启事宜
- OptimisticTransaction.setActive(txn)
- // 执行写数据操纵
- thunk(txn)
- } finally {
- // 封锁事宜
- OptimisticTransaction.clearActive()
- }
- }
在开启事宜之前,必要更新当前表事宜的快照,由于在执行写数据之前,这张表也许已经被修改了,执行 update 操纵之后,就可以拿到当前表的最新版本,紧接着开启乐观事宜锁。thunk(txn) 就是必要执行的事宜操纵,对应 deltaLog.withNewTransaction 内里的全部代码。
我们回到上面的 run 要领。val actions = write(txn, sparkSession) 就是执行写数据的操纵,它的实现如下:
- def write(txn: OptimisticTransaction, sparkSession: SparkSession): Seq[Action] = {
- import sparkSession.implicits._
- // 假如不是第一次往表内里写数据,必要判定写数据的模式是否切合前提
- if (txn.readVersion > -1) {
- // This table already exists, check if the insert is valid.
- if (mode == SaveMode.ErrorIfExists) {
- throw DeltaErrors.pathAlreadyExistsException(deltaLog.dataPath)
- } else if (mode == SaveMode.Ignore) {
- return Nil
- } else if (mode == SaveMode.Overwrite) {
- deltaLog.assertRemovable()
- }
- }
-
- // 更新表的模式,好比是否包围现有的模式,是否和现有的模式举办 merge
- updateMetadata(txn, data, partitionColumns, configuration, isOverwriteOperation)
-
- // 是否界说分区过滤前提
- val replaceWhere = options.replaceWhere
- val partitionFilters = if (replaceWhere.isDefined) {
- val predicates = parsePartitionPredicates(sparkSession, replaceWhere.get)
- if (mode == SaveMode.Overwrite) {
- verifyPartitionPredicates(
- sparkSession, txn.metadata.partitionColumns, predicates)
- }
- Some(predicates)
- } else {
- None
- }
-
- // 第一次写数据初始化事宜日记的目次
- if (txn.readVersion < 0) {
- // Initialize the log path
- deltaLog.fs.mkdirs(deltaLog.logPath)
- }
-
- // 写数据到文件体系中
- val newFiles = txn.writeFiles(data, Some(options))
-
- val deletedFiles = (mode, partitionFilters) match {
- // 全量包围,直接拿出缓存在内存中最新事宜日记快照内里的全部 AddFile 文件
- case (SaveMode.Overwrite, None) =>
- txn.filterFiles().map(_.remove)
- // 从事宜日记快照中获取对应分区内里的全部 AddFile 文件
- case (SaveMode.Overwrite, Some(predicates)) =>
- // Check to make sure the files we wrote out were actually valid.
- val matchingFiles = DeltaLog.filterFileList(
- txn.metadata.partitionColumns, newFiles.toDF(), predicates).as[AddFile].collect()
- val invalidFiles = newFiles.toSet -- matchingFiles
- if (invalidFiles.nonEmpty) {
- val badPartitions = invalidFiles
- .map(_.partitionValues)
- .map { _.map { case (k, v) => s"$k=$v" }.mkString("/") }
- .mkString(", ")
- throw DeltaErrors.replaceWhereMismatchException(replaceWhere.get, badPartitions)
- }
-
- txn.filterFiles(predicates).map(_.remove)
- case _ => Nil
- }
-
- newFiles ++ deletedFiles
- }
- }
假如 txn.readVersion == -1,声名是第一次写数据到 Delta Lake 表,以是当这个值大于 -1 的时辰,必要判定一下写数据的操纵是否正当。
因为 Delta Lake 底层行使的是 Parquet 名目,以是 Delta Lake 表也支持模式的增进归并等,这就是 updateMetadata 函数对应的操纵。
由于 Delta Lake 表支持分区,以是我们也许在写数据的时辰指定某个分区举办包围。 (编辑:湖南网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|