现有1个大表A(ip,value)
ip是ip地址转换的10进制Double数值1个小表B(beginip,lastip,label)
beginip至lastip标识了一段ip地址
label表示这段ip地址归属的运营商(如:移动,联通,电信)
需要在大表的map阶段做join,结果将ip地址转换成ip地址归属的运营商,请教spark scala的程序实现代码,谢谢!
ip是ip地址转换的10进制Double数值1个小表B(beginip,lastip,label)
beginip至lastip标识了一段ip地址
label表示这段ip地址归属的运营商(如:移动,联通,电信)
需要在大表的map阶段做join,结果将ip地址转换成ip地址归属的运营商,请教spark scala的程序实现代码,谢谢!
解决方案 »
- 【每日译帖】[Cinder]能改变备份文件卷的大小吗?
- 大数据时代的机遇和挑战?
- 女硕士踏入云计算领域,前景讨论
- 在centos当中去部署PHP语言环境
- docker DOCKER_OPTS 配置文件一般在哪个路径下的
- network interface的security group和ec2的security group是什么关系?
- 如何用GraphX实现二跳邻居数统计
- 国产服务器虚拟化软件比较好的有哪些?
- sqoop从hdfs导出parquet到oracle报错,文本文件导出正常,
- Aws 磁盘共享的问题
- 都说Docker能给开发和运维带来很大好处,我实在不理解,请教..
- DELL服务器,主板上有红灯,无法开机,显示器点不亮,求指点给思路
val sighttp1 = sighttp.map(x => (x.iremoteipv4,x)).mapPartitions({ iter =>
val m=broadCastMap.value
var f1="NULL"
for {
(k,v) <- iter
val f=m.where(k+">=fstip and lastip>="+k)
if (f.count()>0)
f1 = m.where(k + ">=fstip and lastip>=" + k).first.get(2).toString
} yield (f1,v)
})
算法实现了,不知道是不是最优的,输出的结果就是能匹配到的行,匹配不到的行不输出。
val sqlCtx = new SQLContext(sc)
sqlCtx.read().jdbc(xxx).registerTempTable("t_a")
sqlCtx.read().jdbc(xxx).registerTempTable("t_b")val res = sqlCtx.sql(" SELECT a.ip , b.label FROM t_a a JOIN t_b b ON a.ip BETWEEN b.beginip AND b.endip ")
res.show(100)
但是我的大表很大,有几十亿行,小表就几万行,这种情况在reduce-side join运行非常慢,所以我想map-side join效率会高一些。我的源码如下,在spark-shell --master yarn-client集群运行,报NullPointerException,不知道是什么原因?出异常的行是val f = m.where(k + ">=fstip and lastip>=" + k) ,如果这行改成f=“test”这样的常量就能正常跑通。import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext object test1 {
case class httpxdr(localipv4:String,remoteipv4:Double,host:String,wholeurl:String)
case class iptable(fstip:Double,lastip:Double,label:String)
def main(args: Array[String]) {
if (args.length != 1 ){
println("usage is ebda <master> <input> <output>")
return
}
val sc = new SparkContext(args(0), "ebda-spark", System.getenv("SPARK_HOME"))
val hiveCtx = new HiveContext(sc)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val sighttp = hiveCtx.sql("SELECT * FROM httpnew where reportdate='20160510'").map(h => httpxdr(h(1).toString(),h(2).toString().split("\\.")(0).toDouble*1000000000+h(2).toString().split("\\.")(1).toDouble*1000000+h(2).toString().split("\\.")(2).toDouble*1000+h(2).toString().split("\\.")(3).toDouble,h(3).toString(),h(4).toString()))
val iptable1 = hiveCtx.sql("select * from configuration.wwipdsttable").map(ip => iptable(ip(0).toString().toDouble,ip(1).toString().toDouble,ip(3).toString())).toDF()
val broadCastMap = sc.broadcast(iptable1)
val sighttp1 = sighttp.map(line => (line.iremoteipv4,line)).mapPartitions({ iter =>
var f1="NULL"
val m=broadCastMap.value
for {
(k,v) <- iter
val f = m.where(k + ">=fstip and lastip>=" + k)
if (f.count>0)
f1=f.first.get(2).toString
} yield (f1,v)
}).toDF()
sighttp1.show(10)
}
}
现在你小表的ip都是范围数据,如果转换成C类或者B类ip网段的穷举会有多大?估计不会是天文数字吧。
这样就把范围轮询转化成了map/reduce最擅长的等值映射了,估计速度会快几个数量级。