原生LastJoin优化
介绍
LastJoin实现
// LastJoin emulated with stock Spark operators:
//   1. tag every t1 row with a generated id column,
//   2. left-outer join t1 against t2,
//   3. group the joined rows by that id and keep, per group, the row with the
//      largest value in the order-by column.
val indexColumnName = "indexCol"
val orderbyColmn = "orderbyCol"

// The generated id lets joined rows be grouped back to the t1 row they came from.
val t1WithIndex = t1.withColumn(indexColumnName, monotonically_increasing_id())

val joinCondition = t1WithIndex("id") === t2("u_id")
val t3 = t1WithIndex.join(t2, joinCondition, "left_outer")

// Compares joined rows by the Long stored in the order-by column.
val customOrder: Ordering[Row] =
  Ordering.by[Row, Long](joined => joined.getAs[Long](orderbyColmn))

val t4 = t3
  .groupByKey(joined => joined.getAs[Long](indexColumnName))
  .mapGroups {
    case (_, groupRows) =>
      val maxRow = groupRows.max(customOrder)
      // Output columns are picked by position; index 5 is skipped — presumably
      // the helper id column, confirm against the joined schema.
      (
        maxRow.getAs[Int](0),
        maxRow.getAs[String](1),
        maxRow.getAs[String](2),
        maxRow.getAs[Int](3),
        maxRow.getAs[Long](4),
        maxRow.getAs[Int](6),
        maxRow.getAs[String](7),
        maxRow.getAs[Long](8)
      )
  }
原生LastJoin实现
性能对比

Last updated