我现在有个RDD内容是这样的: result4: org.apache.spark.rdd.RDD[(String, Array[(String, String)])] 标记为:(a, Array[(b,c)]), 其中a,b,c都是String;
实际数据: Array[(String, Array[(String, String)])] = Array((219495999,Array((1285059912,1), (1285059912,2), (1049968715,1))))
我想把他转换成这种格式的rdd:
(219495999, 1285059912, 3) // 这里把1285059912 做了合并
(219495999, 1049968715,1)
应该怎么写?求大神支招~~ 不知道我有没有说清楚问题... 我的代码:// result4 就是上面的实际数据
val result5 = result4.flatMap{ x =>
val m = scala.collection.mutable.Map[String,Int]()
x._2.foreach{ y =>
var tmp = m.getOrElse(y._1, 0)
tmp = tmp + y._2.toInt
m(y._1) = tmp
}
val z = for((k,v) <- m) yield (x._1, k, v)
z.toArray
}
// 这段代码不能执行,报错:
//error: polymorphic expression cannot be instantiated to expected type;
//found : [B >: (String, String, Int)]Array[B]
//required: TraversableOnce[?]
// z.toArray
实际数据: Array[(String, Array[(String, String)])] = Array((219495999,Array((1285059912,1), (1285059912,2), (1049968715,1))))
我想把他转换成这种格式的rdd:
(219495999, 1285059912, 3) // 这里把1285059912 做了合并
(219495999, 1049968715,1)
应该怎么写?求大神支招~~ 不知道我有没有说清楚问题... 我的代码:// result4 就是上面的实际数据
val result5 = result4.flatMap{ x =>
val m = scala.collection.mutable.Map[String,Int]()
x._2.foreach{ y =>
var tmp = m.getOrElse(y._1, 0)
tmp = tmp + y._2.toInt
m(y._1) = tmp
}
val z = for((k,v) <- m) yield (x._1, k, v)
z.toArray
}
// 这段代码不能执行,报错:
//error: polymorphic expression cannot be instantiated to expected type;
//found : [B >: (String, String, Int)]Array[B]
//required: TraversableOnce[?]
// z.toArray
解决方案 »
- 在安装了xen的ubuntu中运行xm,出现ImportError:libxenstore.so.3.0:cannot open shared 等等
- 专用网络访问:公有云趋于成熟
- 【求助】不到10人的办公室搭建服务器方案
- 如何将本地开发的系统迁移到云端,数据存储问题 求大神
- 讨论帖:CloudFoundry/IronFoundry这类PaaS平台是不是必须的
- 大数据云计算项目解决方案求助
- AWS S3 使用的jar
- docker for window10 设置里面的 memory 只有3840M
- Executor的Storage Memory一直缓慢增长,Spark自身不回收
- Docker 可以继承基础镜像中的tool么?
- 用idea看spark源码编译不通过
- spark如何在一个进程中启动多个sparkContent,并保持稳定?
val sparkConf = new SparkConf().setAppName(this.getClass.getName + "").forDse
sparkConf.set("spark.cores.max", "2") //最多使用留个cores
.set("spark.executor.memory", "512M") //每个node使用1G内存
val sc = new SparkContext(sparkConf) val base = createTestData(sc)
val translate = base.flatMap {
x =>
x._2.map(f => (x._1 + "#" + f._1, f._2.toLong))
}
val cc = translate.reduceByKey(_ + _).map(x=>(x._1.split("#")(0),x._1.split("#")(1),x._2))
cc.collect()
} def createTestData(sc: SparkContext) = {
val aa = Array((219495999, Array((1285059912, 1), (1285059912, 2), (1049968715, 1))))
sc.parallelize(aa)
}
val b = result4.flatMap{ case (index, elements) =>
elements.map {case (value1, value2) => (index, value1, value2)}
}