刚接触spark,很多东西不太明白,现在的需求是这样的,假设我在外部声明了一个字段,在map中对这个字段进行了赋值,然后在reduce中对这个字段进行取值操作。我以wordcount为例,
object WordCount {
var str:String=null
   def main(args: Array[String]) {
     if (args.length < 1) {
       System.err.println("Usage: <file>")
      System.exit(1)
     }
  
     val conf = new SparkConf()
    val sc = new SparkContext(conf)
     val line = sc.textFile(args(0))
  
val counts= line.flatMap(_.split(" ")).map(word=>{
str="welcome to spark"
(word, 1)
})
println(str)
val finalRdd= counts.reduceByKey((x,y)=>{
x+y
println(str)
}).collect().foreach(println)
  
     sc.stop()
  }
 }
这是一个简单的wordcount代码,我只是声明了一个全局变量str,并且在map中对str进行了赋值操作,我想知道为什么map和reduce之间的那个println语句打印出来的str是null,是不是因为我没有进行action操作,map语句还没有执行的原因么?我希望将map中的str的赋值的内容传到reduce中去,在reduce中进行操作,请问有什么办法吗?

解决方案 »

  1.   

    你定义的str是本地的变量,不能够被集群中其他节点共享使用,你可以通过broadcast(广播)出去,使所有worker节点共享此变量使用。
      

  2.   

    据我所知广播变量不能再map中定义,而我的值需要通过map生成,需要将值传给广播变量,在序列化的时候会出现问题。
      

  3.   

    你分析的是对的 因为没有进行action的操作,所以RDD只是记忆了操作,而并没有进行操作。
    import org.apache.spark.{SparkConf, SparkContext}/**
      * Created by mahuichao on 16/8/12.
      */
    object Test04 {
      var str: String = ""  def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("test04").setMaster("local[2]")
        val sc = new SparkContext(conf)
        sc.setLogLevel("FATAL")    val path = "/Users/mahuichao/Downloads/test.txt"
        val file = sc.textFile(path)    file.flatMap(_.split(" ")).map { word =>
          str = "hello the crude world"
          (word, (1, str))
          //      (word,1)
        }.reduceByKey {
          case (x: (Int, String), y: (Int, String)) =>
            println("I am the value of str:" + str)
            (x._1 + y._1, str)
        }.map { case (x, (y1, y2)) =>
          (x, y1)
        }.collect().foreach(println)  }
    }