我现在有个RDD内容是这样的: result4:  org.apache.spark.rdd.RDD[(String, Array[(String, String)])]  标记为:(a,  Array[(b,c)]), 其中a,b,c都是String; 
实际数据: Array[(String, Array[(String, String)])] = Array((219495999,Array((1285059912,1), (1285059912,2), (1049968715,1))))
我想把他转换成这种格式的rdd: 
(219495999, 1285059912, 3)  // 这里把1285059912 做了合并
(219495999, 1049968715,1)
应该怎么写?求大神支招~~   不知道我有没有说清楚问题... 我的代码:// result4 就是上面的实际数据
val result5 = result4.flatMap{ x =>
     val m = scala.collection.mutable.Map[String,Int]()
     x._2.foreach{ y =>
          var tmp = m.getOrElse(y._1, 0)
          tmp = tmp + y._2.toInt
          m(y._1) = tmp
     }
     val z = for((k,v) <- m) yield (x._1, k, v)
     z.toArray
}
// 这段代码不能执行,报错:
//error: polymorphic expression cannot be instantiated to expected type;
//found   : [B >: (String, String, Int)]Array[B]
//required: TraversableOnce[?]
//            z.toArray

解决方案 »

  1.   

    groupByKey?
      

  2.   

    def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName(this.getClass.getName + "").forDse
        sparkConf.set("spark.cores.max", "2") //最多使用留个cores
          .set("spark.executor.memory", "512M") //每个node使用1G内存
        val sc = new SparkContext(sparkConf)    val base = createTestData(sc)
        val translate = base.flatMap {
          x =>
            x._2.map(f => (x._1 + "#" + f._1, f._2.toLong))
        }
        val cc = translate.reduceByKey(_ + _).map(x=>(x._1.split("#")(0),x._1.split("#")(1),x._2))
        cc.collect()
      }  def createTestData(sc: SparkContext) = {
        val aa = Array((219495999, Array((1285059912, 1), (1285059912, 2), (1049968715, 1))))
        sc.parallelize(aa)
      }
      

  3.   


    val b = result4.flatMap{ case (index, elements) =>
      elements.map {case (value1, value2) => (index, value1, value2)}
    }