副问题[/!--empirenews.page--]
许多用Spark Streaming 的伴侣应该行使过broadcast,大大都环境下广播变量都是以单例模式声明的有没有粉丝想过为什么?浪尖在这里帮各人说明一下,有以下几个缘故起因:
- 广播变量大大都环境下是不会改观的,行使单例模式可以镌汰spark streaming每次job天生执行,一再活成广播变量带来的开销。
- 单例模式也要做同步。这个对付许多新手来说可以不消思量同步题目,缘故起因很简朴由于新手不会调解spark 措施task的调治模式,而默认回收FIFO的调治模式,根基不会发生并发题目。1).若是你设置了Fair调治模式,同时修改了Spark Streaming运行的并行执行的job数,默以为1,那么就要加上同步代码了。2).尚有一个缘故起因,在多输出流的环境下共享broadcast,同时设置了Fair调治模式,也会发生并发题目。
- 留意。有些时辰好比广播设置文件,法则等必要改观broadcast,在行使fair的时辰可以在foreachrdd内里行使局部变量作为广播,停止彼此滋扰。
先看例子,后头慢慢发表内部机制。
1.例子
下面是一个双重搜查式的broadcast变量的声明方法。
- object WordBlacklist {
-
- @volatile private var instance: Broadcast[Seq[String]] = null
-
- def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
- if (instance == null) {
- synchronized {
- if (instance == null) {
- val wordBlacklist = Seq("a", "b", "c")
- instance = sc.broadcast(wordBlacklist)
- }
- }
- }
- instance
- }
- }
广播变量的行使要领如下:
- val lines = ssc.socketTextStream(ip, port)
- val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
- wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
- // Get or register the blacklist Broadcast
- val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
- // Get or register the droppedWordsCounter Accumulator
- val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
- // Use blacklist to drop words and use droppedWordsCounter to count them
- val counts = rdd.filter { case (word, count) =>
- if (blacklist.value.contains(word)) {
- droppedWordsCounter.add(count)
- false
- } else {
- true
- }
- }.collect().mkString("[", ", ", "]")
- val output = s"Counts at time $time $counts"
- println(output)
- println(s"Dropped ${droppedWordsCounter.value} word(s) totally")
- println(s"Appending to ${outputFile.getAbsolutePath}")
- Files.append(output + "n", outputFile, Charset.defaultCharset())
- }
2.观念增补

(编辑:湖南网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|