请教一下:   
val sqlContext = new SQLContext(sparkContext)
val result = rdd
     .map(x => ({
     var value :String = sqlContext.sql("select id from table")
     }, 1)
在rdd的map里面,sqlContext输出是空的? 求教下原因?多谢啦~~

解决方案 »

  1.   

    val sqlContext = new SQLContext(sparkContext)
    val result = rdd
         .map(x => ({
         var value :String = sqlContext.sql("select id from table")
         }, 1)
    你这个table创建了吗?
      

  2.   

    表创建了。
    整体代码大体是这样的:
    val conf = new SparkConf()
    val sparkContext = new SparkContext(conf)
    val sqlContext = new SQLContext(sparkContext)val srcTable = src.table(srcTbl).cache()
    val ipTable = ip.table(ipTbl).cache()
            
    val src_rdd = srcTable.select("type", "date", "ip").toDF("type", "date", "ip").cache()
    val ip_rdd = ipTable.filter(ipTable("country") === 1).filter(ipTable("province") !== 0).select("start_ip", "end_ip", "province").cache()
    ip_rdd.registerTempTable("ip_rdd")val result = src_rdd
    .map(x => ({
      var value :String = sqlContext.sql("select province from ip_rdd where start_ip <= 1032301638 and end_ip >= 1032301638 order by province limit 1" ).head().toString()
      pvMap(x).concat(value)
      pvMap(x)
      }, 1))
    .reduceByKey(_ + _)
    这里在Map过程中调sqlContext,我print了一下,是空的。但是在val result = src_rdd.map  外面调用sqlContext.sql就是正常的。
      

  3.   


    表创建了。
    整体代码大体是这样的:
    val conf = new SparkConf()
    val sparkContext = new SparkContext(conf)
    val sqlContext = new SQLContext(sparkContext)val srcTable = src.table(srcTbl).cache()
    val ipTable = ip.table(ipTbl).cache()
            
    val src_rdd = srcTable.select("type", "date", "ip").toDF("type", "date", "ip").cache()
    val ip_rdd = ipTable.filter(ipTable("country") === 1).filter(ipTable("province") !== 0).select("start_ip", "end_ip", "province").cache()
    ip_rdd.registerTempTable("ip_rdd")val result = src_rdd
    .map(x => ({
      var value :String = sqlContext.sql("select province from ip_rdd where start_ip <= 1032301638 and end_ip >= 1032301638 order by province limit 1" ).head().toString()
      pvMap(x).concat(value)
      pvMap(x)
      }, 1))
    .reduceByKey(_ + _)
    这里在Map过程中调sqlContext,我print了一下,是空的。但是在val result = src_rdd.map  外面调用sqlContext.sql就是正常的。
      

  4.   


    表创建了。
    整体代码大体是这样的:
    val conf = new SparkConf()
    val sparkContext = new SparkContext(conf)
    val sqlContext = new SQLContext(sparkContext)val srcTable = src.table(srcTbl).cache()
    val ipTable = ip.table(ipTbl).cache()
            
    val src_rdd = srcTable.select("type", "date", "ip").toDF("type", "date", "ip").cache()
    val ip_rdd = ipTable.filter(ipTable("country") === 1).filter(ipTable("province") !== 0).select("start_ip", "end_ip", "province").cache()
    ip_rdd.registerTempTable("ip_rdd")val result = src_rdd
    .map(x => ({
      var value :String = sqlContext.sql("select province from ip_rdd where start_ip <= 1032301638 and end_ip >= 1032301638 order by province limit 1" ).head().toString()
      pvMap(x).concat(value)
      pvMap(x)
      }, 1))
    .reduceByKey(_ + _)
    这里在Map过程中调sqlContext,我print了一下,是空的。但是在val result = src_rdd.map  外面调用sqlContext.sql就是正常的。
    我看你的sql结果应该是静态的,把这个值broadcast出去在map里访问就行,没必要用sqlContext
      

  5.   


    表创建了。
    整体代码大体是这样的:
    val conf = new SparkConf()
    val sparkContext = new SparkContext(conf)
    val sqlContext = new SQLContext(sparkContext)val srcTable = src.table(srcTbl).cache()
    val ipTable = ip.table(ipTbl).cache()
            
    val src_rdd = srcTable.select("type", "date", "ip").toDF("type", "date", "ip").cache()
    val ip_rdd = ipTable.filter(ipTable("country") === 1).filter(ipTable("province") !== 0).select("start_ip", "end_ip", "province").cache()
    ip_rdd.registerTempTable("ip_rdd")val result = src_rdd
    .map(x => ({
      var value :String = sqlContext.sql("select province from ip_rdd where start_ip <= 1032301638 and end_ip >= 1032301638 order by province limit 1" ).head().toString()
      pvMap(x).concat(value)
      pvMap(x)
      }, 1))
    .reduceByKey(_ + _)
    这里在Map过程中调sqlContext,我print了一下,是空的。但是在val result = src_rdd.map  外面调用sqlContext.sql就是正常的。
    我看你的sql结果应该是静态的,把这个值broadcast出去在map里访问就行,没必要用sqlContext
    表创建了。
    整体代码大体是这样的:
    val conf = new SparkConf()
    val sparkContext = new SparkContext(conf)
    val sqlContext = new SQLContext(sparkContext)val srcTable = src.table(srcTbl).cache()
    val ipTable = ip.table(ipTbl).cache()
            
    val src_rdd = srcTable.select("type", "date", "ip").toDF("type", "date", "ip").cache()
    val ip_rdd = ipTable.filter(ipTable("country") === 1).filter(ipTable("province") !== 0).select("start_ip", "end_ip", "province").cache()
    ip_rdd.registerTempTable("ip_rdd")val result = src_rdd
    .map(x => ({
      var value :String = sqlContext.sql("select province from ip_rdd where start_ip <= 1032301638 and end_ip >= 1032301638 order by province limit 1" ).head().toString()
      pvMap(x).concat(value)
      pvMap(x)
      }, 1))
    .reduceByKey(_ + _)
    这里在Map过程中调sqlContext,我print了一下,是空的。但是在val result = src_rdd.map  外面调用sqlContext.sql就是正常的。
    我看你的sql结果应该是静态的,把这个值broadcast出去在map里访问就行,没必要用sqlContext
    您好,sqlContext.sql里面的语句,我是贴了个demo,查询语句是有变量的:
    “select province from ip_rdd where start_ip <= " + ipAsNumeric(pvStr(x)) + " and end_ip >= " + ipAsNumeric(pvStr(x)) + " order by province limit 1”
     
      

  6.   


    表创建了。
    整体代码大体是这样的:
    val conf = new SparkConf()
    val sparkContext = new SparkContext(conf)
    val sqlContext = new SQLContext(sparkContext)val srcTable = src.table(srcTbl).cache()
    val ipTable = ip.table(ipTbl).cache()
            
    val src_rdd = srcTable.select("type", "date", "ip").toDF("type", "date", "ip").cache()
    val ip_rdd = ipTable.filter(ipTable("country") === 1).filter(ipTable("province") !== 0).select("start_ip", "end_ip", "province").cache()
    ip_rdd.registerTempTable("ip_rdd")val result = src_rdd
    .map(x => ({
      var value :String = sqlContext.sql("select province from ip_rdd where start_ip <= 1032301638 and end_ip >= 1032301638 order by province limit 1" ).head().toString()
      pvMap(x).concat(value)
      pvMap(x)
      }, 1))
    .reduceByKey(_ + _)
    这里在Map过程中调sqlContext,我print了一下,是空的。但是在val result = src_rdd.map  外面调用sqlContext.sql就是正常的。
    我看你的sql结果应该是静态的,把这个值broadcast出去在map里访问就行,没必要用sqlContext
    表创建了。
    整体代码大体是这样的:
    val conf = new SparkConf()
    val sparkContext = new SparkContext(conf)
    val sqlContext = new SQLContext(sparkContext)val srcTable = src.table(srcTbl).cache()
    val ipTable = ip.table(ipTbl).cache()
            
    val src_rdd = srcTable.select("type", "date", "ip").toDF("type", "date", "ip").cache()
    val ip_rdd = ipTable.filter(ipTable("country") === 1).filter(ipTable("province") !== 0).select("start_ip", "end_ip", "province").cache()
    ip_rdd.registerTempTable("ip_rdd")val result = src_rdd
    .map(x => ({
      var value :String = sqlContext.sql("select province from ip_rdd where start_ip <= 1032301638 and end_ip >= 1032301638 order by province limit 1" ).head().toString()
      pvMap(x).concat(value)
      pvMap(x)
      }, 1))
    .reduceByKey(_ + _)
    这里在Map过程中调sqlContext,我print了一下,是空的。但是在val result = src_rdd.map  外面调用sqlContext.sql就是正常的。
    我看你的sql结果应该是静态的,把这个值broadcast出去在map里访问就行,没必要用sqlContext
    您好,sqlContext.sql里面的语句,我是贴了个demo,查询语句是有变量的:
    “select province from ip_rdd where start_ip <= " + ipAsNumeric(pvStr(x)) + " and end_ip >= " + ipAsNumeric(pvStr(x)) + " order by province limit 1”
     
    你这种情况应该用SQL去关联查询而不是map方法。把ipAsNumeric,pvStr函数注册为UDF就可以在SQLContext里访问了。