上代码:
// Loads a CSV file from S3, keeps the rows that are not already present in the
// JDBC table `test_union`, prints a sample, and writes those rows to the JDBC
// table `test_test` (replacing its previous contents).
//
// NOTE(review): the "local" master runs Spark with a single worker thread;
// "local[*]" would use every available core — confirm which is intended.
val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("testApp"))
val sqlContext = new SQLContext(sc)
implicit val region = Region.CN_NORTH_1
val tempS3Dir = "s3a://redshift-test/RedShift/red/"

// Configure the S3 connection.
// NOTE(review): the access key and secret key are hard-coded; they should be
// supplied from the environment or a credentials provider, not from source.
sqlContext.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "AKIA3ZwewewewCHYE")
sqlContext.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "wg2mPMDNtcqeweweweweCSu7Q+JJHNPT2O")
sqlContext.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-north-1.amazonaws.com.cn")

// NOTE(review): this stores "driver" as a session conf entry. The JDBC source
// presumably expects the driver class via .option("driver", ...) on the
// reader/writer instead — confirm whether this line has any effect.
sqlContext.setConf("driver", "com.amazon.redshift.jdbc4.Driver")

// CSV input; the first line of the file is treated as the header.
val dataDF = sqlContext.read
  .format("csv")
  .option("header", true)
  .load("s3a://redshift-test/RedShift/out/test0.csv")

// Read the existing table data over JDBC.
val test_union = sqlContext.read
  .format("jdbc")
  .option("url", jdbcURL)
  .option("dbtable", "test_union")
  .load()

// Rows that exist in dataDF but not in test_union.
// show() and the write below are two separate actions; caching lets the write
// reuse already-computed results instead of re-running both reads and the
// set difference from scratch.
val data = dataDF.except(test_union).cache()
data.show()

// Overwrite means the target table is reloaded rather than appended to.
// `header` is a CSV option, so it is not set on the JDBC write.
data.write
  .mode(SaveMode.Overwrite)
  .jdbc(jdbcURL, "test_test", new Properties)

// Must be its own statement: fused onto the previous line it does not parse.
sc.stop()
执行很慢,一直在转,为什么呢?应该是 val data = dataDF.except(test_union) 这一步的问题,但不知道该怎么办。
// Loads a CSV file from S3, keeps the rows that are not already present in the
// JDBC table `test_union`, prints a sample, and writes those rows to the JDBC
// table `test_test` (replacing its previous contents).
//
// NOTE(review): the "local" master runs Spark with a single worker thread;
// "local[*]" would use every available core — confirm which is intended.
val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("testApp"))
val sqlContext = new SQLContext(sc)
implicit val region = Region.CN_NORTH_1
val tempS3Dir = "s3a://redshift-test/RedShift/red/"

// Configure the S3 connection.
// NOTE(review): the access key and secret key are hard-coded; they should be
// supplied from the environment or a credentials provider, not from source.
sqlContext.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "AKIA3ZwewewewCHYE")
sqlContext.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "wg2mPMDNtcqeweweweweCSu7Q+JJHNPT2O")
sqlContext.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-north-1.amazonaws.com.cn")

// NOTE(review): this stores "driver" as a session conf entry. The JDBC source
// presumably expects the driver class via .option("driver", ...) on the
// reader/writer instead — confirm whether this line has any effect.
sqlContext.setConf("driver", "com.amazon.redshift.jdbc4.Driver")

// CSV input; the first line of the file is treated as the header.
val dataDF = sqlContext.read
  .format("csv")
  .option("header", true)
  .load("s3a://redshift-test/RedShift/out/test0.csv")

// Read the existing table data over JDBC.
val test_union = sqlContext.read
  .format("jdbc")
  .option("url", jdbcURL)
  .option("dbtable", "test_union")
  .load()

// Rows that exist in dataDF but not in test_union.
// show() and the write below are two separate actions; caching lets the write
// reuse already-computed results instead of re-running both reads and the
// set difference from scratch.
val data = dataDF.except(test_union).cache()
data.show()

// Overwrite means the target table is reloaded rather than appended to.
// `header` is a CSV option, so it is not set on the JDBC write.
data.write
  .mode(SaveMode.Overwrite)
  .jdbc(jdbcURL, "test_test", new Properties)

// Must be its own statement: fused onto the previous line it does not parse.
sc.stop()
执行很慢,一直在转,为什么呢?应该是 val data = dataDF.except(test_union) 这一步的问题,但不知道该怎么办。
解决方案 »
免费领取超大流量手机卡,每月29元包185G流量+100分钟通话, 中国电信官方发货