One-hot encoding many features in Spark: I have 50+ categorical features that need one-hot encoding. What is an efficient way to do this?
Solution »
Build a StringIndexer stage and a OneHotEncoder stage for each categorical column programmatically (by mapping over the column names) and chain them all in a single Pipeline, for example:
// Assumed imports for this snippet (Spark 2.x ml API, with a SparkSession named spark in scope)
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{LabeledPoint, OneHotEncoder, StringIndexer}
import org.apache.spark.ml.linalg.{SparseVector, Vectors}
import spark.implicits._

// Convert the enum-valued label ("是否已流失" = churned or not) to Double and name the columns
val vectorData = dataRDD
  .map(x => (enum2Double("是否已流失", x._1), x._2(0), x._2(1), x._2(2), x._2(3)))
  .toDF("loss", "gender", "age", "grade", "region")

// One StringIndexer per categorical column
val stringColumns = Array("gender", "age", "grade", "region")
val index_transformers: Array[PipelineStage] = stringColumns.map(cname =>
  new StringIndexer()
    .setInputCol(cname)
    .setOutputCol(s"${cname}_index")
)

// Fit the indexers first so the generated *_index columns can be discovered
val index_pipeline = new Pipeline().setStages(index_transformers)
val index_model = index_pipeline.fit(vectorData)
val df_indexed = index_model.transform(vectorData)

// One OneHotEncoder per indexed column
val indexColumns = df_indexed.columns.filter(x => x contains "index")
val one_hot_encoders: Array[PipelineStage] = indexColumns.map(cname =>
  new OneHotEncoder()
    .setInputCol(cname)
    .setOutputCol(s"${cname}_vec")
)

// Chain all indexers and encoders in a single pipeline
// (add the rest of your stages here, e.g. a VectorAssembler and the learning algorithm)
val pipeline = new Pipeline().setStages(index_transformers ++ one_hot_encoders)
val model = pipeline.fit(vectorData)

// Concatenate the encoded vectors into LabeledPoints
model.transform(vectorData)
  .select("loss", "gender_index_vec", "age_index_vec", "grade_index_vec", "region_index_vec")
  .map(x =>
    LabeledPoint(
      x.apply(0).toString.toDouble,
      Vectors.dense(
        x.getAs[SparseVector]("gender_index_vec").toArray ++
        x.getAs[SparseVector]("age_index_vec").toArray ++
        x.getAs[SparseVector]("grade_index_vec").toArray ++
        x.getAs[SparseVector]("region_index_vec").toArray)))
Source:
http://blog.csdn.net/pan_haufei/article/details/72903667
Good luck!
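
A note on efficiency with ~50 columns: on Spark 3.0 or later (or Spark 2.3+ via OneHotEncoderEstimator) you can avoid building 50 separate indexer/encoder stages, because StringIndexer and OneHotEncoder both accept multiple input/output columns in a single stage. Below is a minimal sketch under those assumptions; the names spark (the SparkSession), df (your input DataFrame) and catCols (the list of categorical column names) are placeholders, not from the original post.

import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}

// Placeholder list of categorical columns; in practice this would hold all ~50 names
val catCols: Array[String] = Array("gender", "age", "grade", "region")

// Spark 3.0+: one StringIndexer stage can index many columns at once
val indexer = new StringIndexer()
  .setInputCols(catCols)
  .setOutputCols(catCols.map(c => s"${c}_index"))
  .setHandleInvalid("keep")            // keep unseen categories instead of failing at transform time

// Spark 3.0+: one OneHotEncoder stage can encode many columns at once
// (on Spark 2.3/2.4 the equivalent multi-column class is OneHotEncoderEstimator)
val encoder = new OneHotEncoder()
  .setInputCols(catCols.map(c => s"${c}_index"))
  .setOutputCols(catCols.map(c => s"${c}_vec"))

// Assemble all one-hot vectors into a single feature vector
val assembler = new VectorAssembler()
  .setInputCols(catCols.map(c => s"${c}_vec"))
  .setOutputCol("features")

val stages: Array[PipelineStage] = Array(indexer, encoder, assembler)
val model = new Pipeline().setStages(stages).fit(df)   // df: assumed input DataFrame
val encoded = model.transform(df)                      // adds the "features" column

Compared with 50 separate StringIndexer and OneHotEncoder stages, this keeps the pipeline to three stages and lets a single indexer fit handle all the categorical columns together, which is noticeably lighter when the column count is large.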