spark官网,多个receive(对应多个输入dstream)并行运行通过下面的代码解决:int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String, String>>(numStreams);
for (int i = 0; i < numStreams; i++) {
  kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();
我的程序中,使用kafka源,单个输入dstream是没有问题,当采用多个Dstream时,经过测试,两个输入DStream中的数据都接收到了,但问题是:程序只运行一次,或者说只接收一次数据,后面就不再接收了,我的代码如下:
                String groupId = args[0];
String zookeepers = args[1];
String topics = "tpsN5a";
Integer numPartitions = Integer.parseInt(args[3]);

Map<String, Integer> topicsMap = new HashMap<String, Integer>();
for (String topic : topics.split(",")) {
topicsMap.put(topic, numPartitions);
}
// 多长时间统计一次
Duration batchInterval = Durations.seconds(2);
SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaConsumerWordCount");

JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
batchInterval);

JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils
.createStream(ssc, zookeepers, groupId, topicsMap, StorageLevel.MEMORY_AND_DISK_SER());

String topics2 = "tpsN5b";
Map<String, Integer> topicsMap2 = new HashMap<String, Integer>();
topicsMap2.put(topics2, numPartitions);
JavaPairReceiverInputDStream<String, String> kafkaStream2 = KafkaUtils
.createStream(ssc, zookeepers, groupId, topicsMap2, StorageLevel.MEMORY_AND_DISK_SER());

List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String, String>>(2);
kafkaStreams.add(kafkaStream);
kafkaStreams.add(kafkaStream2);

ssc.checkpoint("/spark/stream/checkpoint/d1"); JavaPairDStream<String, String> unifiedStream = ssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));

JavaDStream<String> lines = unifiedStream//kafkaStream
.map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> arg0)
throws Exception {
logger.warn(Thread.currentThread().getName() +  " msg1:" + arg0._1 + "|msg2:" + arg0._2);
return arg0._2();
}
});请教如何解决上面提到的问题,当采用多个输入DStream并行接收数据时,streaming程序能持续接收数据,而不是只接收一次?

解决方案 »

  1.   

    我用的是spark1.3.1,使用了预写日志,我的预写日志的接收数据中,能接收到kafka发的消息,但在程序中怎么接收不到呢
      

  2.   

    运行了一下,没问题,在程序最后加上如下代码即可:
    ssc.start();
    ssc.awaitTermination();
      

  3.   

    请问你的多个输入DStream的groupid是一样的吗?换句话说你的多个输入DStream都属于一个消费者群组吗?