鄙人刚开始接触spark大数据处理,最近在做一些数据挖掘的实验,其中一个对比算法借用了spark库中的经典数据挖掘算法FP-Growth,我这里的集群是三台机器:1*master(2cores,4G mem), 2*worker(4cores,8G mem)。在进行的实验中挖掘的数据集是利用IBM的数据产生器生成的T40I10D100K.txt,大小14.76 MB。因为该数据集中重复出现的数字较少,所以支持度设为了1%。分区数为16。结果挖掘过程中,出现了堆溢出的错误,应该是由于growth过程中递归建立条件FP-Tree。这里我就产生了一个疑问,我的集群两台worker加起来可用内存也有12GB的样子,怎么会连15MB的数据都处理不了。是因为支持度过低,还是因为FP-Growth进行频繁项集的挖掘势必会造成这么大的内存消耗?    请各位有经验的前辈赐教,谢谢。

解决方案 »

  1.   

    这是调用FP-Growth的源码
    object PFP {
      def main(args: Array[String]): Unit = {
        val texts = mutable.Map(
       // "T25I10D10K.txt"->List(0.005,0.004,0.003,0.002),
       // "mushroom.txt"->List(0.01))
       // "chess.txt"->List(0.4))
       // "accidents.txt"->List(0.1))
       // "T10I4D100K.txt"->List(0.005,0.004,0.003,0.002,0.001),
        //"T40I10D100K.txt"->List(0.01))
        "connect-4.txt"->List(0.3))
       // "kddcup99.txt"->List(0.0001,0.00009,0.00008,0.00007,0.00006))
       //"USCensus.txt"->List(0.5))
     //  "connect-4.txt"->List(0.5))    val conf =new SparkConf().setAppName("PFP_scala")
        val sc =new SparkContext(conf)    texts.foreach{ text =>
        val writer =new PrintWriter(new File("/root/app/scala2.10/PFP/"+text._1))
        val data= sc.textFile("/usr/local/eclipsews/"+text._1)
        val startTime = System.currentTimeMillis()
        val transactions: RDD[Array[String]] = data.map(s => s.trim.split(' '))
        val ioTime = System.currentTimeMillis() - startTime    text._2.foreach{ support =>
        for(i<-0 to 0){    val time1 = System.currentTimeMillis()
        val fpg = new FPGrowth()
        .setMinSupport(support)
        .setNumPartitions(16)    val model = fpg.run(transactions)
        val process = java.lang.Runtime.getRuntime.exec("/root/app/scala2.10/PFP/checkHDFS.sh")
        process.waitFor();
        model.freqItemsets.saveAsTextFile("/usr/local/PFP")    val endTime = System.currentTimeMillis()
        val mineTime =endTime - time1
        //hehe.foreach { itemset =>println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)}
        val time = mineTime + ioTime
        writer.write("database:  "+text._1+"  support:  "+support+ " iotime:  " +ioTime +" mineTime "+ mineTime+  "  time:  "+ time+ "\n")
          }
        }
        writer.close()
       }