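When using the RDD flatMap function, if the function returns many array objects per record (say, dozens to hundreds), the Spark job runs extremely slowly. Example code: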
import scala.collection.mutable.ArrayBuffer

val rdd2 = sc.textFile(strInputFilePath).flatMap(line => {
  // Collects ((row, col), value) pairs for the grid cells near this line's point.
  val grids = new ArrayBuffer[Tuple2[Tuple2[Int, Int], Double]]()
  val values = line.split(",")
  if (values.length > 3) {
    try {
      val br = broadcast.value
      val x = values(br._6).toDouble
      val y = values(br._7).toDouble
      val point = new SPoint2D(x, y)
      if (br._1.contains(point)) {
        // Bounding box of the circle of radius br._2 around the point.
        val xmin = point.x - br._2
        val ymin = point.y - br._2
        val xmax = point.x + br._2
        val ymax = point.y + br._2
        val rcBounds = br._1
        val IndexCols = br._4
        val IndexRows = br._5
        // Convert the bounding box to grid column/row indices.
        var col1 = (Math.floor((xmin - rcBounds.getLeft()) / resolution)).toInt
        var col2 = (Math.floor((xmax - rcBounds.getLeft()) / resolution)).toInt
        var row1 = -(Math.floor((ymax - rcBounds.getTop()) / resolution)).toInt
        var row2 = -(Math.floor((ymin - rcBounds.getTop()) / resolution)).toInt
        // Clamp the indices to the valid grid range.
        if (col1 < 0) {
          col1 = 0
        } else if (col1 >= IndexCols) {
          col1 = IndexCols - 1
        }
        if (col2 < 0) {
          col2 = 0
        } else if (col2 >= IndexCols) {
          col2 = IndexCols - 1
        }
        if (row1 < 0) {
          row1 = 0
        } else if (row1 >= IndexRows) {
          row1 = IndexRows - 1
        }
        if (row2 < 0) {
          row2 = 0
        } else if (row2 >= IndexRows) {
          row2 = IndexRows - 1
        }
        // Make sure col1 <= col2 and row1 <= row2.
        if (col1 > col2) {
          val temp = col2
          col2 = col1
          col1 = temp
        }
        if (row1 > row2) {
          val temp = row2
          row2 = row1
          row1 = temp
        }
        for (col <- col1.to(col2); row <- row1.to(row2)) {
          // Center of the current cell.
          val xtemp = rcBounds.getLeft() + col * br._3 + 0.5 * br._3
          val ytemp = rcBounds.getTop() - row * br._3 - 0.5 * br._3
          val distance = Math.sqrt((x - xtemp) * (x - xtemp) + (y - ytemp) * (y - ytemp))
          if (distance <= br._2) {
            // Value for this cell: 3 * (1 - (d / r)^2)^2 / br._8
            val disPre = distance / br._2
            val valuePre = 1.0 * 3 * Math.pow(1 - disPre * disPre, 2) / br._8
            grids += new Tuple2((row, col), valuePre)
          }
        }
      }
    } catch {
      case ex: Exception =>
        ex.printStackTrace()
    }
  }
  grids.toArray[Tuple2[Tuple2[Int, Int], Double]]
})
In the code above, when the grids ArrayBuffer holds many objects, this code runs very slowly. Does anyone have a good approach or solution?
1) Benchmark how long your method takes, then try to improve it. I didn't read it carefully, but it looks like it can be improved; the code smells.
2) If there is no way to improve the method, note that the only things it needs from each line are:
val x = values(br._6).toDouble
val y = values(br._7).toDouble
So you can first take the distinct combinations of (values(br._6), values(br._7)), so that your method runs only once per unique combination instead of once per line, as it does now.
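The second suggestion can be sketched roughly as follows. This is a minimal illustration on plain Scala collections rather than an RDD, so it runs without a Spark cluster. `expensiveCells` is a hypothetical stand-in for the per-point grid computation in the question, and `cellWeights`, `xIdx` and `yIdx` are names made up for this example. One assumption worth stating: if the per-cell values are summed later, dropping duplicate points outright would change the totals, so the sketch keeps a count per distinct point and scales by it.

```scala
import scala.util.Try

object DistinctFirstSketch {
  // Hypothetical stand-in for the costly per-point computation in the question:
  // returns ((row, col), weight) pairs for one (x, y) point.
  def expensiveCells(x: Double, y: Double): Seq[((Int, Int), Double)] = {
    val row = math.floor(y).toInt
    val col = math.floor(x).toInt
    Seq(((row, col), 1.0), ((row, col + 1), 0.5))
  }

  // Sums weights per cell while calling expensiveCells once per distinct point.
  def cellWeights(lines: Seq[String], xIdx: Int, yIdx: Int): Map[(Int, Int), Double] = {
    // 1. Parse only the two needed fields and count how often each point occurs.
    //    Lines that are too short or not numeric are skipped.
    val pointCounts: Map[(Double, Double), Int] = lines
      .map(_.split(","))
      .filter(_.length > math.max(xIdx, yIdx))
      .flatMap(v => Try((v(xIdx).toDouble, v(yIdx).toDouble)).toOption)
      .groupBy(identity)
      .map { case (p, occurrences) => (p, occurrences.size) }

    // 2. Run the expensive step once per distinct point, scaled by its count,
    //    then sum the contributions per cell.
    pointCounts.toSeq
      .flatMap { case ((x, y), n) =>
        expensiveCells(x, y).map { case (cell, w) => (cell, w * n) }
      }
      .groupBy(_._1)
      .map { case (cell, ws) => (cell, ws.map(_._2).sum) }
  }
}
```

On a real RDD the same shape would presumably be `map(p => (p, 1)).reduceByKey(_ + _)` to count the distinct points, then `flatMap` over those, then another `reduceByKey(_ + _)` per cell. Whether this helps depends on how many duplicate points the input actually contains, which is worth measuring first, as the first suggestion says.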