val con = "10.20.30.91:2181"
val topics = "topic1"
val group = "group1"
val numThreads = 6
val ssc = new StreamingContext(sc,Seconds(2))
val sqc = new SQLContext(sc)
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, con, group, topicMap).map(_._2)
val showLines = lines.window(Minutes(60))
showLines.foreachRDD( rdd => {
        val t = sqc.jsonRDD(rdd)
        t.registerTempTable("kafka_test")
})
ssc.start()这是我写的关于spark streaming读取kafka数据的程序,但是当数据量大的时候,就会堵死,我想实现并发的功能,已达到数据的实时性,该如何去做?谢谢大家了官网有这个 KafkaUtils.createDirectStream
但是我用的时候会出错Received -1 when reading from channel, socket has likely been closed
这个怎么用

解决方案 »

  1.   

    你是不是连了zookeeper?createDirectStream直接流模式是连的broker
      

  2.   

    val numInputDStreams = 4
    val kafkaDStreams = (1 to numInputDStreams).map { _ =>KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)}
    kafkaDStreams.map(
        你的处理逻辑
    )
    多进程读取kafka
    提交的时候加上这个,后面的数字根据你集群的处理能力来定,每秒钟每个进程最多从每个partition消费多少数据
    --conf spark.streaming.kafka.maxRatePerPartition=10000
      

  3.   

    配置一下 spark.streaming.backpressure.enabled 和 spark.streaming.backpressure.initialRate 两个参数