import org.apache.spark.broadcast._
import scala.collection.mutable.ArrayBufferimport java.io._
object SimpleApp {    var broadcast1: Broadcast[ArrayBuffer[String]] = _
    val ip_grp_start = ArrayBuffer[String]()    def matchword(s: (String, Int)): List[(String ,Int)] = {        val fw = new FileWriter("/home/hadoop/a.txt", true)
        val out = new PrintWriter(fw)        out.println("11111111111111111"+broadcast1.value.length)
        out.close()
        return List(s)
    }    def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("Simple Application")
        val sc = new SparkContext(conf)
        val line = sc.textFile("wordcount.txt")        ip_grp_start += "fsda"
        ip_grp_start += "dsfsf"        broadcast1 = line.sparkContext.broadcast(ip_grp_start)        val words = line.flatMap(line => line.split(" "))
        val wordpair = words.map(word => (word,1))        val word = wordpair.flatMap(x => matchword(x))
        val pair = word.reduceByKey(_+_)        pair.collect().foreach(println)
        sc.stop()
    }
}

解决方案 »

  1.   

    17/02/19 15:07:46 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.1.2:56023 (size: 2.8 KB, free: 366.3 MB)
    17/02/19 15:07:47 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.2:56023 (size: 20.4 KB, free: 366.3 MB)
    17/02/19 15:07:48 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 192.168.1.2, executor 0): java.lang.NullPointerException
    at SimpleApp$.matchword(SimpleApp.scala:20)
    at SimpleApp$$anonfun$4.apply(SimpleApp.scala:38)
    at SimpleApp$$anonfun$4.apply(SimpleApp.scala:38)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)上面是错误报的空指针的错误。其实就是传到闭包函数里,这个广播变量的值为空。
      

  2.   

    这是我写的测试广播变量函数的例子,不知道为何传进去会为空,如果我在运行时候去掉--master参数,就会变成单机运行,运行就不会报任何错误。求大神解释。。
      

  3.   

    The reason is that your broadcast var is defined in the class/object level. When the class is initialized in the worker node, it will ony see a NULL, instead of the value you assigned in main method.Change the broadcast scope to the main method. You can pass the value to your method.See the detail example here:https://github.com/databricks/learning-spark/blob/master/src/main/scala/com/oreilly/learningsparkexamples/scala/ChapterSixExample.scala#L75
        val signPrefixes = sc.broadcast(loadCallSignTable())
        val countryContactCounts = contactCounts.map{case (sign, count) =>
          val country = lookupInArray(sign, signPrefixes.value)
          (country, count)
        }.reduceByKey((x, y) => x + y)
    See how the signPrefixes broadcast val is defined in the main method, and how its value passed to lookupInArray method.
      

  4.   

    我也碰到这个问题,还没有找到解决方法。
    我估计是broadcast变量被发送到Executor上的时间导致的,
    也就是说 还没有执行 broadcast1 = line.sparkContext.broadcast(ip_grp_start) 这句代码。broadcast1已经被发送到Executor上了。我暂时的解决办法是,不定义 var broadcast1。而是  val  broadcast1 = line.sparkContext.broadcast(ip_grp_start),
    然后将 broadcast1作为 matchword 函数的参数传递过去
    def matchword(s: (String, Int), broadcast1: Broadcast[ArrayBuffer[String]]): List[(String ,Int)]