现有1个大表A(ip,value)
ip是ip地址转换的10进制Double数值1个小表B(beginip,lastip,label)
beginip至lastip标识了一段ip地址
label表示这段ip地址归属的运营商(如:移动,联通,电信)
 
需要在大表的map阶段做join,结果将ip地址转换成ip地址归属的运营商,请教spark scala的程序实现代码,谢谢!

解决方案 »

  1.   

        val broadCastMap = sc.broadcast(iptable1)
        val sighttp1 = sighttp.map(x => (x.iremoteipv4,x)).mapPartitions({ iter =>
            val m=broadCastMap.value
            var f1="NULL"
            for {
              (k,v) <- iter
              val f=m.where(k+">=fstip and lastip>="+k)
              if (f.count()>0)
                f1 = m.where(k + ">=fstip and lastip>=" + k).first.get(2).toString
        } yield (f1,v)
        })
    算法实现了,不知道是不是最优的,输出的结果就是能匹配到的行,匹配不到的行不输出。
      

  2.   

    为什么不直接用SQLContext
      

  3.   


    val sqlCtx = new SQLContext(sc)
    sqlCtx.read().jdbc(xxx).registerTempTable("t_a")
    sqlCtx.read().jdbc(xxx).registerTempTable("t_b")val res = sqlCtx.sql(" SELECT a.ip , b.label  FROM t_a a JOIN t_b b ON a.ip BETWEEN b.beginip AND b.endip ")
    res.show(100)
      

  4.   

    感谢 link0007的建议!我用你的方法实现了。
    但是我的大表很大,有几十亿行,小表就几万行,这种情况在reduce-side join运行非常慢,所以我想map-side join效率会高一些。我的源码如下,在spark-shell --master yarn-client集群运行,报NullPointerException,不知道是什么原因?出异常的行是val f = m.where(k + ">=fstip and lastip>=" + k) ,如果这行改成f=“test”这样的常量就能正常跑通。import org.apache.spark.sql.hive.HiveContext 
    import org.apache.spark.SparkContext 
    import org.apache.spark.sql.SQLContext object test1 { 
    case class httpxdr(localipv4:String,remoteipv4:Double,host:String,wholeurl:String) 
    case class iptable(fstip:Double,lastip:Double,label:String) 
    def main(args: Array[String]) { 
    if (args.length != 1 ){ 
    println("usage is ebda <master> <input> <output>") 
    return 

    val sc = new SparkContext(args(0), "ebda-spark", System.getenv("SPARK_HOME")) 
    val hiveCtx = new HiveContext(sc) 
    val sqlContext = new SQLContext(sc) 
    import sqlContext.implicits._ 
    val sighttp = hiveCtx.sql("SELECT * FROM httpnew where reportdate='20160510'").map(h => httpxdr(h(1).toString(),h(2).toString().split("\\.")(0).toDouble*1000000000+h(2).toString().split("\\.")(1).toDouble*1000000+h(2).toString().split("\\.")(2).toDouble*1000+h(2).toString().split("\\.")(3).toDouble,h(3).toString(),h(4).toString())) 
    val iptable1 = hiveCtx.sql("select * from configuration.wwipdsttable").map(ip => iptable(ip(0).toString().toDouble,ip(1).toString().toDouble,ip(3).toString())).toDF() 
    val broadCastMap = sc.broadcast(iptable1) 
    val sighttp1 = sighttp.map(line => (line.iremoteipv4,line)).mapPartitions({ iter => 
    var f1="NULL" 
    val m=broadCastMap.value 
    for { 
    (k,v) <- iter 
    val f = m.where(k + ">=fstip and lastip>=" + k) 
    if (f.count>0) 
    f1=f.first.get(2).toString 
    } yield (f1,v) 
    }).toDF() 
    sighttp1.show(10) 

    }
      

  5.   

    broadcast(RDD.collect())就不报空指针错误了。但是data frame执行了collect后,data frame的where 等算子就无法使用了,broadcast的对象难道不支持data frame么?
      

  6.   

    我觉得还有进一步从算法上优化的余地。
    现在你小表的ip都是范围数据,如果转换成C类或者B类ip网段的穷举会有多大?估计不会是天文数字吧。
    这样就把范围轮询转化成了map/reduce最擅长的等值映射了,估计速度会快几个数量级。