package com.gudeng.commerce.gd;import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import com.gudeng.commerce.gd.conf.MySQLConf;
import com.gudeng.commerce.gd.util.StockUtil;/**
 * FileStream实例
 * 可以获取指定目录下的 文件  以及子目录下的文件
 * @author Administrator
 *
 */
public class SparkStreamingOnHDFSFileStreamDemon {
private static Logger logger = LoggerFactory.getLogger(SparkStreamingOnHDFSFileStreamDemon.class);
private static MySQLConf mysql_stock = null;

public static void main(String[] args) throws Exception {
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingOnHDFSFileStreamDemon");
conf.set("spark.streaming.fileStream.minRememberDuration", "25920000s"); // 300 days
conf.set("spark.scheduler.mode", "FAIR");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
jssc.checkpoint("hdfs://10.17.1.215:9000/sparkStreaming/checkPoint"); 
mysql_stock = new MySQLConf("test");

JavaPairInputDStream<String, String> files = jssc.fileStream("hdfs://10.17.1.215:9000/stockTxt/*", String.class,
String.class, HtmlFileInputFormat.class, new Function<Path, Boolean>() {
private static final long serialVersionUID = 1700599882445141563L;
public Boolean call(Path path) throws Exception {
logger.info("\n files path.getName:-->" + path.getName());
return path.getName().endsWith("_COPYING_")?false:true;
}
}, false); JavaPairDStream<String, String> anotherFiles = files.mapValues(new Function<String, String>() {
private static final long serialVersionUID = 4785162568501250065L;
public String call(String fPath) throws Exception {
logger.info("\n anotherFiles fPath:-->" + fPath);
dealFile(fPath);
return fPath;
}
//处理单个文件数据
public void dealFile(String path) throws Exception{
SparkSession spark = SparkSession.builder()
.master("local[*]")
.appName(path)
.config("spark.scheduler.mode", "FAIR")
.config("spark.sql.warehouse.dir", "file:///e:/spark-warehouse")//本地调试
.getOrCreate();

Dataset<Row> lines = spark.read()
.format("CSV")
.option("header", "true")
.option("charset", "GBK")
.load("hdfs://10.17.1.215:9000"+path);//hdfs://mycluster/stockTxt/2017-06-16.xls

//数据转换成Bean
Dataset<DayDetails> lines1= lines.map(new MapFunction<Row,DayDetails>(){
private static final long serialVersionUID = -3035418593858989490L;
@Override
public DayDetails call(Row value) throws Exception {
//logger.info("\n Row :-->"+value.toString());
String[] s = value.toString().substring(1, value.toString().length()-1).split(" ");
DayDetails bean = new DayDetails();
bean.setDealTime(s[0]);
bean.setPrice(Double.parseDouble(s[1]));
bean.setPriceChage("--".equals(s[2])? 0D : Double.parseDouble(s[2]));
bean.setVol(Long.parseLong(s[3]));
bean.setAmt(Long.parseLong(s[4]));
bean.setType(StockUtil.getType(s[5]));

return bean;
}

}, Encoders.bean(DayDetails.class));

lines1.createOrReplaceTempView("dealDetails");
Dataset<Row> lines2 = spark.sql("select 1 as k, sum(amt) as amt ,sum(vol) as vol,count(1) as rows,max(price) as high,min(price) as low from dealDetails ");
Dataset<Row> lines3 = spark.sql("select 1 as k, sum(amt) as amt_0 ,sum(vol) as vol_0 from dealDetails where type=0");
Dataset<Row> lines4 = spark.sql("select 1 as k, sum(amt) as amt_1 ,sum(vol) as vol_1 from dealDetails where type=1");
Dataset<Row> lines5 = spark.sql("select 1 as k, sum(amt) as amt_2 ,sum(vol) as vol_2 from dealDetails where type=2");
lines2.createOrReplaceTempView("t2");
lines3.createOrReplaceTempView("t3");
lines4.createOrReplaceTempView("t4");
lines5.createOrReplaceTempView("t5");
StringBuilder sql = new StringBuilder();
sql.append("select *  ")
.append(" from  t2")
.append(" left join t3 on t2.k=t3.k")
.append(" left join t4 on t2.k=t4.k")
.append(" left join t5 on t2.k=t5.k")
.append(" ");
Dataset<Row> lines6 = spark.sql(sql.toString());
lines6.createOrReplaceTempView("t6");

StringBuilder sql2 = new StringBuilder();
String[] sd = StockUtil.getCodeTime(path);
sql2.append(" select amt,vol,rows,high,low")
.append(" ,amt_0,vol_0")
.append(" ,amt_1,vol_1")
.append(" ,amt_2,vol_2")
.append(" ,'"+sd[0]+"' as coder")
.append(" ,'"+sd[1]+"' as date")
.append(" ")
.append(" from t6");
Dataset<Row> lines7 = spark.sql(sql2.toString());
lines7.write().mode(SaveMode.Append).jdbc(mysql_stock.getJdbc(), "lineday2", mysql_stock.getProperties());
// lines2.show();
// lines3.show();
// lines4.show();
// lines5.show();
// lines6.show();
lines7.show();
lines7.dropDuplicates();
lines6.dropDuplicates();
lines5.dropDuplicates();
lines4.dropDuplicates();
lines3.dropDuplicates();
lines2.dropDuplicates();
lines1.dropDuplicates();
lines.dropDuplicates();


}

});// anotherFiles.foreachRDD((rdd, time) -> {
// JavaRDD<JavaRow> rowRDD = rdd.map(word -> {
// Dataset<Row> lines = spark.read()
// .format("CSV")
// .option("header", "true")
// .option("charset", "GBK")
// .load("hdfs://10.17.1.215:9000"+word._2);//hdfs://mycluster/stockTxt/2017-06-16.xls
//
// //数据转换成Bean
// Dataset<DayDetails> lines2 = lines.map(new MapFunction<Row,DayDetails>(){
// private static final long serialVersionUID = -3035418593858989490L;
// @Override
// public DayDetails call(Row value) throws Exception {
// logger.info("\n Row :-->"+value.toString());
// String[] s = value.toString().split(" ");
// DayDetails bean = new DayDetails();
// bean.setDealTime(s[0]);
// bean.setPrice(Double.parseDouble(s[1]));
// bean.setPriceChage("--".equals(s[2])? 0D : Double.parseDouble(s[2]));
// bean.setVol(Long.parseLong(s[3]));
// bean.setAmt(Long.parseLong(s[4]));
// bean.setType(StockUtil.getType(s[5]));
//
// return bean;
// }
//
// }, Encoders.bean(DayDetails.class));
//
// lines2.createTempView("dealDetails");
// Dataset<Row> lines4 = spark.sql("select sum(amt),sum(vol),type from dealDetails group by type");
// lines4.show();
// JavaRow jr = new JavaRow();
// jr.setWord(word._2);
//
// return jr;
//   });
//  Dataset<Row> wordsDataFrame = spark.createDataFrame(rowRDD, JavaRow.class);
//  wordsDataFrame.createOrReplaceTempView("dayDetails");
//  Dataset<Row> wordCountsDataFrame =spark.sql("select * from dayDetails ");
//  wordCountsDataFrame.show();
//
// });


anotherFiles.print();
jssc.start();
jssc.awaitTermination();
jssc.close(); }}
使用spark2.2.1 streaming 处理hdfs目录csv文件
当文件超过5个 就不执行了,任务被挂起,不执行了,大神指点下 什么原因?