Here is the simplified code:

import scala.collection.mutable.Map

class TestClass extends Serializable {
  val map = Map[String, String]()

  private def addItem(s: String): Unit = {
    val sArr = s.split(",")
    map(sArr(0)) = sArr(1)
    println("***TEST item added: " + sArr(0) + "->" + sArr(1))
    println("***TEST map size: " + map.size)
  }

  def test(): Unit = {
    val itemsFile = spark.sparkContext.textFile("./items.txt")
    itemsFile.foreach(addItem(_))
    // Problem: the line below prints 0!
    println("***TEST map size is " + map.size)
  }
}

addItem() simply adds one (k, v) pair to the class member map. test() is meant to read a file into an RDD (each line is a (k, v) pair), process each line, and add the resulting pair to the member map. When I call test(), I can see addItem() being invoked successfully every time, and the member map's size keeps growing. But by the time the last line executes, the map has been emptied: its size is 0. Note: if I write addItem and test as plain methods instead of defining a class, everything works fine. Why does this happen, and how do I fix it? Thanks!
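For reference, the intent above can be expressed without mutating driver-side state inside foreach. A minimal sketch, assuming the pairs fit in driver memory; the local SparkSession, object name, and inline test data are illustrative stand-ins for the original `spark` session and `items.txt`:

```scala
import org.apache.spark.sql.SparkSession

object CollectAsMapDemo {
  def main(args: Array[String]): Unit = {
    // Local session just for this sketch; the original code already has `spark`.
    val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
    val sc = spark.sparkContext

    // Stand-in for textFile("./items.txt"): each line is "key,value".
    val lines = sc.makeRDD(Seq("1,value1", "2,value2", "3,value3"))

    // Executors compute the pairs; collectAsMap() brings them back to the
    // driver, so the driver-side map really contains all entries.
    val map: scala.collection.Map[String, String] =
      lines.map { s => val a = s.split(","); (a(0), a(1)) }.collectAsMap()

    println("***TEST map size is " + map.size)  // 3, not 0

    spark.stop()
  }
}
```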
import scala.collection.mutable.Map

class TestClass extends Serializable {
  val map = Map[String, String]()

  private def addItem(s: String): Unit = {
    val sArr = s.split(",")
    map(sArr(0)) = sArr(1)
    println("***TEST item added: " + sArr(0) + "->" + sArr(1))
    println("***TEST map size: " + map.size)
    println(s"identityHashCode of map is ${System.identityHashCode(map)}")
  }

  def test(): Unit = {
    val itemsFile = spark.sparkContext.textFile("./items.txt")
    itemsFile.foreach(addItem(_))
    // Problem: the line below prints 0!
    println("***TEST map size is " + map.size)
    println(s"identityHashCode of map is ${System.identityHashCode(map)}")
  }
}

See the two lines I added: when you run this on a cluster, you will find that the identityHashCode values printed by the two methods are NOT the same. The closure passed to foreach is serialized and shipped to the executors, so addItem mutates a deserialized copy of your object on each executor, while the map on the driver is never touched.
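The differing identityHashCode can be reproduced without a cluster. A minimal sketch in plain Scala using Java serialization (which is what Spark uses by default to ship closures); the class and object names here are illustrative:

```scala
import java.io._
import scala.collection.mutable

// Spark ships the closure's enclosing object to executors via Java
// serialization, so each executor mutates a *copy*, never the driver's
// instance. The round trip below simulates that shipment.
class Holder extends Serializable {
  val map = mutable.Map[String, String]()
}

object ClosureCopyDemo {
  def roundTrip[T <: Serializable](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val driverSide = new Holder
    val executorSide = roundTrip(driverSide)  // what an executor receives
    executorSide.map("1") = "value1"          // mutation happens on the copy
    println(executorSide.map.size)            // 1
    println(driverSide.map.size)              // 0 -- the original is untouched
    println(driverSide eq executorSide)       // false -- two distinct objects
  }
}
```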
2) Saving the result dataset into distributed storage. This is the normal approach in most cases: you can save your result into HDFS, S3, Cassandra, HBase, etc.

It looks like your original code uses the RDD API, so I demo with the RDD API too. Note, however, that even though the RDD API is more powerful, it misses the Catalyst optimizer, so in most cases its performance is worse than the DataFrame API.

It also looks like your original code is trying to dedup the data (since you are using a Map), and I have to warn you that what you tried to do is dangerous. Keep in mind that in Spark, or any distributed framework, ordering is never guaranteed unless you sort the data first (a very expensive operation). For example, if your text data looks like:
1,value1
2,value2
3,value3
1,valueNew

there is no promise that the last value "1,valueNew" will replace "1,value1": the lines could be read by different machines at completely different times, so "1,valueNew" could be read on machine 2 before "1,value1" is read on machine 1. Using "valueNew" to replace "value1" is therefore never a good idea, unless your data has one more field that gives the order, like the following:

1,value1,1
2,value2,2
3,value3,3
1,valueNew,4

Then we can use the third column to tell us which value should replace which (the byte offset of each line in the file would also work as an ordering column). Anyway, the following example shows how to group the values together and use "collect" to bring them back to the driver:
scala> spark.version
res2: String = 2.1.0

scala> val textFile = sc.makeRDD(Array("1,value1", "2,value2", "3,value3", "1,valueNew"))
textFile: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at makeRDD at <console>:24

scala> val keyvalue = textFile.map(s => (s.split(",")(0), s.split(",")(1)))
keyvalue: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[1] at map at <console>:26

scala> keyvalue.groupByKey.collect.foreach(println)
(2,CompactBuffer(value2))
(3,CompactBuffer(value3))
(1,CompactBuffer(value1, valueNew))
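If the data does carry an ordering column as suggested above, the "keep the latest value per key" step can be sketched as follows. This uses plain Scala collections to mirror the groupByKey-then-reduce idea (no cluster needed); the object name and inline data are illustrative:

```scala
object DedupByOrder {
  // Keep, for each key, the value with the largest sequence number,
  // regardless of the order in which the lines arrive.
  def latest(lines: Seq[String]): Map[String, String] =
    lines
      .map { s => val a = s.split(","); (a(0), (a(2).toLong, a(1))) } // key -> (seq, value)
      .groupBy(_._1)                                                  // like groupByKey
      .map { case (k, vs) => k -> vs.map(_._2).maxBy(_._1)._2 }       // like a per-key reduce

  def main(args: Array[String]): Unit = {
    val data = Seq("1,value1,1", "2,value2,2", "3,value3,3", "1,valueNew,4")
    println(latest(data)("1"))  // valueNew -- seq 4 beats seq 1
  }
}
```

In Spark itself the same per-key reduction would be done with reduceByKey rather than groupByKey, which avoids materializing the whole group per key.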
I've followed your ID :D
https://www.coursera.org/learn/scala-spark-big-data
It is online and free, and gives you a clear picture of how Spark and distributed computing work.