Spark监视UI页面上,出现好多SQLXXX,这个正常吗?请各位帮我看看,谢谢了!
下面是我的代码,就是从Kafka里拉取数据,然后转换成DateFrame后存储到elastic search中,
似乎只要是进入foreachRDD 里面一回,就会在SparkUI页面上产生一个SQL监视对象,是不是我代码写法有问题
    val logs = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet).map(_._2)
    /*
     * schema初期化
     */
    val host = StructField("host", StringType, true)
    val handle = StructField("handle", StringType, true)
    val host_handle = StructField("host_handle", StringType, true)
    val timestemp = StructField("timestemp", StringType, true)
    val time = StructField("time", IntegerType, true)
    val fromip = StructField("fromip", StringType, true)
    val schema = StructType(Array(host, handle, host_handle, timestemp, time, fromip))
    logs.foreachRDD { rdd =>
      /*
     * SQLContext初期化
     */
      val sqlContext = new SQLContext(rdd.sparkContext)
      //      /*
      //     * DataFrame作成
      //     */
      if (!rdd.partitions.isEmpty) {
        val rowRDD = rdd.map(_.split(" ")).map(p =>
          Row(p(1),
            p(8).substring(1, p(8).length()),
            p(1) + "_" + p(8).substring(1, p(8).length()),
            stringTodate(p(5).substring(1, p(5).length())) + p(6).substring(0, p(6).length() - 1),
            p(20).toInt,
            p(2).substring(1, p(2).length())))
        sqlContext.createDataFrame(rowRDD, schema).saveToEs(esResource)
      }
      sqlContext.clearCache()
    }
    ssc.start()
    ssc.awaitTermination()

解决方案 »

  1.   

    logs.foreachRDD { rdd =>
          /*
         * SQLContext初期化
         */
          val sqlContext = new SQLContext(rdd.sparkContext)
          //      /*
          //     * DataFrame作成
          //     */
          if (!rdd.partitions.isEmpty) {
            val rowRDD = rdd.map(_.split(" ")).map(p =>
              Row(p(1),
                p(8).substring(1, p(8).length()),
                p(1) + "_" + p(8).substring(1, p(8).length()),
                stringTodate(p(5).substring(1, p(5).length())) + p(6).substring(0, p(6).length() - 1),
                p(20).toInt,
                p(2).substring(1, p(2).length())))
            sqlContext.createDataFrame(rowRDD, schema).saveToEs(esResource)
          }
    你每个rdd都给创建一个SQLContext 当然机web UI上面会有很多的sql tab了。你用一个单例的SQLContex就可以了。