Using Spark 1.6. The original data looks like this:

val baseDF = hiveContext.sql(newSql)

ID  ID2 C1 C2 C3 C4 C5 ... C33
CM1 a 1 1 1 0 0
CM2 a 1 1 0 1 0
CM3 a 1 0 1 1 1
CM4 a 1 1 1 1 1
CM5 a 1 1 1 1 1
1k2 b 0 0 1 1 1
1K3 b 1 1 1 1 1
1K1 b 0 0 0 0 1
Expected output:

ID  ID2 C1 C2 C3 C4 C5 ... C33
CM1 a 1 1 1 0 0
CM2 a 0 0 0 1 0
CM3 a 0 0 0 0 1
CM4 a 0 0 0 0 0
CM5 a 0 0 0 0 0
1K1 b 0 0 0 0 1
1k2 b 0 0 1 1 0
1K3 b 1 1 0 0 0逻辑是根据ID2做groupby 然后找ID 最小同时Cn 为1 的设置为1 ,其他的设置为0
而Cn 最多为C33
如果用case class 会超过上限这是我目前尝试的 ,出来结果是错的 ,理解过后应该是用错方法 应该要用group
import scala.collection.mutable
import scala.collection.mutable.{HashMap, MultiMap, Set}
import scala.collection.immutable.TreeSet

case class testGoods(ID: String, ID2: String, C1: String, C2: String)

val cartMap = new HashMap[String, Set[(String, String, String)]] with MultiMap[String, (String, String, String)]
val baseDF = hiveContext.sql(newSql)

val testRDD = baseDF.mapPartitions( partition => {
  while (partition.hasNext) {
    val record = partition.next()
    val ID = record.getString(0)
    if (ID != null && ID != "null") {
      val ID2 = record.getString(1)
      val C1 = record.getString(2)
      val C2 = record.getString(3)
      cartMap.addBinding(ID2, (ID, C1, C2))
    }
  }
  cartMap.iterator
})

val recordList = new mutable.ListBuffer[testGoods]()
val testRDD1 = testRDD.mapPartitions( partition => {
  while (partition.hasNext) {
    val record = partition.next()
    val ID2 = record._1
    val recordRow = record._2
    val sortedRecordRow = TreeSet[(String, String, String)]() ++ recordRow
    val dic = new mutable.HashMap[String, String]
    for (v <- sortedRecordRow) {
      val ID = v._1
      val C1 = v._2
      val C2 = v._3
      if (dic.contains(ID2)) {
        val goodsValue = dic.get(ID2)
        if ("1".equals(goodsValue)) {
          recordList.append(new testGoods(ID, ID2, "0", C2))
        } else {
          dic.put(ID2, C1)
          recordList.append(new testGoods(ID, ID2, C1, C2))
        }
      } else {
        dic.put(ID2, C1)
        recordList.append(new testGoods(ID, ID2, C1, C2))
      }
    }
  }
  recordList.iterator
})
val searchToItemNewDF = hiveContext.createDataFrame(testRDD1).repartition(1)
  .rdd.map { r => r.mkString("\t") }
  .saveAsTextFile("/data/testRDD1")
From what I've found online, most people implement this with groupBy + agg, so I also tried

baseDF.groupBy("ID2").agg(collect_list($"ID"), collect_list($"ID2"))

but after a lot of attempts I still can't express the mapPartitions logic above that way.
How can this be implemented with the DataFrame API?
Solutions »
Then, for each of the columns C1 ... C33 separately: within an ID2 group, the row whose ID comes first among those holding a 1 keeps that 1, and the column is set to 0 for every other row.

Original data:
ID ID2 C1 C2 ....C33
CM1 a 1 0
CM2 a 1 0
1K13 f 0 0
CM4 a 1 1
CM5 a 1 1
1K14 f 0 1
1K2 b 0 1
1K3 b 1 1
1K11 f 0 0
1K12 f 0 0
1K1 b 1 0
CM3 a 1 0
Target output:
ID ID2 C1 C2
CM1 a 1 0
CM2 a 0 0
CM3 a 0 1
CM4 a 0 0
CM5 a 0 0
1K1 b 1 0
1K2 b 0 1
1K3 b 0 0
1K11 f 0 0
1K12 f 0 0
1K13 f 0 0
1K14 f 0 1
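
One way the rule might be expressed with DataFrame operations alone is window functions, which HiveContext supports in Spark 1.6: for each Cn, take the smallest ID within the ID2 group among the rows where Cn is 1, and keep the 1 only on that row. A minimal, untested sketch; it assumes the C columns are strings, as in the mapPartitions attempt in the question, and treats "smallest ID" as plain string ordering:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, min, when}

val w = Window.partitionBy("ID2")
val cColumns = (1 to 33).map(i => s"C$i")

val resultDF = cColumns.foldLeft(baseDF) { (df, c) =>
  // smallest ID among the rows of this ID2 group where column c is "1" (null if none)
  val firstIdWithOne = min(when(col(c) === "1", col("ID"))).over(w)
  // only that row keeps its 1; every other row (and the all-zero case) gets "0"
  df.withColumn(c, when(col("ID") === firstIdWithOne, "1").otherwise("0"))
}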
import org.apache.spark.sql.{Row, SparkSession}

import scala.util.Random

case class TestGoods(ID: String, ID2: String, C1: Int, C2: Int)
case class CompressedRows(id2: String, ids: Array[String], indexs: Array[Int])

object Demo {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().appName("demo").master("local[*]").getOrCreate()
    import session.implicits._

    // random 0/1 sample data: 100 IDs spread over 20 ID2 groups
    val random = new Random(10)
    val generator: () => Int = () => if (random.nextBoolean()) 1 else 0
    val datasource = (1 to 100).map(idx => TestGoods(s"ID-${idx}", s"ID2-${idx % 20}", generator(), generator()))

    val df = session.sparkContext.makeRDD(datasource).toDF("id", "id2", "c1", "c2")
    df.show(false)

    df.groupByKey { case Row(_, id2: String, _, _) => id2 }
      .mapGroups { case (id2, rows) =>
        // for each C column: (index of the first row holding a 1, whether such a row was found)
        val cs: Array[(Int, Boolean)] = (1 to 2).map(_ => (-1, false)).toArray
        // order the rows of the group by ID
        val sorted = rows.toList.sortBy(row => row.getString(0))
        (0 until sorted.length).foreach { idx =>
          sorted(idx) match {
            case Row(_, _, c1: Int, c2: Int) =>
              if (!cs(0)._2 && c1 == 1) cs(0) = (idx, true)
              if (!cs(1)._2 && c2 == 1) cs(1) = (idx, true)
          }
        }
        val compressedIdx = cs.map(r => if (r._2) r._1 else sorted.length - 1)
        CompressedRows(id2, sorted.map(r => r(0).toString).toArray, compressedIdx)
      }.show()
  }
}
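
The CompressedRows output still has to be expanded back into the per-row 0/1 layout of the target. A rough, untested follow-up, assuming the mapGroups result is kept in a val (here called compressed, a name introduced only for illustration) instead of ending in .show(); note that when a column has no 1 at all, the code above records sorted.length - 1, so that case would need an extra guard:

val expanded = compressed
  .flatMap { cr =>
    cr.ids.zipWithIndex.map { case (id, i) =>
      // mark 1 only at the recorded "first 1" position of each column
      (id, cr.id2, if (cr.indexs(0) == i) 1 else 0, if (cr.indexs(1) == i) 1 else 0)
    }
  }
  .toDF("id", "id2", "c1", "c2")

expanded.orderBy("id2", "id").show(false)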
Although I'm on Spark 1.6 and can't run this directly, I now have a rough idea of how to solve the problem.
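
For Spark 1.6, where groupByKey/mapGroups are not available on a DataFrame, the same group-then-scan idea can be sketched on the RDD underneath baseDF. A rough, untested adaptation, assuming ID is column 0, ID2 is column 1, and C1 ... C33 are string columns holding "0"/"1" starting at index 2:

import org.apache.spark.sql.Row

val numC = 33

val fixedRDD = baseDF.rdd
  .groupBy(_.getString(1))                          // (ID2, all rows of that group)
  .flatMap { case (id2, rows) =>
    val sorted = rows.toSeq.sortBy(_.getString(0))  // order the group by ID
    // for each C column, the position of the first row whose value is "1" (-1 if none)
    val firstOne = (0 until numC).map(c => sorted.indexWhere(_.getString(c + 2) == "1"))
    sorted.zipWithIndex.map { case (row, i) =>
      val cs = (0 until numC).map(c => if (firstOne(c) == i) "1" else "0")
      Row.fromSeq(Seq(row.getString(0), id2) ++ cs)
    }
  }

// back to a DataFrame, reusing the original schema (assumed to be all string columns)
val fixedDF = hiveContext.createDataFrame(fixedRDD, baseDF.schema)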