val con = "10.20.30.91:2181"
val topics = "topic1"
val group = "group1"
val numThreads = 6
val ssc = new StreamingContext(sc,Seconds(2))
val sqc = new SQLContext(sc)
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, con, group, topicMap).map(_._2)
val showLines = lines.window(Minutes(60))
showLines.foreachRDD( rdd => {
val t = sqc.jsonRDD(rdd)
t.registerTempTable("kafka_test")
})
ssc.start()这是我写的关于spark streaming读取kafka数据的程序,但是当数据量大的时候,就会堵死,我想实现并发的功能,已达到数据的实时性,该如何去做?谢谢大家了官网有这个 KafkaUtils.createDirectStream
但是我用的时候会出错Received -1 when reading from channel, socket has likely been closed
这个怎么用
val topics = "topic1"
val group = "group1"
val numThreads = 6
val ssc = new StreamingContext(sc,Seconds(2))
val sqc = new SQLContext(sc)
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, con, group, topicMap).map(_._2)
val showLines = lines.window(Minutes(60))
showLines.foreachRDD( rdd => {
val t = sqc.jsonRDD(rdd)
t.registerTempTable("kafka_test")
})
ssc.start()这是我写的关于spark streaming读取kafka数据的程序,但是当数据量大的时候,就会堵死,我想实现并发的功能,已达到数据的实时性,该如何去做?谢谢大家了官网有这个 KafkaUtils.createDirectStream
但是我用的时候会出错Received -1 when reading from channel, socket has likely been closed
这个怎么用
解决方案 »
- Eclipse下执行Hadoop程序报错
- 在属性A上建立B树索引所需空间1.44Xn/m*p,其中1.44是怎么来的呢?
- 如何让docker以daemon方式运行/bin/bash
- docker 是用go语言 , 将 lxc 重写了一遍 ,, 提供了更加友好的接口?
- 询问下,这些配置的R720服务器功耗都是多少?
- cloudera hue创建oozie workflow跑pyspark程序方法
- 如何用spark实现:调用外部程序或者调用动态链接库函数,对批量文件进行处理?
- spark如何更新mysql已有数据
- 关于在cmd启动spark—shell报错
- 现在缓存技术那个比较好
- spark streaming, kafka导入数据到es性能调优
- 关于大数据的编程与算法~~~很困惑,求助MapReduce和Spark大神、前辈
val kafkaDStreams = (1 to numInputDStreams).map { _ =>KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)}
kafkaDStreams.map(
你的处理逻辑
)
多进程读取kafka
提交的时候加上这个,后面的数字根据你集群的处理能力来定,每秒钟每个进程最多从每个partition消费多少数据
--conf spark.streaming.kafka.maxRatePerPartition=10000