public class LinearRegression {
    /**
     * Trains a streaming linear regression model on labeled points read from a text stream.
     *
     * <p>Each input line is expected to look like {@code label,f1 f2 ... fN}: the label and the
     * feature list are separated by a comma, the individual features by single spaces.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("JavaStreamingLinearRegressionWithSGDE");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10L));

        // NOTE(review): textFileStream watches a directory for newly created files; confirm that
        // this path (and the one used for data_1 below) is a directory rather than a single file.
        JavaDStream<String> data = jssc.textFileStream("/LienarRegression/lpsa.data");
        JavaDStream<LabeledPoint> parsedData = data.map(line -> {
            String[] parts = line.split(",");
            String[] features = parts[1].split(" ");
            double[] v = new double[features.length];
            // Parse every feature; stopping at length - 1 would leave the last one at 0.0.
            for (int i = 0; i < features.length; i++) {
                v[i] = Double.parseDouble(features[i]);
            }
            return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v));
        });
        parsedData.cache();

        // Not consumed yet: intended as the input for predictOnValues once it has been
        // converted from JavaDStream to JavaPairDStream<K, Vector>.
        JavaDStream<String> data_1 = jssc.textFileStream("/LienarRegression/lpsa_1.data");

        // The initial weight vector needs one entry per feature; the sample records
        // carry 8 features after the label.
        int numFeatures = 8;
        StreamingLinearRegressionWithSGD model =
                new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(numFeatures));
        model.trainOn(parsedData);
        //model.predictOnValues();

        // Nothing is processed until the streaming context is started.
        jssc.start();
        try {
            jssc.awaitTermination();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
数据源示例:2.5687881,1.16902610257751 0.855491905752846 2.03274448152093 1.22628985326088 1.89254797819741 2.02833774827712 3.11219574032972 2.68112551007152(标签与特征之间用逗号分隔,各特征之间用空格分隔)。
调用 predictOnValues 方法时,参数是 JavaPairDStream<K,Vector> data,如何把 JavaDStream 转换成 JavaPairDStream?求教各位大佬,最好能给出代码。
// Trains a streaming linear regression model on lines of the form "label,f1 f2 ... fN".
SparkConf conf = new SparkConf().setAppName("JavaStreamingLinearRegressionWithSGDE");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10L));

// NOTE(review): textFileStream watches a directory for newly created files; confirm that
// this path (and the one used for data_1 below) is a directory rather than a single file.
JavaDStream<String> data = jssc.textFileStream("/LienarRegression/lpsa.data");
JavaDStream<LabeledPoint> parsedData = data.map(line -> {
    String[] parts = line.split(",");
    String[] features = parts[1].split(" ");
    double[] v = new double[features.length];
    // Parse every feature; stopping at length - 1 would leave the last one at 0.0.
    for (int i = 0; i < features.length; i++) {
        v[i] = Double.parseDouble(features[i]);
    }
    return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v));
});
parsedData.cache();

// Not consumed yet: intended as the input for predictOnValues once it has been
// converted from JavaDStream to JavaPairDStream<K, Vector>.
JavaDStream<String> data_1 = jssc.textFileStream("/LienarRegression/lpsa_1.data");

// The initial weight vector needs one entry per feature; the sample records
// carry 8 features after the label.
int numFeatures = 8;
StreamingLinearRegressionWithSGD model =
        new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(numFeatures));
model.trainOn(parsedData);
//model.predictOnValues();

// Nothing is processed until the streaming context is started.
jssc.start();
try {
    jssc.awaitTermination();
} catch (InterruptedException e) {
    // Restore the interrupt flag so the caller can observe the interruption.
    Thread.currentThread().interrupt();
}
}
}
数据源示例:2.5687881,1.16902610257751 0.855491905752846 2.03274448152093 1.22628985326088 1.89254797819741 2.02833774827712 3.11219574032972 2.68112551007152(标签与特征之间用逗号分隔,各特征之间用空格分隔)。
调用 predictOnValues 方法时,参数是 JavaPairDStream<K,Vector> data,如何把 JavaDStream 转换成 JavaPairDStream?求教各位大佬,最好能给出代码。
解决方案 »
- OpenStack 2011.3已经兼容亚马逊API
- 云计算市场现状及未来如何?
- openstack 安装glance的问题
- 请教运行"glance-manage db_sync" glance,报错
- AWS 域名怎么修改
- 如何把所有在AWS S3 bucket里面的东西全部设置为默认公开?
- 云计算真的对个人有用吗?
- 请教个问题,在VPC中,同一个subnet下,如何设置其它的instance通过同一个subnet下的某个NAT instance来访问internet?
- Docker启动失败
- docker配置问题
- 定时检测系统cpu,内存,根分区使用情况,检测脚本写好了,但不知道怎么做任务计划让半小时执行,小女子不才,向各路大神请教!
- 数据存储的未来趋势使企业存储面临怎样问题
类似这个操作?
您看我这样写对不对?其中 parsedData 是 JavaDStream<LabeledPoint>:
// Key each labeled point by its label and carry its feature vector as the value,
// giving the (key, Vector) pair stream that predictOnValues takes as input.
PairFunction<LabeledPoint, Double, Vector> labelWithFeatures =
        point -> new Tuple2<Double, Vector>(point.label(), point.features());
JavaPairDStream<Double, Vector> temp = parsedData.mapToPair(labelWithFeatures);
// Trains a streaming linear regression model on lines of the form "label,f1 f2 ... fN"
// received over a socket, and prints a prediction for every received record.
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("JavaStreamingLinearRegressionWithSGDETest");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2L));
JavaDStream<String> lines = jssc.socketTextStream("***.**.**.***", 9999);

JavaDStream<LabeledPoint> parsedData = lines.map(line -> {
    String[] parts = line.split(",");
    String[] features = parts[1].split(" ");
    double[] v = new double[features.length];
    // Parse every feature; stopping at length - 1 would leave the last one at 0.0.
    for (int i = 0; i < features.length; i++) {
        v[i] = Double.parseDouble(features[i]);
    }
    return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v));
});
parsedData.cache();

// predictOnValues takes (key, features) pairs; the label is used as the key so each
// printed prediction appears next to the value it should approximate.
JavaPairDStream<Double, Vector> temp =
        parsedData.mapToPair(p -> new Tuple2<Double, Vector>(p.label(), p.features()));

// The initial weight vector needs one entry per feature; the sample records
// carry 8 features after the label.
int numFeatures = 8;
StreamingLinearRegressionWithSGD model =
        new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(numFeatures));
model.trainOn(parsedData);
model.predictOnValues(temp).print();

jssc.start();
try {
    jssc.awaitTermination();
} catch (InterruptedException e) {
    // Restore the interrupt flag so the caller can observe the interruption.
    Thread.currentThread().interrupt();
}
}