import java.io._

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.broadcast._
object SimpleApp {

  /**
   * Passes each (word, count) pair through unchanged, logging the size of the
   * broadcast lookup table as a side effect.
   *
   * The broadcast handle is an explicit parameter: the original code read a
   * driver-side `var` field, which is still null when the object is
   * deserialized on an executor — the cause of the NullPointerException seen
   * in the task logs.
   *
   * @param s        one (word, count) pair from the word-count RDD
   * @param prefixes broadcast lookup table built on the driver
   * @return the input pair wrapped in a single-element list
   */
  def matchword(s: (String, Int),
                prefixes: Broadcast[ArrayBuffer[String]]): List[(String, Int)] = {
    // Append-mode debug log on the executor's local filesystem.
    val fw  = new FileWriter("/home/hadoop/a.txt", true)
    val out = new PrintWriter(fw)
    try out.println("11111111111111111" + prefixes.value.length)
    finally out.close() // close even if println throws, so the handle never leaks
    List(s)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Simple Application")
    val sc   = new SparkContext(conf)

    // Build the lookup data and broadcast it on the driver BEFORE any task
    // closure captures it. Using a local `val` (not a lazily-assigned object
    // field) means the serialized closure carries a valid broadcast handle.
    val ipGrpStart = ArrayBuffer("fsda", "dsfsf")
    val broadcast1 = sc.broadcast(ipGrpStart)

    val line     = sc.textFile("wordcount.txt")
    val words    = line.flatMap(_.split(" "))
    val wordpair = words.map(word => (word, 1))
    // The broadcast handle is passed explicitly so executors never touch
    // driver-only mutable state.
    val word = wordpair.flatMap(x => matchword(x, broadcast1))
    val pair = word.reduceByKey(_ + _)
    pair.collect().foreach(println)
    sc.stop()
  }
}
import scala.collection.mutable.ArrayBufferimport java.io._
// NOTE(review): this is a duplicate paste of the SimpleApp object above (the
// forum thread repeated the snippet); two objects with the same name in one
// file will not compile — keep only one copy.
// BUG (cause of the NullPointerException in the log below): `broadcast1` is a
// driver-side `var` field assigned only inside main(). When a task closure is
// deserialized on an executor, the SimpleApp object there still has
// broadcast1 == null, so matchword dereferences null.
object SimpleApp { var broadcast1: Broadcast[ArrayBuffer[String]] = _
val ip_grp_start = ArrayBuffer[String]() def matchword(s: (String, Int)): List[(String ,Int)] = { val fw = new FileWriter("/home/hadoop/a.txt", true)
// Appends a debug line with the broadcast length to a local file on the
// executor; this is the line that throws (broadcast1 is null on executors).
val out = new PrintWriter(fw) out.println("11111111111111111"+broadcast1.value.length)
out.close()
return List(s)
} def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
// The broadcast is created here on the driver — but matchword (defined at
// object level) was already captured by the flatMap closure below, and only
// the null field value travels to the executors.
val line = sc.textFile("wordcount.txt") ip_grp_start += "fsda"
ip_grp_start += "dsfsf" broadcast1 = line.sparkContext.broadcast(ip_grp_start) val words = line.flatMap(line => line.split(" "))
val wordpair = words.map(word => (word,1)) val word = wordpair.flatMap(x => matchword(x))
val pair = word.reduceByKey(_+_) pair.collect().foreach(println)
sc.stop()
}
}
解决方案 »
- 关于VMware ESX/ESXi Server Support Openstack
- OpenStack专区与OpenStack群组
- 【每日译帖】创建Cinder卷时候状态一直是 creating,求解救
- 如何学习云计算
- 云存储实现
- 刚刚接触docker,想请问有没有可能在一个docker中启动另一个docker
- 百度云登不进去
- 可否搭建服务器,不使用校园网来访问外网。
- 求解,通过dockerfile构建samba镜像的容器服务无法启动,状态exited
- 阿里云数据仓库为什么要按照三层:ods,dw,app三层去设计?而不是四层,两层,这样设计的原因是什么?
- 想用AWS,但是没有信仰卡,有没有什么解决办法
- eclipse开发spark应用程序 spark2.1.0 导入哪个jar包?
17/02/19 15:07:47 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.2:56023 (size: 20.4 KB, free: 366.3 MB)
17/02/19 15:07:48 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 192.168.1.2, executor 0): java.lang.NullPointerException
at SimpleApp$.matchword(SimpleApp.scala:20)
at SimpleApp$$anonfun$4.apply(SimpleApp.scala:38)
at SimpleApp$$anonfun$4.apply(SimpleApp.scala:38)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)

上面是报空指针错误的调用栈。其实就是：传到闭包函数 matchword 里时，广播变量 broadcast1 的值为 null。
// Quoted example (from the "Learning Spark" book; incomplete excerpt —
// contactCounts / loadCallSignTable / lookupInArray are defined elsewhere):
// the broadcast handle is a local `val` captured by the closure, so executors
// deserialize a valid reference instead of a null object field.
val signPrefixes = sc.broadcast(loadCallSignTable())
val countryContactCounts = contactCounts.map{case (sign, count) =>
val country = lookupInArray(sign, signPrefixes.value)
(country, count)
}.reduceByKey((x, y) => x + y)
See how the signPrefixes broadcast val is defined in the main method, and how its value passed to lookupInArray method.
我估计是 broadcast 变量发送到 Executor 上的时机导致的：
也就是说，在 broadcast1 = line.sparkContext.broadcast(ip_grp_start) 这句执行之前，SimpleApp 对象（连同值仍为 null 的 broadcast1 字段）就已经随任务闭包被序列化发送到 Executor 上了。我暂时的解决办法是：不定义 var broadcast1，改为在 main 里定义 val broadcast1 = line.sparkContext.broadcast(ip_grp_start)，
然后将 broadcast1 作为 matchword 函数的参数传递过去：
def matchword(s: (String, Int), broadcast1: Broadcast[ArrayBuffer[String]]): List[(String ,Int)]