由于处理数据的需求,我写了一个简单的spark应用处理数据,本想着处理速度应该有很大的提高,然而结果令我难以接受!1 spark集群运行条件:3个work节点,Hdfs文件管理系统,数据输入2.5G左右,运行时间大约8分钟。
spark应用程序如下:package examples;import java.util.List;
import java.util.regex.Pattern;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;import scala.Tuple2;
import scala.Tuple6;import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;public class Compression { private static final Pattern SPACE = Pattern.compile(" ");    public static void main(String[] args) throws Exception {
        if (args.length < 4) {
            System.err.println("Usage: Compression <file> <interval> <slice> <type>");
            System.exit(1);
        }
        final Integer interval = Integer.valueOf(args[1]);
        SparkConf conf=new SparkConf().setAppName("Compression"+args[3]); 
        JavaSparkContext ctx=new JavaSparkContext(conf);
        JavaRDD<String> lines = ctx.textFile(args[0]);        JavaPairRDD<Integer, Tuple2<Integer, Double>> key_v = lines.mapToPair(
                new PairFunction<String, Integer,Tuple2<Integer, Double>>() {
                    @Override
                    public Tuple2<Integer, Tuple2<Integer, Double>> call(String s) {
                         String[] x = SPACE.split(s);
                         Integer order = Integer.valueOf(x[0]);
                         Integer k = order/interval;
                         Tuple2<Integer, Double> v = new Tuple2<Integer, Double>(order%interval, Double.valueOf(x[1]));
                        return new Tuple2<Integer,Tuple2<Integer, Double>>(k,v);
                    }
                });        JavaPairRDD<Integer, Iterable<Tuple2<Integer, Double>>> segments = key_v.groupByKey();
        //JavaPairRDD<Integer, Iterable<Tuple2<Integer, Double>>> segments = key_v.distinct();
        JavaPairRDD<Integer, Tuple6<Double, Double, Double, Double, Double, Double>> compressdata = segments.mapValues(
                new Function<Iterable<Tuple2<Integer, Double>>, Tuple6<Double, Double, Double, Double, Double, Double>>() {
@Override
public Tuple6<Double, Double, Double, Double, Double, Double> call(Iterable<Tuple2<Integer, Double>> list) throws Exception {
// TODO Auto-generated method stub
Double max = Double.MIN_VALUE;
Double min = Double.MAX_VALUE;
Double total = 0.0;
Double start = 0.0;
Double end = 0.0;
Double avg = 0.0;
Double s = 0.0;  
Integer len = 0;
for (Tuple2<Integer, Double> v : list) {
len ++;
if(v._1.equals(0)) start = v._2;
total += v._2;
if(v._2 > max) max = v._2;
if(v._2 < min) min = v._2;
}
avg = total / len;
Double temp = 0.0;
len -= 1;
for (Tuple2<Integer, Double> v : list) {
if(v._1.equals(len))end = v._2;
temp += (v._2 - avg)*(v._2 - avg);
}
s = Math.sqrt( temp / len);
return new Tuple6<Double, Double, Double, Double, Double, Double>(start, end, max, min, avg, s);
}
                });
         compressdata.saveAsHadoopFile("/SparkTest/Compression/result"+args[3], Text.class, IntWritable.class, TextOutputFormat.class);
       
        System.exit(0);
    }}
2 单机运行,时间大约在2分钟左右
package deal;import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;import javax.xml.crypto.Data;public class Compression { public static void main(String[] args) throws IOException {
// TODO Auto-generated method stub
if(args.length < 3)
{
System.out.println("In,out,interval\n");
System.exit(0);
}
File record = new File("records.txt");
BufferedWriter rw = new BufferedWriter(new FileWriter(record,true));

Date dates=new Date();
DateFormat format=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String timestart=format.format(dates);

File filer = new File(args[0]);
File filew = new File(args[1]);
Integer interval = Integer.valueOf(args[2]);
BufferedReader reader = null;
BufferedWriter writer = null;
        try {
            reader = new BufferedReader(new FileReader(filer));
            writer = new BufferedWriter(new FileWriter(filew));
            String tempString = null;
            int line = 0;
            onedata one = null;
            while ((tempString = reader.readLine()) != null) {
                String[] lines = tempString.split(" ");
                if(line%interval == 0)
                {
                 if(one!=null)
                 {
                 one.calculate();
                 writer.write(one.ToString());
                 }
                 one = new onedata(line/interval);
                }
                 one.list.add(Double.valueOf(tempString.split(" ")[1]));
                line++;
            }
            reader.close();
            writer.close();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (reader != null) {
                try {
                    reader.close();
                } catch (IOException e1) {
                }
            }
        }
        Date datee=new Date();
        String timeend=format.format(datee);
        long cha = datee.getTime() - dates.getTime();
        rw.write(args[0]+"\t"+timestart+"\t"+timeend+"\t"+cha+"\n");
        rw.close();
        
}}
class onedata{
public ArrayList<Double> list = new ArrayList<Double>();
Double max = Double.MIN_VALUE;
Double min = Double.MAX_VALUE;
Double start = 0.0;
Double end = 0.0;
Double avg = 0.0;
Double s = 0.0;
Integer order = 0;
public onedata(Integer o) {
// TODO Auto-generated constructor stub
order = o;
}
public void calculate() {
Double sum = 0.0;
start = list.get(0);
end = list.get(list.size()-1);
for (Double double1 : list) {
sum += double1;
if(max < double1) max = double1;
if (min > double1) min = double1;
}
avg = sum / list.size();
Double temp = 0.0;
for (Double double1 : list) {
temp += (double1 - avg)*(double1 - avg);
}
s = Math.sqrt( temp / list.size());
}
public String ToString() {
return order+"\t("+start+","+end+","+max+","+min+","+avg+","+s+")\n";
}
}
求各位大牛解释为啥spark集群运行时消耗的时间比单机的时候多那么多?谢谢!