直接上简化后的代码:class TestClass extends Serializable {
val map=Map[String,String]();
private def addItem(s:String){
val sArr=s.split(",");
map(sArr(0))=sArr(1);
println("***TEST item added: "+sArr(0)+"->"+sArr(1));
println("***TEST map size: "+map.size);
} def test(){
val itemsFile = spark.sparkContext.textFile("./items.txt");
itemsFile.foreach( addItem(_) );
        //问题:下行代码输出为0!
println("***TEST map size is "+map.size);
}
}addItem()就是把一个(K,v)加到一个类成员变量map里。test()本意是,读入一个文件(里面每一行是一个(k,v)对)到RDD,然后处理每一行,把相应的(k,v)加到类成员变量map里。调用test()时,可以看到addItem()每次都被成功调用,类成员变量map的size也一直增长。但到执行最后一行代码的时候,map被清空了!size为0注意:如果我不用定义一个类来实现,而是直接写additem,test方法,运行就没问题。求为什么会这样?怎么解决?谢谢!

解决方案 »

  1.   

    You misunderstand the distribution meaning in Spark.The map instance running in your method addItem() is NOT the same as in your driver, which you try to print in the test().Change your code to print the map instance identityHashCode will prove it:
    class TestClass extends Serializable {
      val map=Map[String,String]();
      private def addItem(s:String){
      val sArr=s.split(",");
      map(sArr(0))=sArr(1);
      println("***TEST item added: "+sArr(0)+"->"+sArr(1));
      println("***TEST map size: "+map.size);
      println(s"identityHashCode of map is ${System.identityHashCode(map)")
    }def test(){
      val itemsFile = spark.sparkContext.textFile("./items.txt");
      itemsFile.foreach( addItem(_) );
            //问题:下行代码输出为0!
      println("***TEST map size is "+map.size);
       println(s"identityHashCode of map is ${System.identityHashCode(map)")
    }
    }
    See the 2 lines I added, when you run in the cluster, you will find the identityHashCode printed in 2 methods are NOT the same.
      

  2.   

    谢谢您的回复!那请问如何得到spark的处理结果?(就是test()方法里itemsFile.foreach( addItem(_) );)刚开始学习spark,的确对spark的运行机制、本质知之甚少。不过用spark的分布式处理能力处理变量,却不能方便的保留结果,感觉spark应该提供相关机制这样才更方便啊。
      

  3.   

    Spark 当然可以保留结果. I will demo in English, hope you don't mind, as typing Chinese is too slow for me.You basically has 2 ways to keep your result:1) Bring the data back to driver (using collect() API. This works on both RDD and DATAFRAME). But keep in mind that this will bring whole dataset back to driver (one node), so if the dataset is very big, then you will have memory pressure for that one node.
    2) Saving the result dataset into a distributed storage. This is a normal way in most case. You can save your result into HDFS, S3, Cassandra, Hbase etc.It looks like your originally code is trying to do in RDD api, so I demo in RDD api too, but even though RDD API is more powerful, but it misses the catalyst optimization, so its performance is not as good as DataFrame API in most cases. It looks like your originally code is trying to dedup of the data (Since you are using MAP), but I have to warn you that it is dangerous to do what you tried to do. Keep in mind that in Spark or any distribution framework, the ORDER is never promised, unless you sort the data first (But it is a very expensive operation).  For example, if your text data is liking:
    1,value1
    2,value2
    3,value3
    1,valueNewThere is not promise that the last value "1, valueNew" will replace "1,value1", as they could be read by different machine at totally different time, so "1, valueNew" could read on machine 2 before "1,value1" read on machine1, then using "valueNew" to replace "value1" is never good idea, unless your data has one more field to give the order, like following:1,value1,1
    2,value2,2
    3,value3,3
    1,valueNew,4So we can use the 3rd column to tell us which should replace which, or use "offset" bytes from the file as the ordering column too.Anyway, the following example show you how to group them together and using "collect" bring them back to the driver
    scala> spark.version
    res2: String = 2.1.0scala> val textFile = sc.makeRDD(Array("1,value1", "2,value2", "3,value3", "1,valueNew"))
    textFile: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at makeRDD at <console>:24scala> val keyvalue = textFile.map(s => (s.split(",")(0), s.split(",")(1)))
    keyvalue: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[1] at map at <console>:26scala> keyvalue.groupByKey.collect.foreach(println)
    (2,CompactBuffer(value2))
    (3,CompactBuffer(value3))
    (1,CompactBuffer(value1, valueNew))
      

  4.   

    非常感谢您的回复!以后spark的学习中还要多多向您请教!
    已关注您的ID :D
      

  5.   

    不客气,如果你的英文不错的话,可以考虑自己上这个课:
    https://www.coursera.org/learn/scala-spark-big-dataIt is online, free, and give you clear picture about how Spark/Distribution works.