各位大牛们,我现在有这样一个问题,我从redis中取数据,redis中的数据是安partition存储的,我需要让不同patition中的数据在各自的partition中分别处理,应该如何做啊。应该如何处理呢?

解决方案 »

  1.   

    你可以在foreachPartition算子内进行判断,然后根据条件执行指定的方法。
    例如rdd.foreachPartition(new VoidFunction<Iterator<String>>() {                    @Override
                        public void call(Iterator<String> it) throws Exception {
                            if( xxxxx ) { // 条件。比如遍历该分区的数据去取某个特征
                                new RealForeachPartitionFunc1().call(it);  // 执行真正的foreachPartition算子
                            } else {
                                new RealForeachPartitionFunc2().call(it); 
                            }
                       }
    });
      

  2.   

    自定义分区,rdd.partitionBy(/*自定义的分区*/new TestPartitioner()).foreachPartition {/*对分区内数据的操作代码*/}class TestPartitioner extends Partitioner {  //redis分区个数
      override def numPartitions: Int = ???  override def getPartition(key: Any): Int = {
        /*redis中的分区规则*/
      }
    }