我自定义一个spark sql的函数,中间值产生的是数组,但是设置中间类型为ArrayType会报红把ArrayType该父类DataType不报红,但是运行报错难道不可以产生的中间类型为Array吗?求大神解答,感谢!!!
下面是全部代码
class GameDuration(startDayLong: Long, endDayLong: Long) extends UserDefinedAggregateFunction { //输入的数据类型
override def inputSchema: StructType = StructType(
StructField("eventType", IntegerType) ::
StructField("timestamp", LongType) :: Nil
) //产生的中间结果类型
override def bufferSchema: StructType = StructType(
StructField("list", ArrayType) :: Nil
) //最终返回的结果类型
override def dataType: DataType = LongType override def deterministic: Boolean = true //指定初始值
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(1) = ArrayBuffer[Long]()
} //每有一条数据参与运算就更新一下中间结果(update相当于在每一个分区中的运算)
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val eventType = input.getInt(0)
val timestamp = input.getLong(1)
var tmp = 0L
if (eventType == 3) tmp = timestamp
else tmp = -timestamp buffer(1) = buffer.getAs[ArrayBuffer[Long]](1) += tmp
} //全局聚合
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getInt(0) + buffer2.getInt(0)
} //最终计算
override def evaluate(buffer: Row): Long = {
val arr: ArrayBuffer[Long] = buffer.getAs[ArrayBuffer[Long]](1)
if(arr(0) > 0) arr += -startDayLong
if(arr.last < 0) arr += endDayLong
arr.sum
}
}
还有这是我调用该函数,不知道是否正确啊
下面是全部代码
class GameDuration(startDayLong: Long, endDayLong: Long) extends UserDefinedAggregateFunction { //输入的数据类型
override def inputSchema: StructType = StructType(
StructField("eventType", IntegerType) ::
StructField("timestamp", LongType) :: Nil
) //产生的中间结果类型
override def bufferSchema: StructType = StructType(
StructField("list", ArrayType) :: Nil
) //最终返回的结果类型
override def dataType: DataType = LongType override def deterministic: Boolean = true //指定初始值
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(1) = ArrayBuffer[Long]()
} //每有一条数据参与运算就更新一下中间结果(update相当于在每一个分区中的运算)
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val eventType = input.getInt(0)
val timestamp = input.getLong(1)
var tmp = 0L
if (eventType == 3) tmp = timestamp
else tmp = -timestamp buffer(1) = buffer.getAs[ArrayBuffer[Long]](1) += tmp
} //全局聚合
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getInt(0) + buffer2.getInt(0)
} //最终计算
override def evaluate(buffer: Row): Long = {
val arr: ArrayBuffer[Long] = buffer.getAs[ArrayBuffer[Long]](1)
if(arr(0) > 0) arr += -startDayLong
if(arr.last < 0) arr += endDayLong
arr.sum
}
}
还有这是我调用该函数,不知道是否正确啊
解决方案 »
免费领取超大流量手机卡,每月29元包185G流量+100分钟通话, 中国电信官方发货