使用spark 1.6 原始数据如下 
val baseDF=hiveContext.sql(newSql)ID    ID2   C1   C2   C3   C4   C5 .....C33
CM1 a 1 1 1 0 0
CM2 a 1 1 0 1 0
CM3 a 1 0 1 1 1
CM4 a 1 1 1 1 1
CM5 a 1 1 1 1 1
1k2    b    0    0    1    1    1
1K3    b    1    1    1    1    1
1K1    b    0    0    0    0    1


ID    ID2   C1   C2   C3   C4   C5 .....C33
CM1 a 1 1 1 0 0
CM2 a 0 0 0 1 0
CM3 a 0 0 0 0 1
CM4 a 0 0 0 0 0
CM5 a 0 0 0 0 0
1K1    b    0    0    0    0    1
1k2    b    0    0    1    1    0
1K3    b    1    1    0    0    0逻辑是根据ID2做groupby  然后找ID 最小同时Cn 为1  的设置为1  ,其他的设置为0
而Cn 最多为C33 
如果用case class 会超过上限这是我目前尝试的 ,出来结果是错的 ,理解过后应该是用错方法 应该要用group
case class testGoods(ID: String, ID2: String, C1 : String, C2 : String)val cartMap = new HashMap[String, Set[(String,String,String)]] with MultiMap[String,(String,String,String)]val baseDF=hiveContext.sql(newSql)val testRDD=baseDF.mapPartitions( partition => {
  while (partition.hasNext) {
    val record = partition.next()
    val ID = record.getString(0)
    if (ID != null && ID != "null") {
      val ID2=record.getString(1)
      val C1=record.getString(2)
      val C2=record.getString(3)
      cartMap.addBinding(ID2, (ID,C1,C2))
    }
  }
  cartMap.iterator
})val recordList = new mutable.ListBuffer[testGoods]()
val testRDD1=testRDD.mapPartitions( partition => {
  while (partition.hasNext) {
    val record = partition.next()
    val ID2=record._1
    val recordRow= record._2
    val sortedRecordRow = TreeSet[(String,String,String)]() ++ recordRow
    val dic=new mutable.HashMap[String,String]
    for(v<-sortedRecordRow) {
      val ID = v._1
      val C1 = v._2
      val C2 = v._3      if (dic.contains(ID2)){
        val goodsValue=dic.get(ID2)
        if("1".equals(goodsValue)){
          recordList.append(new testGoods(ID, ID2, "0", C2))
        }else{
          dic.put(ID2,C1)
          recordList.append(new testGoods(ID, ID2, C1,C2))
        }
      }else{
        dic.put(ID2,C1)
        recordList.append(new testGoods(ID, ID2, C1, C2))
      }
    }
  }
  recordList.iterator
})
val searchToItemNewDF = hiveContext.createDataFrame(testRDD1).repartition(1)
  .rdd.map { r => r.mkString("\t") }
  .saveAsTextFile("/data/testRDD1")
查了网路上 好像大家都用 groupBy +agg  实现 自己也尝试使用
baseDF.groupBy("ID2").agg((collect_list($"ID"), collect_list($"ID2")))
但试了好久都没办法把上面使用mapPartitions的逻辑用上
请问如何使用DataFrame实现?

解决方案 »

  1.   

    Don't know what you try to do. You need more detail example. 是我的中文不够好?
      

  2.   

    不好意思 我讲的不太清楚 新增测试数据目标是根据ID2 先分组  然后把分组下的ID 从小到大排列 
    然后分别看 C1 ....C33  每一列 找到ID首先出现 1的留下   其他的设为0原始数据 
    ID ID2 C1 C2 ....C33
    CM1 a 1 0
    CM2 a 1 0
    1K13 f 0 0
    CM4 a 1 1
    CM5 a 1 1
    1K14 f 0 1
    1K2 b 0 1
    1K3 b 1 1
    1K11 f 0 0
    1K12 f 0 0
    1K1 b 1 0
    CM3 a 1 0
    目标输出
    ID ID2 C1 C2
    CM1 a 1 0
    CM2 a 0 0
    CM3 a 0 1
    CM4 a 0 0
    CM5 a 0 0
    1K1 b 1 0
    1K2 b 0 1
    1K3 b 0 0
    1K11 f 0 0
    1K12 f 0 0
    1K13 f 0 0
    1K14 f 0 1
      

  3.   


    case class TestGoods(ID:String,ID2:String,C1:Int,C2:Int)
      case class CompressedRows(id2:String,ids:Array[String],indexs:Array[Int])  def main(args: Array[String]) {
        import session.implicits._
        val random = new Random(10)
        val generator: () => Int = () => {
          if (random.nextBoolean()) 1
          else 0
        }
        val datasource = (1 to 100).map(idx => TestGoods(s"ID-${idx}", s"ID2-${idx % 20}", generator(), generator()))    val df = session.sparkContext.makeRDD(datasource).toDF("id", "id2", "c1", "c2")
        df.show(false)
        import session.implicits.newStringEncoder
        df.groupByKey { case Row(_, id2: String, _, _) => id2 }
          .mapGroups {
            case (id2, rows) =>
              val cs: Array[(Int, Boolean)] = (1 to 2).map(_ => (-1, false)).toArray
              val sorted = rows.toList.sortBy(row => row(0).hashCode())
              (0 until sorted.length).foreach { idx =>
                val row = sorted(idx)
                row match {
                  case Row(id, id2, c1: Int, c2: Int) =>
                    if (!cs(0)._2 && c1 == 1) cs(0) = (idx -> true)
                    if (!cs(1)._2 && c2 == 1) cs(1) = (idx -> true)
                }
              }
              val compressedIdx = cs.map(r => if (r._2) r._1 else sorted.length - 1)
              CompressedRows(id2,sorted.map(r=>r(0).toString).toArray,compressedIdx)
          }.show()
      }
      

  4.   

    谢谢yangguo_2011的回复
    虽然我使用的是spark1.6 不能直接跑  但是大概知道如何解决问题了