val con = "10.20.30.91:2181"
val topics = "topic1"
val group = "group1"
val numThreads = 6
val ssc = new StreamingContext(sc,Seconds(2))
val sqc = new SQLContext(sc)
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, con, group, topicMap).map(_._2)
val showLines = lines.window(Minutes(60))
showLines.foreachRDD( rdd => {
        val t = sqc.jsonRDD(rdd)
        t.registerTempTable("kafka_test")
})
ssc.start()
这是我写的关于spark streaming读取kafka数据的程序,但是当数据量大的时候,就会堵死,我想实现并发的功能,已达到数据的实时性,该如何去做?谢谢大家了

解决方案 »

  1.   

    KafkaUtils.createDirectStream
    普通的createStream方法,是指定某个executor的一个线程去充当receiver,单线程的从Kafka上消费数据;而DirectStream则是每个executor的每个线程都主动去Kafka上获取数据。但前者Spark会帮你维护Consumer offset,后者要求你自己搞。网上很多DirectStream的文档,去看看吧
      

  2.   

    Received -1 when reading from channel, socket has likely been closede尝试使用那种方法出现这个错误,而且这种方法没有group
    val ssc = new StreamingContext(sc, Seconds(1))
    val topicsSet = Set("topic1")
    val brokers = "10.20.30.91:2181" 
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)