public class LinearRegression { public static void main(String[] args) {
// TODO Auto-generated method stub SparkConf conf = new SparkConf().setAppName("JavaStreamingLinearRegressionWithSGDE");
JavaStreamingContext jssc=new JavaStreamingContext(conf, Durations.seconds(10L));
    
    JavaDStream<String> data = jssc.textFileStream("/LienarRegression/lpsa.data");
    JavaDStream<LabeledPoint> parsedData = data.map(line -> {
        String[] parts = line.split(",");
        String[] features = parts[1].split(" ");
        double[] v = new double[features.length];
        for (int i = 0; i < features.length - 1; i++) {
          v[i] = Double.parseDouble(features[i]);
        }
        return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v));
    });
    parsedData.cache(); JavaDStream<String> data_1 = jssc.textFileStream("/LienarRegression/lpsa_1.data");  
//   JavaDStream转JavaPairDStream (JavaPairDStream<K,Vector> data)
int numIterations = 3;
StreamingLinearAlgorithm model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(numIterations));  
 
model.trainOn(parsedData);
model.latestModel();
//model.predictOnValues();
}
}数据源示例 : 2.5687881,1.16902610257751 0.855491905752846 2.03274448152093 1.22628985326088 1.89254797819741 2.02833774827712 3.11219574032972 2.68112551007152(用逗号和空格分隔)调用predictOnValues方法 , 参数是JavaPairDStream<K,Vector> data , 如何把JavaDStream转换成JavaPairDStream ? 求教 , 求上代码 , 大佬们

解决方案 »

  1.   

    textFile("x").map(x => (x, "1"))
    类似这个操作? 
      

  2.   

    是的 , 但是您这个是scala的 , 我这必须用java写
    您看我这个对不 ? parsedData是JavaDStream<LabeledPoint>
    JavaPairDStream<Double, Vector> temp = parsedData.mapToPair(new PairFunction<LabeledPoint, Double, Vector>() {
        public Tuple2<Double, Vector> call(LabeledPoint p) {
          return new Tuple2<Double, Vector>(p.label() , (Vector) p.features());
        }
    });
      

  3.   

    之前用的textFileStream 现在改用socketTextStream接收端口数据了 , 这是我写的 , 您给看看有毛病没 ? 我也是刚开始接触spark , 您有空给看看 , 测试数据用的就是github上spark的lpsa.data里面的测试数据(https://github.com/apache/spark/blob/master/data/mllib/ridge-data/lpsa.data)public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("JavaStreamingLinearRegressionWithSGDETest");
    JavaStreamingContext jssc = new JavaStreamingContext(conf , Durations.seconds(2L));
    JavaDStream<String> lines= jssc.socketTextStream("***.**.**.***", 9999);
       
    JavaDStream<LabeledPoint> parsedData = lines.map(line -> {
            String[] parts = line.split(",");
            String[] features = parts[1].split(" ");
            double[] v = new double[features.length];
            for (int i = 0; i < features.length - 1; i++) {
              v[i] = Double.parseDouble(features[i]);
             }
            return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v));
        });
        parsedData.cache();
      
    JavaPairDStream<Double, Vector> temp = parsedData.mapToPair(new PairFunction<LabeledPoint, Double, Vector>() {
        public Tuple2<Double, Vector> call(LabeledPoint p) {
          return new Tuple2<Double, Vector>(p.label() , (Vector) p.features());
        }
    });

    int numIterations = 3;
    StreamingLinearAlgorithm model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(numIterations));  
     
    model.trainOn(parsedData);
    model.latestModel();
    model.predictOnValues(temp).print();

    jssc.start();
    try {
    jssc.awaitTermination();
    } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }
    }