Spark监视UI页面上,出现好多SQLXXX,这个正常吗?请各位帮我看看,谢谢了!
下面是我的代码,就是从Kafka里拉取数据,然后转换成DateFrame后存储到elastic search中,
似乎只要是进入foreachRDD 里面一回,就会在SparkUI页面上产生一个SQL监视对象,是不是我代码写法有问题
val logs = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet).map(_._2)
/*
* schema初期化
*/
val host = StructField("host", StringType, true)
val handle = StructField("handle", StringType, true)
val host_handle = StructField("host_handle", StringType, true)
val timestemp = StructField("timestemp", StringType, true)
val time = StructField("time", IntegerType, true)
val fromip = StructField("fromip", StringType, true)
val schema = StructType(Array(host, handle, host_handle, timestemp, time, fromip))
logs.foreachRDD { rdd =>
/*
* SQLContext初期化
*/
val sqlContext = new SQLContext(rdd.sparkContext)
// /*
// * DataFrame作成
// */
if (!rdd.partitions.isEmpty) {
val rowRDD = rdd.map(_.split(" ")).map(p =>
Row(p(1),
p(8).substring(1, p(8).length()),
p(1) + "_" + p(8).substring(1, p(8).length()),
stringTodate(p(5).substring(1, p(5).length())) + p(6).substring(0, p(6).length() - 1),
p(20).toInt,
p(2).substring(1, p(2).length())))
sqlContext.createDataFrame(rowRDD, schema).saveToEs(esResource)
}
sqlContext.clearCache()
}
ssc.start()
ssc.awaitTermination()
下面是我的代码,就是从Kafka里拉取数据,然后转换成DateFrame后存储到elastic search中,
似乎只要是进入foreachRDD 里面一回,就会在SparkUI页面上产生一个SQL监视对象,是不是我代码写法有问题
val logs = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet).map(_._2)
/*
* schema初期化
*/
val host = StructField("host", StringType, true)
val handle = StructField("handle", StringType, true)
val host_handle = StructField("host_handle", StringType, true)
val timestemp = StructField("timestemp", StringType, true)
val time = StructField("time", IntegerType, true)
val fromip = StructField("fromip", StringType, true)
val schema = StructType(Array(host, handle, host_handle, timestemp, time, fromip))
logs.foreachRDD { rdd =>
/*
* SQLContext初期化
*/
val sqlContext = new SQLContext(rdd.sparkContext)
// /*
// * DataFrame作成
// */
if (!rdd.partitions.isEmpty) {
val rowRDD = rdd.map(_.split(" ")).map(p =>
Row(p(1),
p(8).substring(1, p(8).length()),
p(1) + "_" + p(8).substring(1, p(8).length()),
stringTodate(p(5).substring(1, p(5).length())) + p(6).substring(0, p(6).length() - 1),
p(20).toInt,
p(2).substring(1, p(2).length())))
sqlContext.createDataFrame(rowRDD, schema).saveToEs(esResource)
}
sqlContext.clearCache()
}
ssc.start()
ssc.awaitTermination()
解决方案 »
- 京东采用openstack技术提供云计算能力,不知这次是否用上啦?
- 阿里和腾讯PK,华为消失了?--阿里分析
- 容器删除后,主机映射给容器的端口为何并立即未回收利用?
- 问:对于根设备,使用本地实例存储与使用 Amazon Elastic Block Storage (Amazon EBS) 有什么区别?
- 百度云盘 净网 和谐视频
- hadoop的各个守护节点都”正常“启动 但50030和50070打不开 还发现每个节点日志都有错误
- bosh_deployer 安装失败 求帮助
- 救救小弟啊:创建EC2实例的时候 将EBS容量写成了2T 怎么将EBS的容量改成20G?
- spark参与计算的Excutor和实际申请的excutor不相等
- 想问下桌面虚拟化怎么分配硬件资源?
- 如何用spark实现:调用外部程序或者调用动态链接库函数,对批量文件进行处理?
- 求助,sqoop从Mysql导入数据到Hive,为何有两行数据换行了?
/*
* SQLContext初期化
*/
val sqlContext = new SQLContext(rdd.sparkContext)
// /*
// * DataFrame作成
// */
if (!rdd.partitions.isEmpty) {
val rowRDD = rdd.map(_.split(" ")).map(p =>
Row(p(1),
p(8).substring(1, p(8).length()),
p(1) + "_" + p(8).substring(1, p(8).length()),
stringTodate(p(5).substring(1, p(5).length())) + p(6).substring(0, p(6).length() - 1),
p(20).toInt,
p(2).substring(1, p(2).length())))
sqlContext.createDataFrame(rowRDD, schema).saveToEs(esResource)
}
你每个rdd都给创建一个SQLContext 当然机web UI上面会有很多的sql tab了。你用一个单例的SQLContex就可以了。