According to the Spark website, multiple receivers (each backing its own input DStream) can be run in parallel with code like the following:
int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String, String>>(numStreams);
for (int i = 0; i < numStreams; i++) {
kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();
In my program, which uses a Kafka source, a single input DStream works fine. When I switch to multiple DStreams, testing shows that data from both input DStreams is received, but the problem is: the program runs only once. That is, it receives data once and never receives anything afterwards. My code is below:
String groupId = args[0];
String zookeepers = args[1];
String topics = "tpsN5a";
Integer numPartitions = Integer.parseInt(args[3]);
Map<String, Integer> topicsMap = new HashMap<String, Integer>();
for (String topic : topics.split(",")) {
topicsMap.put(topic, numPartitions);
}
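As an aside, the Integer value in this map is easy to misread: in KafkaUtils.createStream it is the number of consumer threads per topic, not the number of Kafka partitions, so the variable name numPartitions above is misleading. The map-building step itself can be exercised as plain Java with no Spark dependency (a minimal sketch; the class and method names here are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

public class TopicMapDemo {
    // Build a topic -> consumer-thread-count map from a comma-separated
    // topic list, mirroring the loop in the question. In the Kafka
    // receiver API this Integer means threads per topic, not partitions.
    public static Map<String, Integer> buildTopicsMap(String topics, int threadsPerTopic) {
        Map<String, Integer> m = new HashMap<>();
        for (String t : topics.split(",")) {
            m.put(t, threadsPerTopic);
        }
        return m;
    }

    public static void main(String[] args) {
        Map<String, Integer> m = buildTopicsMap("tpsN5a,tpsN5b", 1);
        System.out.println(m); // two topics, one consumer thread each
    }
}
```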
// batch interval: how often to run the computation
Duration batchInterval = Durations.seconds(2);
SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaConsumerWordCount");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
batchInterval);
JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils
.createStream(ssc, zookeepers, groupId, topicsMap, StorageLevel.MEMORY_AND_DISK_SER());
String topics2 = "tpsN5b";
Map<String, Integer> topicsMap2 = new HashMap<String, Integer>();
topicsMap2.put(topics2, numPartitions);
JavaPairReceiverInputDStream<String, String> kafkaStream2 = KafkaUtils
.createStream(ssc, zookeepers, groupId, topicsMap2, StorageLevel.MEMORY_AND_DISK_SER());
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String, String>>(2);
kafkaStreams.add(kafkaStream);
kafkaStreams.add(kafkaStream2);
ssc.checkpoint("/spark/stream/checkpoint/d1");
JavaPairDStream<String, String> unifiedStream = ssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
JavaDStream<String> lines = unifiedStream // was: kafkaStream in the single-stream version
.map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> arg0)
throws Exception {
logger.warn(Thread.currentThread().getName() + " msg1:" + arg0._1 + "|msg2:" + arg0._2);
return arg0._2();
}
});
How can I fix the problem described above, so that when multiple input DStreams receive data in parallel, the streaming program keeps receiving data continuously instead of receiving it only once?
The program ends with:
ssc.start();
ssc.awaitTermination();
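Two likely causes are worth checking, both noted in the Spark Streaming programming guide. First, every receiver permanently occupies one core, so with two receivers the application needs at least three cores (e.g. master "local[3]" or more when running locally); with too few cores the receivers run but no cores remain to process the received batches, which matches the "receives once, then nothing" symptom. Second, a DStream needs an output operation (print(), foreachRDD(), saveAs...) before ssc.start(): map() alone is lazy and will not schedule jobs. The core-count rule can be sketched as a tiny plain-Java sanity check (the helper class and method names are hypothetical, not Spark API):

```java
public class CoreCheck {
    // Each ReceiverInputDStream pins one core for its long-running
    // receiver task, so batch processing needs at least one extra core.
    public static int minCoresFor(int numReceivers) {
        return numReceivers + 1;
    }

    public static void main(String[] args) {
        int receivers = 2; // two createStream() calls in the code above
        System.out.println("use at least local[" + minCoresFor(receivers) + "]");
    }
}
```

With two Kafka receivers as in the code above, that means setting the master to at least local[3] (or allocating at least 3 executor cores on a cluster), and calling e.g. lines.print() before ssc.start().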