小白终于进入了职场,从事大数据方面的事变!
分到项目组了,搬砖的时辰碰着了一个这样的题目。
要求:用spark实现oracle的存储进程上钩较部门。
坑:因为报表中包括了一个ID字段,其要求是差异的地区拥有差异的地区ID,且ID在数据库表中的属性为主键。Oracle的存储进程中回收的是自界说序列,回收发号的情势实现ID独一且切合地区特征。
填坑进程:
要领一:sql.functions 中monotonically_increasing_id
。
回收import org.apache.spark.sql.functions.中的
monotonically_increasing_id函数。 行使demo如下: //从数据库中加载表TEST_EMP进入内存,而且取ENAME和EMPNO两列
val dfEmp=sqlContext.read.options(conUtil.con("TEST_EMP")) .format("jdbc").load() .select("ENAME","EMPNO") val test =dfEmp .withColumn("TEST_NO",monotonically_increasing_id) //向oracle中写数据,这个函数的行使条件是必要确定表"EMP_TMP"存在。且向这张表写入数据的时辰最好字段举办对应,假如列多余数据库中的列数则会呈现参数过多的错误。
JdbcUtils.saveTable(test,url,"EMP_TMP",properties)
//代码功效如下所示,在数据库中天生了一个从0开始自增的列
|
ENAME
EMPNO
TEST_NO
SMITH
7369
0
ALLEN
7499
1
WARD
7521
2
JONES
7566
3
?
这个要领有一个弱点:序列是从0开始的,monotonically_increasing_id函数无法接管参数,以是我们无法用其按照我们的营业举办指定序列。
以是,有一个设法于是去看了一下该要领的源码,发下如下特点:
起首看到函数的界说def monotonically_increasing_id(): Column = withExpr { MonotonicallyIncreasingID() }
深入查察MonotonicallyIncreasingID() ,详细源码如下:
private[sql] case class MonotonicallyIncreasingID() extends LeafExpression with Nondeterministic {
/**
* Record ID within each partition. By being transient,count‘s value is reset to 0 every time
* we serialize and deserialize and initialize it.
*/
@transient private[this] var count: Long = _
@transient private[this] var partitionMask: Long = _
override protected def initInternal(): Unit = {
count = 0L
partitionMask = TaskContext.getPartitionId().toLong << 33
}
override def nullable: Boolean = false
override def dataType: DataType = LongType
override protected def evalInternal(input: InternalRow): Long = {
val currentCount = count
count += 1
partitionMask + currentCount
}
override def genCode(ctx: CodeGenContext,ev: GeneratedExpressionCode): String = {
val countTerm = ctx.freshName("count")
val partitionMaskTerm = ctx.freshName("partitionMask")
ctx.addMutableState(ctx.JAVA_LONG,countTerm,s"$countTerm = 0L;")
ctx.addMutableState(ctx.JAVA_LONG,partitionMaskTerm,
s"$partitionMaskTerm = ((long) org.apache.spark.TaskContext.getPartitionId()) << 33;")
ev.isNull = "false"
s"""
final ${ctx.javaType(dataType)} ${ev.value} = $partitionMaskTerm + $countTerm;
$countTerm++;
"""
}
}
我们可以发明这个类中重写了父类的initInternal()要领,指定了初始值count=0L,enmm这样子的话我们可不行以通过复写该类中的初始值来满意我们的营业需求
override protected def initInternal(): Unit = {
count = 0L
partitionMask = TaskContext.getPartitionId().toLong << 33
}
(别想太多,一个营业涉及那么多序列,总不能用一次改一次吧,虽然假如技能过硬,本身写一套要领以及类,用来吸取参数1:序列起始值,参数2:序列终止值。当前技能不足且加班 导致这个设法凉凉)
要领二:rdd算子中的zipWithIndex()要领
代码demo如下:
val dfEmp=sqlContext.read.options(conUtil.con("TEST_EMP"))
.format("jdbc").load()
.select("ENAME","EMPNO")
//对读取的dfEmp举办schema加列操纵,增进一列且指定列数据范例
val schma=dfEmp.schema.add(StructField("TEST_NO",LongType))
val temp=dfEmp.rdd.zipWithIndex()
//可以在row中指定我们本身营业需求的序列初始值
val changed= temp.map(t => Row.merge(t._1,Row(t._2+340000000)))
val in=sqlContext.createDataFrame(changed,schma)
JdbcUtils.saveTable(in,properties)
功效如下所示:
ENAME
EMPNO
TEST_NO
?SMITH
?7369
?300000000
ALLEN?
?7499
?300000001
?WARD
?7521
?300000002
?到此,入职的第一个坑填好了!貌似要领二还可以或许用zipWithUniqueId()要领举办实现,因为时刻不足就没有逐一的实行了,假如列位小搭档们有空可以实行一下! 同时,假如小搭档们有越发好的要领,求分享!求指导!感激!!!!! 接待留言!!!!
(编辑:湖南网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!