TaxiTour Multiple-Window Benchmark
Introduction
This benchmark uses the public TaxiTour dataset, which has been preprocessed into Parquet files. The test scripts are available at https://github.com/4paradigm/SparkFE/tree/main/benchmark/taxi_tour_multiple_window .
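If you need to regenerate the Parquet files from the raw data yourself, a conversion along the following lines should work. This is a minimal sketch, not part of the official scripts: the input file name train.csv and the output directory are placeholders for wherever your copy of the raw TaxiTour CSV lives.

from pyspark.sql import SparkSession

# Minimal sketch: convert the raw TaxiTour CSV into the Parquet layout the
# benchmark expects. "train.csv" and "taxi_tour_parquet/" are placeholder
# paths, not names taken from the benchmark repository.
spark = SparkSession.builder.appName("csv_to_parquet").getOrCreate()
df = spark.read.csv("train.csv", header=True, inferSchema=True)
df.write.mode("overwrite").parquet("taxi_tour_parquet/")
spark.stop()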
Test SQL
The test SQL contains two windows, each with a default window size of 10000 rows. A larger window increases the overall runtime, but it also makes SparkFE's performance advantage more pronounced; a sketch for generating the SQL at other window sizes follows the query below.
select
trip_duration,
passenger_count,
sum(pickup_latitude) over w as vendor_sum_pl,
max(pickup_latitude) over w as vendor_max_pl,
min(pickup_latitude) over w as vendor_min_pl,
avg(pickup_latitude) over w as vendor_avg_pl,
sum(pickup_latitude) over w2 as pc_sum_pl,
max(pickup_latitude) over w2 as pc_max_pl,
min(pickup_latitude) over w2 as pc_min_pl,
avg(pickup_latitude) over w2 as pc_avg_pl,
sum(dropoff_latitude) over w as vendor_sum_pl2,
max(dropoff_latitude) over w as vendor_max_pl2,
min(dropoff_latitude) over w as vendor_min_pl2,
avg(dropoff_latitude) over w as vendor_avg_pl2,
sum(dropoff_latitude) over w2 as pc_sum_pl2,
max(dropoff_latitude) over w2 as pc_max_pl2,
min(dropoff_latitude) over w2 as pc_min_pl2,
avg(dropoff_latitude) over w2 as pc_avg_pl2,
sum(trip_duration) over w as vendor_sum_pl3,
max(trip_duration) over w as vendor_max_pl3,
min(trip_duration) over w as vendor_min_pl3,
avg(trip_duration) over w as vendor_avg_pl3,
sum(trip_duration) over w2 as pc_sum_pl3,
max(trip_duration) over w2 as pc_max_pl3,
min(trip_duration) over w2 as pc_min_pl3,
avg(trip_duration) over w2 as pc_avg_pl3
from t1
window w as (partition by vendor_id order by pickup_datetime ROWS BETWEEN 10000 PRECEDING AND CURRENT ROW),
w2 as (partition by passenger_count order by pickup_datetime ROWS BETWEEN 10000 PRECEDING AND CURRENT ROW)
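The literal 10000 in both window definitions is the window size. To benchmark several sizes, one simple option is to substitute the size into a SQL template when generating the test files. The sketch below assumes a hypothetical template file multiple_window.sql.template containing a {window_size} placeholder; the repository itself ships a fixed multiple_window.sql instead.

# Minimal sketch: generate one SQL file per window size from a template.
# "multiple_window.sql.template" and its "{window_size}" placeholder are
# hypothetical names introduced here for illustration.
with open("multiple_window.sql.template", "r") as f:
    template = f.read()

for window_size in (1000, 10000, 100000):
    with open("multiple_window_{}.sql".format(window_size), "w") as out:
        out.write(template.format(window_size=window_size))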
Test Script
The benchmark can be run with a simple PySpark program (an equivalent implementation in Scala or another language works as well), and it runs locally for small datasets. A sample program is shown below.
from pyspark.sql import SparkSession
import os

def main():
    spark = SparkSession.builder.appName("app").getOrCreate()

    # Load the preprocessed Parquet data from the current working directory
    current_path = os.getcwd()
    parquet_file_path = "file://{}/taxi_tour_parquet/".format(current_path)
    train = spark.read.parquet(parquet_file_path)
    train.createOrReplaceTempView("t1")

    # Read the multiple-window SQL and run it against the t1 view
    with open("multiple_window.sql", "r") as f:
        sparksqlText = f.read()
    spark_df = spark.sql(sparksqlText)

    # Write the result to force the full computation to execute
    output_path = "file:///tmp/spark_output/"
    spark_df.write.mode('overwrite').parquet(output_path)
    spark.stop()

if __name__ == "__main__":
    main()
# Run locally with all cores; to benchmark SparkFE, point SPARK_HOME
# at the SparkFE distribution instead of vanilla Spark
$SPARK_HOME/bin/spark-submit \
    --master local[*] \
    ./multiple_window.py
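Once the job completes, the result can be sanity-checked by reading the output back. This is a minimal sketch using the output path from the sample program above; the selected columns come from the test SQL.

from pyspark.sql import SparkSession

# Minimal sketch: read the benchmark output back and confirm it contains
# the expected rows and window-aggregate columns.
spark = SparkSession.builder.appName("verify").getOrCreate()
result = spark.read.parquet("file:///tmp/spark_output/")
print(result.count())
result.select("trip_duration", "vendor_sum_pl", "pc_avg_pl3").show(5)
spark.stop()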