在使用RDD 的flatmap函数时,如果flatmap函数中返回的数组对象很多,比如几十上百,会导致Spark运行特别慢,示例代码:
   val rdd2 = sc.textFile(strInputFilePath).flatMap(line => {
      var grids = new ArrayBuffer[Tuple2[Tuple2[Int, Int], Double]]()
      val values = line.split(",")
      if (values.length > 3) {
        try {
          val br = broadcast.value
          val x = values(br._6).toDouble
          val y = values(br._7).toDouble          val point = new SPoint2D(x, y)          if (br._1.contains(point)) {
            val xmin = point.x - br._2;
            val ymin = point.y - br._2;
            val xmax = point.x + br._2;
            val ymax = point.y + br._2;            val rcBounds = br._1;
            val IndexCols = br._4
            val IndexRows = br._5            var col1 = (Math.floor((xmin - rcBounds.getLeft())
              / resolution)).toInt;
            var col2 = (Math.floor((xmax - rcBounds.getLeft())
              / resolution)).toInt;
            var row1 = -(Math.floor((ymax - rcBounds.getTop())
              / resolution)).toInt;
            var row2 = -(Math.floor((ymin - rcBounds.getTop())
              / resolution)).toInt;            if (col1 < 0) {
              col1 = 0;
            } else if (col1 >= IndexCols) {
              col1 = IndexCols - 1;
            }
            if (col2 < 0) {
              col2 = 0;
            } else if (col2 >= IndexCols) {
              col2 = IndexCols - 1;
            }
            if (row1 < 0) {
              row1 = 0;
            } else if (row1 >= IndexRows) {
              row1 = IndexRows - 1;
            }
            if (row2 < 0) {
              row2 = 0;
            } else if (row2 >= IndexRows) {
              row2 = IndexRows - 1;
            }            if (col1 > col2) {
              var temp = col2;
              col2 = col1;
              col1 = temp;
            }
            if (row1 > row2) {
              var temp = row2;
              row2 = row1;
              row1 = temp;
            }            for (col <- col1.to(col2); row <- row1.to(row2)) {
              val xtemp = rcBounds.getLeft() + col * br._3 + 0.5 * br._3;
              val ytemp = rcBounds.getTop() - row * br._3 - 0.5 * br._3;              val distance = Math.sqrt((x - xtemp)
                * (x - xtemp) + (y - ytemp)
                * (y - ytemp));
              if (distance <= br._2) {
                val disPre = distance / br._2;
                val valuePre = 1.0 * 3 * Math.pow(1 - disPre * disPre, 2) / br._8;
                grids += new Tuple2((row, col), valuePre)
              }
            }
          }
        } catch {
          case ex: Exception =>
            ex.printStackTrace()
        }
      }
      grids.toArray[Tuple2[Tuple2[Int, Int], Double]]
    })在上面代码中,如果grids这个ArrayBuffer中对象数目比较多,这段代码运行会非常慢。请问各位,有没有好的解决思路和方法。

解决方案 »

  1.   

    I don't think Spark is slow, but your code is slow.You should bench your method (The whole line => { your implementation here }), to see how long it run. If it run for 1s, then assuming that your text file has 1,000,000 lines, you will run 1,000,000 seconds in sequence. So even spark can it as 1000 concurrently, your whole job will still take 1000 seconds.So my suggestion is:
    1) Try to bench how long your method runs? Try to improve it. I didn't read it carefully, but it looks like it can improve. The code smells bad
    2) If there is no way to improve the method, then your place need from each line is:
              val x = values(br._6).toDouble
              val y = values(br._7).toDouble
    So you can distinct the real combination of (values(br._6), values(br._7) first, so each unique combination will only run from your method once, instead of as right now.