由于处理数据的需求,我写了一个简单的 Spark 应用处理数据,本想着处理速度应该有很大的提高,然而结果令我难以接受!
1. Spark 集群运行条件:3 个 worker 节点,HDFS 文件管理系统,数据输入 2.5G 左右,运行时间大约 8 分钟。
spark应用程序如下:package examples;import java.util.List;
import java.util.regex.Pattern;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;import scala.Tuple2;
import scala.Tuple6;

/**
 * Spark job that compresses a time series: each input line is "order value";
 * every <interval> consecutive samples form one segment, and each segment is
 * reduced to (start, end, max, min, avg, std-dev).
 *
 * Usage: Compression <file> <interval> <slice> <type>
 */
public class Compression {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        if (args.length < 4) {
            System.err.println("Usage: Compression <file> <interval> <slice> <type>");
            System.exit(1);
        }
        // Effectively final so the anonymous functions below can capture it;
        // primitive int avoids autoboxing in the per-record map function.
        final int interval = Integer.parseInt(args[1]);

        SparkConf conf = new SparkConf().setAppName("Compression" + args[3]);
        JavaSparkContext ctx = new JavaSparkContext(conf);
        JavaRDD<String> lines = ctx.textFile(args[0]);

        // key = segment index (order / interval), value = (offset within segment, sample).
        JavaPairRDD<Integer, Tuple2<Integer, Double>> keyed = lines.mapToPair(
            new PairFunction<String, Integer, Tuple2<Integer, Double>>() {
                @Override
                public Tuple2<Integer, Tuple2<Integer, Double>> call(String s) {
                    String[] x = SPACE.split(s);
                    int order = Integer.parseInt(x[0]);
                    Tuple2<Integer, Double> v =
                        new Tuple2<Integer, Double>(order % interval, Double.valueOf(x[1]));
                    return new Tuple2<Integer, Tuple2<Integer, Double>>(order / interval, v);
                }
            });

        // NOTE(perf): groupByKey shuffles EVERY record across the network before any
        // aggregation happens; for this job that shuffle is why the cluster run is slower
        // than the single-machine version. aggregateByKey with (count, sum, sumSq, min,
        // max, first, last) accumulators would do map-side combining and avoid
        // materializing whole segments.
        JavaPairRDD<Integer, Iterable<Tuple2<Integer, Double>>> segments = keyed.groupByKey();

        JavaPairRDD<Integer, Tuple6<Double, Double, Double, Double, Double, Double>> compressdata =
            segments.mapValues(
                new Function<Iterable<Tuple2<Integer, Double>>,
                             Tuple6<Double, Double, Double, Double, Double, Double>>() {
                    @Override
                    public Tuple6<Double, Double, Double, Double, Double, Double> call(
                            Iterable<Tuple2<Integer, Double>> list) throws Exception {
                        // -MAX_VALUE, not MIN_VALUE: Double.MIN_VALUE is the smallest
                        // POSITIVE double, so the original never updated max when all
                        // samples were negative.
                        double max = -Double.MAX_VALUE;
                        double min = Double.MAX_VALUE;
                        double total = 0.0;
                        double start = 0.0;
                        int len = 0;
                        for (Tuple2<Integer, Double> v : list) {
                            len++;
                            if (v._1 == 0) start = v._2;
                            total += v._2;
                            if (v._2 > max) max = v._2;
                            if (v._2 < min) min = v._2;
                        }
                        double avg = total / len;
                        double end = 0.0;
                        double temp = 0.0;
                        int last = len - 1; // offset of the final sample in this segment
                        for (Tuple2<Integer, Double> v : list) {
                            if (v._1 == last) end = v._2;
                            temp += (v._2 - avg) * (v._2 - avg);
                        }
                        // Population std-dev (divide by n) so the result matches the
                        // single-machine version; the original divided by n-1 here,
                        // making the two programs' outputs disagree.
                        double s = Math.sqrt(temp / len);
                        return new Tuple6<Double, Double, Double, Double, Double, Double>(
                            start, end, max, min, avg, s);
                    }
                });

        // BUG FIX: the original imported com.sun.jersey...XMLJAXBElementProvider.Text by
        // mistake and passed key/value classes (Text, IntWritable) matching neither the
        // Integer key nor the Tuple6 value. Pass the actual pair types; TextOutputFormat
        // writes them via toString() as "key<TAB>value".
        compressdata.saveAsHadoopFile("/SparkTest/Compression/result" + args[3],
            Integer.class, Tuple6.class, TextOutputFormat.class);
        System.exit(0);
    }
}
2 单机运行,时间大约在2分钟左右
package deal;import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import javax.xml.crypto.Data; // unused by this class; kept to avoid touching the dependency list

/**
 * Single-machine baseline: reads "order value" lines from args[0], groups every
 * <interval> consecutive lines into one segment, and writes one summary line per
 * segment (start, end, max, min, avg, std-dev) to args[1]. Wall-clock timing is
 * appended to records.txt.
 *
 * Usage: Compression <in> <out> <interval>
 */
public class Compression {

    public static void main(String[] args) throws IOException {
        if (args.length < 3) {
            System.out.println("In,out,interval\n");
            System.exit(0);
        }
        File record = new File("records.txt");
        Date dates = new Date();
        DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String timestart = format.format(dates);
        File filer = new File(args[0]);
        File filew = new File(args[1]);
        int interval = Integer.parseInt(args[2]);

        // try-with-resources closes BOTH streams on every path; the original's finally
        // block closed only the reader, leaking the writer whenever an exception fired
        // before writer.close().
        try (BufferedReader reader = new BufferedReader(new FileReader(filer));
             BufferedWriter writer = new BufferedWriter(new FileWriter(filew))) {
            String tempString;
            int line = 0;
            onedata one = null;
            while ((tempString = reader.readLine()) != null) {
                // Split once; the original split every line twice and discarded the
                // first result into an unused local.
                String[] parts = tempString.split(" ");
                if (line % interval == 0) {
                    // A new segment begins: flush the finished one first.
                    if (one != null) {
                        one.calculate();
                        writer.write(one.ToString());
                    }
                    one = new onedata(line / interval);
                }
                one.list.add(Double.valueOf(parts[1]));
                line++;
            }
            // BUG FIX: the original never flushed the last (possibly partial) segment,
            // silently dropping up to <interval>-1 trailing samples from the output.
            if (one != null) {
                one.calculate();
                writer.write(one.ToString());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

        Date datee = new Date();
        String timeend = format.format(datee);
        long cha = datee.getTime() - dates.getTime();
        // Append the timing record; try-with-resources closes it even on error.
        try (BufferedWriter rw = new BufferedWriter(new FileWriter(record, true))) {
            rw.write(args[0] + "\t" + timestart + "\t" + timeend + "\t" + cha + "\n");
        }
    }
}
/**
 * Accumulates the samples of one segment and computes its summary statistics.
 * Fields are read directly by Compression.main after calling calculate().
 */
class onedata {
    // Raw samples of this segment, in arrival order; filled by the caller.
    public ArrayList<Double> list = new ArrayList<Double>();
    // -MAX_VALUE, not MIN_VALUE: Double.MIN_VALUE is the smallest POSITIVE double,
    // so the original never updated max when every sample was negative.
    Double max = -Double.MAX_VALUE;
    Double min = Double.MAX_VALUE;
    Double start = 0.0;
    Double end = 0.0;
    Double avg = 0.0;
    Double s = 0.0;
    Integer order = 0;

    /** @param o zero-based segment index, used as the output row key */
    public onedata(Integer o) {
        order = o;
    }

    /**
     * Computes start, end, max, min, avg and population std-dev over {@link #list}.
     * Safe on an empty list (leaves the defaults; the original threw
     * IndexOutOfBoundsException at list.get(0)).
     */
    public void calculate() {
        if (list.isEmpty()) {
            return;
        }
        double sum = 0.0;
        start = list.get(0);
        end = list.get(list.size() - 1);
        for (Double v : list) {
            sum += v;
            if (max < v) max = v;
            if (min > v) min = v;
        }
        avg = sum / list.size();
        double temp = 0.0;
        for (Double v : list) {
            temp += (v - avg) * (v - avg);
        }
        // Population standard deviation (divide by n).
        s = Math.sqrt(temp / list.size());
    }

    /** Formats one output row. NOTE: deliberately NOT Object#toString — the existing caller invokes ToString(). */
    public String ToString() {
        return order + "\t(" + start + "," + end + "," + max + "," + min + "," + avg + "," + s + ")\n";
    }
}
求各位大牛解释为啥spark集群运行时消耗的时间比单机的时候多那么多?谢谢!
spark应用程序如下:package examples;import java.util.List;
import java.util.regex.Pattern;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;import scala.Tuple2;
import scala.Tuple6;

/**
 * Spark job that compresses a time series: each input line is "order value";
 * every <interval> consecutive samples form one segment, and each segment is
 * reduced to (start, end, max, min, avg, std-dev).
 *
 * Usage: Compression <file> <interval> <slice> <type>
 */
public class Compression {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        if (args.length < 4) {
            System.err.println("Usage: Compression <file> <interval> <slice> <type>");
            System.exit(1);
        }
        // Effectively final so the anonymous functions below can capture it;
        // primitive int avoids autoboxing in the per-record map function.
        final int interval = Integer.parseInt(args[1]);

        SparkConf conf = new SparkConf().setAppName("Compression" + args[3]);
        JavaSparkContext ctx = new JavaSparkContext(conf);
        JavaRDD<String> lines = ctx.textFile(args[0]);

        // key = segment index (order / interval), value = (offset within segment, sample).
        JavaPairRDD<Integer, Tuple2<Integer, Double>> keyed = lines.mapToPair(
            new PairFunction<String, Integer, Tuple2<Integer, Double>>() {
                @Override
                public Tuple2<Integer, Tuple2<Integer, Double>> call(String s) {
                    String[] x = SPACE.split(s);
                    int order = Integer.parseInt(x[0]);
                    Tuple2<Integer, Double> v =
                        new Tuple2<Integer, Double>(order % interval, Double.valueOf(x[1]));
                    return new Tuple2<Integer, Tuple2<Integer, Double>>(order / interval, v);
                }
            });

        // NOTE(perf): groupByKey shuffles EVERY record across the network before any
        // aggregation happens; for this job that shuffle is why the cluster run is slower
        // than the single-machine version. aggregateByKey with (count, sum, sumSq, min,
        // max, first, last) accumulators would do map-side combining and avoid
        // materializing whole segments.
        JavaPairRDD<Integer, Iterable<Tuple2<Integer, Double>>> segments = keyed.groupByKey();

        JavaPairRDD<Integer, Tuple6<Double, Double, Double, Double, Double, Double>> compressdata =
            segments.mapValues(
                new Function<Iterable<Tuple2<Integer, Double>>,
                             Tuple6<Double, Double, Double, Double, Double, Double>>() {
                    @Override
                    public Tuple6<Double, Double, Double, Double, Double, Double> call(
                            Iterable<Tuple2<Integer, Double>> list) throws Exception {
                        // -MAX_VALUE, not MIN_VALUE: Double.MIN_VALUE is the smallest
                        // POSITIVE double, so the original never updated max when all
                        // samples were negative.
                        double max = -Double.MAX_VALUE;
                        double min = Double.MAX_VALUE;
                        double total = 0.0;
                        double start = 0.0;
                        int len = 0;
                        for (Tuple2<Integer, Double> v : list) {
                            len++;
                            if (v._1 == 0) start = v._2;
                            total += v._2;
                            if (v._2 > max) max = v._2;
                            if (v._2 < min) min = v._2;
                        }
                        double avg = total / len;
                        double end = 0.0;
                        double temp = 0.0;
                        int last = len - 1; // offset of the final sample in this segment
                        for (Tuple2<Integer, Double> v : list) {
                            if (v._1 == last) end = v._2;
                            temp += (v._2 - avg) * (v._2 - avg);
                        }
                        // Population std-dev (divide by n) so the result matches the
                        // single-machine version; the original divided by n-1 here,
                        // making the two programs' outputs disagree.
                        double s = Math.sqrt(temp / len);
                        return new Tuple6<Double, Double, Double, Double, Double, Double>(
                            start, end, max, min, avg, s);
                    }
                });

        // BUG FIX: the original imported com.sun.jersey...XMLJAXBElementProvider.Text by
        // mistake and passed key/value classes (Text, IntWritable) matching neither the
        // Integer key nor the Tuple6 value. Pass the actual pair types; TextOutputFormat
        // writes them via toString() as "key<TAB>value".
        compressdata.saveAsHadoopFile("/SparkTest/Compression/result" + args[3],
            Integer.class, Tuple6.class, TextOutputFormat.class);
        System.exit(0);
    }
}
2 单机运行,时间大约在2分钟左右
package deal;import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import javax.xml.crypto.Data; // unused by this class; kept to avoid touching the dependency list

/**
 * Single-machine baseline: reads "order value" lines from args[0], groups every
 * <interval> consecutive lines into one segment, and writes one summary line per
 * segment (start, end, max, min, avg, std-dev) to args[1]. Wall-clock timing is
 * appended to records.txt.
 *
 * Usage: Compression <in> <out> <interval>
 */
public class Compression {

    public static void main(String[] args) throws IOException {
        if (args.length < 3) {
            System.out.println("In,out,interval\n");
            System.exit(0);
        }
        File record = new File("records.txt");
        Date dates = new Date();
        DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String timestart = format.format(dates);
        File filer = new File(args[0]);
        File filew = new File(args[1]);
        int interval = Integer.parseInt(args[2]);

        // try-with-resources closes BOTH streams on every path; the original's finally
        // block closed only the reader, leaking the writer whenever an exception fired
        // before writer.close().
        try (BufferedReader reader = new BufferedReader(new FileReader(filer));
             BufferedWriter writer = new BufferedWriter(new FileWriter(filew))) {
            String tempString;
            int line = 0;
            onedata one = null;
            while ((tempString = reader.readLine()) != null) {
                // Split once; the original split every line twice and discarded the
                // first result into an unused local.
                String[] parts = tempString.split(" ");
                if (line % interval == 0) {
                    // A new segment begins: flush the finished one first.
                    if (one != null) {
                        one.calculate();
                        writer.write(one.ToString());
                    }
                    one = new onedata(line / interval);
                }
                one.list.add(Double.valueOf(parts[1]));
                line++;
            }
            // BUG FIX: the original never flushed the last (possibly partial) segment,
            // silently dropping up to <interval>-1 trailing samples from the output.
            if (one != null) {
                one.calculate();
                writer.write(one.ToString());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

        Date datee = new Date();
        String timeend = format.format(datee);
        long cha = datee.getTime() - dates.getTime();
        // Append the timing record; try-with-resources closes it even on error.
        try (BufferedWriter rw = new BufferedWriter(new FileWriter(record, true))) {
            rw.write(args[0] + "\t" + timestart + "\t" + timeend + "\t" + cha + "\n");
        }
    }
}
/**
 * Accumulates the samples of one segment and computes its summary statistics.
 * Fields are read directly by Compression.main after calling calculate().
 */
class onedata {
    // Raw samples of this segment, in arrival order; filled by the caller.
    public ArrayList<Double> list = new ArrayList<Double>();
    // -MAX_VALUE, not MIN_VALUE: Double.MIN_VALUE is the smallest POSITIVE double,
    // so the original never updated max when every sample was negative.
    Double max = -Double.MAX_VALUE;
    Double min = Double.MAX_VALUE;
    Double start = 0.0;
    Double end = 0.0;
    Double avg = 0.0;
    Double s = 0.0;
    Integer order = 0;

    /** @param o zero-based segment index, used as the output row key */
    public onedata(Integer o) {
        order = o;
    }

    /**
     * Computes start, end, max, min, avg and population std-dev over {@link #list}.
     * Safe on an empty list (leaves the defaults; the original threw
     * IndexOutOfBoundsException at list.get(0)).
     */
    public void calculate() {
        if (list.isEmpty()) {
            return;
        }
        double sum = 0.0;
        start = list.get(0);
        end = list.get(list.size() - 1);
        for (Double v : list) {
            sum += v;
            if (max < v) max = v;
            if (min > v) min = v;
        }
        avg = sum / list.size();
        double temp = 0.0;
        for (Double v : list) {
            temp += (v - avg) * (v - avg);
        }
        // Population standard deviation (divide by n).
        s = Math.sqrt(temp / list.size());
    }

    /** Formats one output row. NOTE: deliberately NOT Object#toString — the existing caller invokes ToString(). */
    public String ToString() {
        return order + "\t(" + start + "," + end + "," + max + "," + min + "," + avg + "," + s + ")\n";
    }
}
求各位大牛解释为啥spark集群运行时消耗的时间比单机的时候多那么多?谢谢!
解决方案 »
- (交流)大家使用oepnstack创建vlan之后,出来的数据包带vlan标签么?
- openstack到底是什么呢?
- 问一下EC2使用量比如磁盘读写次数之类的东西,在哪看呢?
- elb日志到s3周期是多久???
- AWS S3 实现上传(SSH框架)哪些对象是支持多线程的?
- 请教一下,docker run出现 No command specified是咋回事啊?
- 一般都是有奖征集回帖多,大家真的是对奖品感兴趣么?
- 求助!我自定义一个spark sql的函数
- win10 docker镜像替换失败,求解
- Hadoop中运行jps 只显示 11158 JPS
- 使用docker搭建gitlab,发现项目不能导入,求指导!
- 运行spark 1.6.1,出现异常,不能运行
处理逻辑基本都是一样的,单机的代码是普通 Java 写的处理数据的代码,集群运行的代码是 Spark 应用的代码。集群反而更慢的主要原因:2.5G 的数据量对集群来说太小,Spark 的任务调度、JVM 启动和 HDFS 读取等固定开销占了大头;更重要的是 groupByKey 会把全部记录做一次网络 shuffle,代价远高于单机顺序读文件。建议改用 aggregateByKey/reduceByKey 在 map 端先做局部聚合,减少 shuffle 的数据量后再比较耗时。