最近在自学scala,在项目里找到了连接spark的代码,想试试手用scala实现出来,却发现有点行不通,java和scala的方法似乎并不是一样的,我自己的scala也是半生不熟,求教各位大佬最后那个方法JavaRDD<String> u = gaaatem2.toJavaRDD().map()应该怎样用scala实现,在下感激不尽
public class Gaaa_Wide_Yarn { public static void main(String[] args) {
SparkConf sc = new SparkConf().setAppName("Gaaa_Wide_Yarn");
SparkContext jsc = new SparkContext(sc);
HiveContext hc = new HiveContext(jsc);
 String day = args[0];
     System.out.println("Gaaa_Wide_Yarn day:" + day);
 String gaaaSql=" select username,groupid,starttime,stoptime,timelen,octets,sourceno,nasip,"
  + " nasport,frameip,downreason "
  + " from shjs_wlpt.interface_aaag_wide_day "
  + " where day = '"+day+"'";
 String gaaaSql_1=" select m.cityname as area_name,m.citycrmid as area_code, username,groupid,starttime,stoptime,timelen,octets,sourceno,nasip,"
  + " nasport,frameip,downreason,'' as reserve1,'' as reserve2,'' as reserve3,'' as reserve4 "
  + " from gaaaSql t,usi_odso.itv_check_city m "
  + " where t.sourceno = m.cityid ";
 System.out.println("gaaaSql_1:"+gaaaSql_1);  DataFrame gaaatem1=hc.sql(gaaaSql);
 gaaatem1.registerTempTable("gaaaSql");
 DataFrame gaaatem2=hc.sql(gaaaSql_1);
 gaaatem2.registerTempTable("gaaaSql_1");
SimpleDateFormat df3 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
final String today = df3.format(new Date());
@SuppressWarnings("serial")
JavaRDD<String> u = gaaatem2.toJavaRDD().map(
new Function<Row, String>() {
public String call(Row row) {
String str = "";
str += today + "\001";//更新日期放首位,为后续加字段方便
for (int i = 0; i < row.length(); i++) {
str += row.getString(i) + "\001";
}
return str;
}
});
u.repartition(20).saveAsTextFile(args[1]);
}}

解决方案 »

  1.   

    Spark本身就是Scala写的,你用Scala的API就好了嘛,为啥要用Java的?
      

  2.   

    这是项目里面写好的java代码啊,我想用scala实现
      

  3.   

    这是项目里面写好的java代码啊,我想用scala实现
    真的哭了gaaatem2.rdd().map()这里出来的rdd就是scala源生的rdd啊
      

  4.   

    这是项目里面写好的java代码啊,我想用scala实现
    真的哭了gaaatem2.rdd().map()这里出来的rdd就是scala源生的rdd啊
    意思是说只要直接gaaatem2.rdd就行了吗,那里面那个对字符串处理的方法应该写在哪呢
      

  5.   

    你好,我顺手把你的java代码 改成了scala版,看是不是符合你想要的object Gaaa_Wide_Yarn {    def main(args: Array[String]): Unit = {
           val  sc  = new SparkConf().setAppName("Gaaa_Wide_Yarn")
           val  jsc = new  SparkContext(sc)
           val  hc = new HiveContext(jsc)
           val day = args(0)
           println("Gaaa_Wide_Yarn day:" + day)       val  gaaaSql  =" select username,groupid,starttime,stoptime,timelen,octets,sourceno,nasip,"
            + " nasport,frameip,downreason "
            + " from shjs_wlpt.interface_aaag_wide_day "
            + " where day = '"+day+"'";        val gaaaSql_1 = " select m.cityname as area_name,m.citycrmid as area_code, username,groupid,starttime,stoptime,timelen,octets,sourceno,nasip,"
            + " nasport,frameip,downreason,'' as reserve1,'' as reserve2,'' as reserve3,'' as reserve4 "
            + " from gaaaSql t,usi_odso.itv_check_city m "
            + " where t.sourceno = m.cityid ";        println("gaaaSql_1:"+gaaaSql_1)        val   gaaatem1=hc.sql(gaaaSql)
            gaaatem1.registerTempTable("gaaaSql")
            val gaaatem2=hc.sql(gaaaSql_1);
            gaaatem2.registerTempTable("gaaaSql_1");        val dfs3 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
            val  today =  dfs3.format(new Date())
             val u =  gaaatem2.map(
                 new Function[Row,String](){
                    def call(row:Row):String = {
                         val str =  ""
                         str += today + "\001"
                         for(i <- row.length){
                                str += row.getString(i)+"\001"
                         }
                        str
                    }
                 }
             )
                u.repartition(20).saveAsTextFile(args(1))
        }
    }