最近在自学 Scala,在项目里找到了一段连接 Spark 的 Java 代码,想试着用 Scala 实现出来,却发现有点行不通:Java 和 Scala 的写法似乎并不一样,而我自己的 Scala 也是半生不熟。求教各位大佬,最后那个方法 JavaRDD<String> u = gaaatem2.toJavaRDD().map() 应该怎样用 Scala 实现?在下感激不尽。
/**
 * Spark job that reads one day of rows from {@code shjs_wlpt.interface_aaag_wide_day},
 * joins them with {@code usi_odso.itv_check_city} on {@code sourceno = cityid}, and writes
 * every resulting row as one line of \001-delimited text, prefixed with the run timestamp.
 *
 * <p>Arguments: {@code args[0]} is the value compared against the {@code day} column;
 * {@code args[1]} is the output path handed to {@code saveAsTextFile}.
 */
public class Gaaa_Wide_Yarn {
    public static void main(String[] args) {
        // Fail early with a readable message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            throw new IllegalArgumentException("Usage: Gaaa_Wide_Yarn <day> <outputPath>");
        }

        SparkConf sc = new SparkConf().setAppName("Gaaa_Wide_Yarn");
        SparkContext jsc = new SparkContext(sc);
        try {
            HiveContext hc = new HiveContext(jsc);
            String day = args[0];
            System.out.println("Gaaa_Wide_Yarn day:" + day);

            // NOTE(review): `day` is concatenated into the SQL text unescaped; confirm that
            // the caller only ever passes a trusted, well-formed value.
            String gaaaSql = " select username,groupid,starttime,stoptime,timelen,octets,sourceno,nasip,"
                    + " nasport,frameip,downreason "
                    + " from shjs_wlpt.interface_aaag_wide_day "
                    + " where day = '" + day + "'";
            // The second query reads the temp table "gaaaSql" registered below.
            String gaaaSql_1 = " select m.cityname as area_name,m.citycrmid as area_code, username,groupid,starttime,stoptime,timelen,octets,sourceno,nasip,"
                    + " nasport,frameip,downreason,'' as reserve1,'' as reserve2,'' as reserve3,'' as reserve4 "
                    + " from gaaaSql t,usi_odso.itv_check_city m "
                    + " where t.sourceno = m.cityid ";
            System.out.println("gaaaSql_1:" + gaaaSql_1);

            DataFrame gaaatem1 = hc.sql(gaaaSql);
            gaaatem1.registerTempTable("gaaaSql");
            DataFrame gaaatem2 = hc.sql(gaaaSql_1);
            gaaatem2.registerTempTable("gaaaSql_1");

            SimpleDateFormat df3 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            final String today = df3.format(new Date());

            @SuppressWarnings("serial")
            JavaRDD<String> u = gaaatem2.toJavaRDD().map(
                    new Function<Row, String>() {
                        public String call(Row row) {
                            StringBuilder line = new StringBuilder();
                            // The update date goes first so that fields can be appended later.
                            line.append(today).append("\001");
                            for (int i = 0; i < row.length(); i++) {
                                // row.get(i) instead of row.getString(i): getString throws a
                                // ClassCastException on non-string columns. Null values are still
                                // rendered as "null", exactly as with string concatenation.
                                line.append(row.get(i)).append("\001");
                            }
                            return line.toString();
                        }
                    });
            u.repartition(20).saveAsTextFile(args[1]);
        } finally {
            // Release the Spark context even when a query or the write fails.
            jsc.stop();
        }
    }
}
/**
 * Spark job that reads one day of rows from {@code shjs_wlpt.interface_aaag_wide_day},
 * joins them with {@code usi_odso.itv_check_city} on {@code sourceno = cityid}, and writes
 * every resulting row as one line of \001-delimited text, prefixed with the run timestamp.
 *
 * <p>Arguments: {@code args[0]} is the value compared against the {@code day} column;
 * {@code args[1]} is the output path handed to {@code saveAsTextFile}.
 */
public class Gaaa_Wide_Yarn {
    public static void main(String[] args) {
        // Fail early with a readable message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            throw new IllegalArgumentException("Usage: Gaaa_Wide_Yarn <day> <outputPath>");
        }

        SparkConf sc = new SparkConf().setAppName("Gaaa_Wide_Yarn");
        SparkContext jsc = new SparkContext(sc);
        try {
            HiveContext hc = new HiveContext(jsc);
            String day = args[0];
            System.out.println("Gaaa_Wide_Yarn day:" + day);

            // NOTE(review): `day` is concatenated into the SQL text unescaped; confirm that
            // the caller only ever passes a trusted, well-formed value.
            String gaaaSql = " select username,groupid,starttime,stoptime,timelen,octets,sourceno,nasip,"
                    + " nasport,frameip,downreason "
                    + " from shjs_wlpt.interface_aaag_wide_day "
                    + " where day = '" + day + "'";
            // The second query reads the temp table "gaaaSql" registered below.
            String gaaaSql_1 = " select m.cityname as area_name,m.citycrmid as area_code, username,groupid,starttime,stoptime,timelen,octets,sourceno,nasip,"
                    + " nasport,frameip,downreason,'' as reserve1,'' as reserve2,'' as reserve3,'' as reserve4 "
                    + " from gaaaSql t,usi_odso.itv_check_city m "
                    + " where t.sourceno = m.cityid ";
            System.out.println("gaaaSql_1:" + gaaaSql_1);

            DataFrame gaaatem1 = hc.sql(gaaaSql);
            gaaatem1.registerTempTable("gaaaSql");
            DataFrame gaaatem2 = hc.sql(gaaaSql_1);
            gaaatem2.registerTempTable("gaaaSql_1");

            SimpleDateFormat df3 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            final String today = df3.format(new Date());

            @SuppressWarnings("serial")
            JavaRDD<String> u = gaaatem2.toJavaRDD().map(
                    new Function<Row, String>() {
                        public String call(Row row) {
                            StringBuilder line = new StringBuilder();
                            // The update date goes first so that fields can be appended later.
                            line.append(today).append("\001");
                            for (int i = 0; i < row.length(); i++) {
                                // row.get(i) instead of row.getString(i): getString throws a
                                // ClassCastException on non-string columns. Null values are still
                                // rendered as "null", exactly as with string concatenation.
                                line.append(row.get(i)).append("\001");
                            }
                            return line.toString();
                        }
                    });
            u.repartition(20).saveAsTextFile(args[1]);
        } finally {
            // Release the Spark context even when a query or the write fails.
            jsc.stop();
        }
    }
}
解决方案 »
- 求助!如何自定义一个nova命令
- 无法attach volume to instance
- 谷歌高管:云计算价格战才刚刚打响
- 请教一下域名解析的问题, 比如 已经有域名为 blog.com, 想在 虚拟主机(ip: a.b.c.d) 上搭两个博客, 希望最终效果是 blog1.bl
- 云计算开发学习?
- AWS 选择Ubuntu,安装好nginx后,无法用Amazon 给的公用DNS,IP 访问
- 自己弄个服务器,目的是linux,redis,mongodb,mysql等学习,2000左右的预算
- docker service 无法使用自定义的overlay网络 提示无法发现overlay网络 怎么破???
- Spark的RDD转换成DataFrame问题
- 一个入门级的问题,关于主机操作系统
- Beam 源码编译出现问题,希望大家共同讨论一下
- 求大神帮我看下这个是个什么情况
真的哭了,gaaatem2.rdd().map() 这里出来的 rdd 就是 Scala 原生的 RDD 啊
真的哭了,gaaatem2.rdd().map() 这里出来的 rdd 就是 Scala 原生的 RDD 啊
意思是说只要直接gaaatem2.rdd就行了吗,那里面那个对字符串处理的方法应该写在哪呢
// Scala version of the Java job: reads one day of rows from
// shjs_wlpt.interface_aaag_wide_day, joins them with usi_odso.itv_check_city on
// sourceno = cityid, and writes each row as one line of \u0001-delimited text prefixed
// with the run timestamp.
//   args(0): value compared against the `day` column
//   args(1): output path handed to saveAsTextFile
require(args.length >= 2, "Usage: Gaaa_Wide_Yarn <day> <outputPath>")

val sc = new SparkConf().setAppName("Gaaa_Wide_Yarn")
val jsc = new SparkContext(sc)
try {
  val hc = new HiveContext(jsc)
  val day = args(0)
  println("Gaaa_Wide_Yarn day:" + day)

  // Each line ends with `+` so the expression stays open; a line that *starts* with `+`
  // would be parsed by Scala as a separate statement.
  // NOTE(review): `day` is concatenated into the SQL text unescaped; confirm that the
  // caller only ever passes a trusted, well-formed value.
  val gaaaSql = " select username,groupid,starttime,stoptime,timelen,octets,sourceno,nasip," +
    " nasport,frameip,downreason " +
    " from shjs_wlpt.interface_aaag_wide_day " +
    " where day = '" + day + "'"
  // The second query reads the temp table "gaaaSql" registered below.
  val gaaaSql_1 = " select m.cityname as area_name,m.citycrmid as area_code, username,groupid,starttime,stoptime,timelen,octets,sourceno,nasip," +
    " nasport,frameip,downreason,'' as reserve1,'' as reserve2,'' as reserve3,'' as reserve4 " +
    " from gaaaSql t,usi_odso.itv_check_city m " +
    " where t.sourceno = m.cityid "
  println("gaaaSql_1:" + gaaaSql_1)

  val gaaatem1 = hc.sql(gaaaSql)
  gaaatem1.registerTempTable("gaaaSql")
  val gaaatem2 = hc.sql(gaaaSql_1)
  gaaatem2.registerTempTable("gaaaSql_1")

  val dfs3 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  val today = dfs3.format(new Date())

  // Same character as the Java literal "\001"; the unicode escape avoids octal escapes,
  // which newer Scala versions deprecate or reject.
  val fieldSep = "\u0001"

  // `.rdd` already yields a native Scala RDD[Row], so a plain Scala function literal is
  // passed to `map` -- no Java `Function` wrapper is needed.
  val u = gaaatem2.rdd.map { row =>
    // The update date goes first so that fields can be appended later. Every value is
    // followed by the separator; null values are rendered as "null", like in the Java code.
    // toSeq + interpolation instead of getString(i): getString throws a ClassCastException
    // on non-string columns.
    row.toSeq.map(value => s"$value$fieldSep").mkString(today + fieldSep, "", "")
  }
  u.repartition(20).saveAsTextFile(args(1))
} finally {
  // Release the Spark context even when a query or the write fails.
  jsc.stop()
}
}
}