请教一下:
val sqlContext = new SQLContext(sparkContext)
val result = rdd
.map(x => ({
var value :String = sqlContext.sql("select id from table")
}, 1)
在rdd的map里面,sqlContext输出是空的? 求教下原因?多谢啦~~
val sqlContext = new SQLContext(sparkContext)
val result = rdd
.map(x => ({
var value :String = sqlContext.sql("select id from table")
}, 1)
在rdd的map里面,sqlContext输出是空的? 求教下原因?多谢啦~~
解决方案 »
- OpenStack中文社区落户啦!
- 华为的FuisonManager产品能否管理VMware的虚拟化环境?
- CentOS6.4下PXE启动XP遇到txtsetup.sif丢失的问题
- 云计算晴雨表:诉说你的文件是否安全?
- openstack多个内网如何组成一个内部局域网
- ios中如何调用js
- 请教ec2通过iam role管理s3资源?
- sparkStreaming + kafka classnofound
- spark on yarn:启动时候报错Error initializing SparkContext,求各位大神指教。
- 讨论: 关于广播变量的用法
- salesforce开发微信上传临时素材问题!求高人指点!!!
- hadoop集群建立后启动不了
val result = rdd
.map(x => ({
var value :String = sqlContext.sql("select id from table")
}, 1)
你这个table创建了吗?
整体代码大体是这样的:
val conf = new SparkConf()
val sparkContext = new SparkContext(conf)
val sqlContext = new SQLContext(sparkContext)val srcTable = src.table(srcTbl).cache()
val ipTable = ip.table(ipTbl).cache()
val src_rdd = srcTable.select("type", "date", "ip").toDF("type", "date", "ip").cache()
val ip_rdd = ipTable.filter(ipTable("country") === 1).filter(ipTable("province") !== 0).select("start_ip", "end_ip", "province").cache()
ip_rdd.registerTempTable("ip_rdd")val result = src_rdd
.map(x => ({
var value :String = sqlContext.sql("select province from ip_rdd where start_ip <= 1032301638 and end_ip >= 1032301638 order by province limit 1" ).head().toString()
pvMap(x).concat(value)
pvMap(x)
}, 1))
.reduceByKey(_ + _)
这里在Map过程中调sqlContext,我print了一下,是空的。但是在val result = src_rdd.map 外面调用sqlContext.sql就是正常的。
表创建了。
整体代码大体是这样的:
val conf = new SparkConf()
val sparkContext = new SparkContext(conf)
val sqlContext = new SQLContext(sparkContext)val srcTable = src.table(srcTbl).cache()
val ipTable = ip.table(ipTbl).cache()
val src_rdd = srcTable.select("type", "date", "ip").toDF("type", "date", "ip").cache()
val ip_rdd = ipTable.filter(ipTable("country") === 1).filter(ipTable("province") !== 0).select("start_ip", "end_ip", "province").cache()
ip_rdd.registerTempTable("ip_rdd")val result = src_rdd
.map(x => ({
var value :String = sqlContext.sql("select province from ip_rdd where start_ip <= 1032301638 and end_ip >= 1032301638 order by province limit 1" ).head().toString()
pvMap(x).concat(value)
pvMap(x)
}, 1))
.reduceByKey(_ + _)
这里在Map过程中调sqlContext,我print了一下,是空的。但是在val result = src_rdd.map 外面调用sqlContext.sql就是正常的。
表创建了。
整体代码大体是这样的:
val conf = new SparkConf()
val sparkContext = new SparkContext(conf)
val sqlContext = new SQLContext(sparkContext)val srcTable = src.table(srcTbl).cache()
val ipTable = ip.table(ipTbl).cache()
val src_rdd = srcTable.select("type", "date", "ip").toDF("type", "date", "ip").cache()
val ip_rdd = ipTable.filter(ipTable("country") === 1).filter(ipTable("province") !== 0).select("start_ip", "end_ip", "province").cache()
ip_rdd.registerTempTable("ip_rdd")val result = src_rdd
.map(x => ({
var value :String = sqlContext.sql("select province from ip_rdd where start_ip <= 1032301638 and end_ip >= 1032301638 order by province limit 1" ).head().toString()
pvMap(x).concat(value)
pvMap(x)
}, 1))
.reduceByKey(_ + _)
这里在Map过程中调sqlContext,我print了一下,是空的。但是在val result = src_rdd.map 外面调用sqlContext.sql就是正常的。
我看你的sql结果应该是静态的,把这个值broadcast出去在map里访问就行,没必要用sqlContext
表创建了。
整体代码大体是这样的:
val conf = new SparkConf()
val sparkContext = new SparkContext(conf)
val sqlContext = new SQLContext(sparkContext)val srcTable = src.table(srcTbl).cache()
val ipTable = ip.table(ipTbl).cache()
val src_rdd = srcTable.select("type", "date", "ip").toDF("type", "date", "ip").cache()
val ip_rdd = ipTable.filter(ipTable("country") === 1).filter(ipTable("province") !== 0).select("start_ip", "end_ip", "province").cache()
ip_rdd.registerTempTable("ip_rdd")val result = src_rdd
.map(x => ({
var value :String = sqlContext.sql("select province from ip_rdd where start_ip <= 1032301638 and end_ip >= 1032301638 order by province limit 1" ).head().toString()
pvMap(x).concat(value)
pvMap(x)
}, 1))
.reduceByKey(_ + _)
这里在Map过程中调sqlContext,我print了一下,是空的。但是在val result = src_rdd.map 外面调用sqlContext.sql就是正常的。
我看你的sql结果应该是静态的,把这个值broadcast出去在map里访问就行,没必要用sqlContext
表创建了。
整体代码大体是这样的:
val conf = new SparkConf()
val sparkContext = new SparkContext(conf)
val sqlContext = new SQLContext(sparkContext)val srcTable = src.table(srcTbl).cache()
val ipTable = ip.table(ipTbl).cache()
val src_rdd = srcTable.select("type", "date", "ip").toDF("type", "date", "ip").cache()
val ip_rdd = ipTable.filter(ipTable("country") === 1).filter(ipTable("province") !== 0).select("start_ip", "end_ip", "province").cache()
ip_rdd.registerTempTable("ip_rdd")val result = src_rdd
.map(x => ({
var value :String = sqlContext.sql("select province from ip_rdd where start_ip <= 1032301638 and end_ip >= 1032301638 order by province limit 1" ).head().toString()
pvMap(x).concat(value)
pvMap(x)
}, 1))
.reduceByKey(_ + _)
这里在Map过程中调sqlContext,我print了一下,是空的。但是在val result = src_rdd.map 外面调用sqlContext.sql就是正常的。
我看你的sql结果应该是静态的,把这个值broadcast出去在map里访问就行,没必要用sqlContext
您好,sqlContext.sql里面的语句,我是贴了个demo,查询语句是有变量的:
“select province from ip_rdd where start_ip <= " + ipAsNumeric(pvStr(x)) + " and end_ip >= " + ipAsNumeric(pvStr(x)) + " order by province limit 1”
表创建了。
整体代码大体是这样的:
val conf = new SparkConf()
val sparkContext = new SparkContext(conf)
val sqlContext = new SQLContext(sparkContext)val srcTable = src.table(srcTbl).cache()
val ipTable = ip.table(ipTbl).cache()
val src_rdd = srcTable.select("type", "date", "ip").toDF("type", "date", "ip").cache()
val ip_rdd = ipTable.filter(ipTable("country") === 1).filter(ipTable("province") !== 0).select("start_ip", "end_ip", "province").cache()
ip_rdd.registerTempTable("ip_rdd")val result = src_rdd
.map(x => ({
var value :String = sqlContext.sql("select province from ip_rdd where start_ip <= 1032301638 and end_ip >= 1032301638 order by province limit 1" ).head().toString()
pvMap(x).concat(value)
pvMap(x)
}, 1))
.reduceByKey(_ + _)
这里在Map过程中调sqlContext,我print了一下,是空的。但是在val result = src_rdd.map 外面调用sqlContext.sql就是正常的。
我看你的sql结果应该是静态的,把这个值broadcast出去在map里访问就行,没必要用sqlContext
表创建了。
整体代码大体是这样的:
val conf = new SparkConf()
val sparkContext = new SparkContext(conf)
val sqlContext = new SQLContext(sparkContext)val srcTable = src.table(srcTbl).cache()
val ipTable = ip.table(ipTbl).cache()
val src_rdd = srcTable.select("type", "date", "ip").toDF("type", "date", "ip").cache()
val ip_rdd = ipTable.filter(ipTable("country") === 1).filter(ipTable("province") !== 0).select("start_ip", "end_ip", "province").cache()
ip_rdd.registerTempTable("ip_rdd")val result = src_rdd
.map(x => ({
var value :String = sqlContext.sql("select province from ip_rdd where start_ip <= 1032301638 and end_ip >= 1032301638 order by province limit 1" ).head().toString()
pvMap(x).concat(value)
pvMap(x)
}, 1))
.reduceByKey(_ + _)
这里在Map过程中调sqlContext,我print了一下,是空的。但是在val result = src_rdd.map 外面调用sqlContext.sql就是正常的。
我看你的sql结果应该是静态的,把这个值broadcast出去在map里访问就行,没必要用sqlContext
您好,sqlContext.sql里面的语句,我是贴了个demo,查询语句是有变量的:
“select province from ip_rdd where start_ip <= " + ipAsNumeric(pvStr(x)) + " and end_ip >= " + ipAsNumeric(pvStr(x)) + " order by province limit 1”
你这种情况应该用SQL去关联查询而不是map方法。把ipAsNumeric,pvStr函数注册为UDF就可以在SQLContext里访问了。