val con = "10.20.30.91:2181"
val topics = "topic1"
val group = "group1"
val numThreads = 6
val ssc = new StreamingContext(sc,Seconds(2))
val sqc = new SQLContext(sc)
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, con, group, topicMap).map(_._2)
val showLines = lines.window(Minutes(60))
showLines.foreachRDD( rdd => {
val t = sqc.jsonRDD(rdd)
t.registerTempTable("kafka_test")
})
ssc.start()
这是我写的关于spark streaming读取kafka数据的程序,但是当数据量大的时候,就会堵死,我想实现并发的功能,已达到数据的实时性,该如何去做?谢谢大家了
val topics = "topic1"
val group = "group1"
val numThreads = 6
val ssc = new StreamingContext(sc,Seconds(2))
val sqc = new SQLContext(sc)
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, con, group, topicMap).map(_._2)
val showLines = lines.window(Minutes(60))
showLines.foreachRDD( rdd => {
val t = sqc.jsonRDD(rdd)
t.registerTempTable("kafka_test")
})
ssc.start()
这是我写的关于spark streaming读取kafka数据的程序,但是当数据量大的时候,就会堵死,我想实现并发的功能,已达到数据的实时性,该如何去做?谢谢大家了
解决方案 »
- 求助----安装glance时出错
- nova-volume和swift的区别
- 痛苦:openstack nova 创建运行虚拟机error
- linux主机下amh和wdcp那个好用些?占内存少?
- 我搭建的hadoop2.4.0完全分布集群,起了两个manenode,zkfc没有启动,请问怎么解决?
- ipconfig/aal 为何在cisco packet tracer模拟中 查询主机信息提示无效指令!!!
- 容器的用户的一些疑问 容器的用户是谁创建的?宿主机管理员创建一个容器,这个容器是不是有一个根用户?它的用户名口令字是什么
- docker中-t -i的疑问
- 我只想在windows上运行打包好的镜像?
- RowMatrix 的转置怎么处理。
- hbase 协处理器不生效啊???
- 求助:spark on yarn中client模式报错
普通的createStream方法,是指定某个executor的一个线程去充当receiver,单线程的从Kafka上消费数据;而DirectStream则是每个executor的每个线程都主动去Kafka上获取数据。但前者Spark会帮你维护Consumer offset,后者要求你自己搞。网上很多DirectStream的文档,去看看吧
val ssc = new StreamingContext(sc, Seconds(1))
val topicsSet = Set("topic1")
val brokers = "10.20.30.91:2181"
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)