本人最近研究关于spark的图模型,spark最近才接触,scala语言也不是很熟,论坛里有没有高手帮忙解答一下关于最短路径的,官方文档我也看了,但貌似没有关于最短路径的,那个shortestpaths源码,也没怎么看明白,而且一下实现办法也只是展示了源点到目标顶点的距离,我想用spark做出最短路径的,最好是有代码案例的,结果带中间节点的,不知哪位大神做过这方面的研究,帮帮忙
解决方案 »
- 华为云计算有奖问答第5期(已结束)
- 求请教 如何为neutron中的router设置路由表
- 提问:EMR中s3distcp的容器超过了虚拟内存的限制
- 貌似在docker 环境里,用不了主机现有的 gcc 这样的东东
- spaksql支持INSERT语句吗?
- Hadoop
- 【求助】IBM X3560 M4启动不了
- 单机查询mysql,再到spark上查询,这个逻辑该如何实现?
- virt-manager启动虚机失败
- arm64位cpu 加上linux系统,怎么安装docker,求助!
- spark jar 读取hdfs文件错误
- nginx转发给tomcat,POST数据丢失,又要修改浏览器路径,怎么破?
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{EdgeDirection, VertexId, Graph}
import org.apache.spark.graphx.util.GraphGenerators
object Pregel_SSSP {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Pregel_SSSP")
val sc = new SparkContext(conf)
// A graph with edge attributes containing distances
val graph: Graph[Long, Double] =
GraphGenerators.logNormalGraph(sc, numVertices = 5).mapEdges(e => e.attr.toDouble)
graph.edges.foreach(println)
val sourceId: VertexId = 0 // The ultimate source // Initialize the graph such that all vertices except the root have distance infinity.
val initialGraph : Graph[(Double, List[VertexId]), Double] = graph.mapVertices((id, _) => if (id == sourceId) (0.0, List[VertexId](sourceId)) else (Double.PositiveInfinity, List[VertexId]())) val sssp = initialGraph.pregel((Double.PositiveInfinity, List[VertexId]()), Int.MaxValue, EdgeDirection.Out)( // Vertex Program
(id, dist, newDist) => if (dist._1 < newDist._1) dist else newDist, // Send Message
triplet => {
if (triplet.srcAttr._1 < triplet.dstAttr._1 - triplet.attr ) {
Iterator((triplet.dstId, (triplet.srcAttr._1 + triplet.attr , triplet.srcAttr._2 :+ triplet.dstId)))
} else {
Iterator.empty
}
},
//Merge Message
(a, b) => if (a._1 < b._1) a else b)
println(sssp.vertices.collect.mkString("\n"))
}
}
注意下单源的实现思路,就是迭代过程中记录下当前的最短距离,现在变成了多源,就需要分别记录多个路径的相对最短距离,attr里就需要一个映射关系。并不是简单的并行对每一个点对执行单源的算法和数据结构就可以的。