我在 这篇 文章中介绍了 Apache Spark 3.0 动态分区裁剪(Dynamic Partition Pruning),里面涉及到动态分区的优化思路等,但是并没有涉及到如何使用,本文将介绍在什么情况下会启用动态分区裁剪。
并不是什么查询都会启用动态裁剪优化的,必须满足以下几个条件:
spark.sql.optimizer.dynamicPartitionPruning.enabled参数必须设置为 true,不过这个值默认就是启用的;- 需要裁减的表必须是分区表,而且分区字段必须在 join 的 on 条件里面;
- Join 类型必须是 INNER, LEFT SEMI (左表是分区表), LEFT OUTER (右表是分区表), or RIGHT OUTER (左表是分区表)。
- 满足上面的条件也不一定会触发动态分区裁减,还必须满足
spark.sql.optimizer.dynamicPartitionPruning.useStats和spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio两个参数综合评估出一个进行动态分区裁减是否有益的值,满足了才会进行动态分区裁减。评估函数实现请参见 org.apache.spark.sql.dynamicpruning.PartitionPruning#pruningHasBenefit。
如何使用 Dynamic Partition Pruning
我们先使用 Spark 创建两张表:
spark.range(10000)
.select(col("id"), col("id").as("k"))
.write.partitionBy("k")
.format("parquet")
.mode("overwrite")
.saveAsTable("iteblog_tab1")
spark.range(100)
.select(col("id"), col("id").as("k"))
.write.partitionBy("k")
.format("parquet")
.mode("overwrite")
.saveAsTable("iteblog_tab2")
运行完上面的代码之后,iteblog_tab1 表将产生 10000 个分区,iteblog_tab2 表将产生 100 个分区。我们运行下面的查询语句:
spark.sql("SELECT * FROM iteblog_tab1 t1 JOIN iteblog_tab2 t2 ON t1.k = t2.k AND t2.id < 2").show()
在没有启用动态分区裁剪的情况下 Spark 物理执行计划如下:
== Physical Plan ==
CollectLimit 21
+- *(2) Project [cast(id#0L as string) AS id#12, cast(k#1L as string) AS k#13, cast(id#2L as string) AS id#14, cast(k#3L as string) AS k#15]
+- *(2) BroadcastHashJoin [k#1L], [k#3L], Inner, BuildRight
:- *(2) ColumnarToRow
: +- FileScan parquet default.iteblog_tab1[id#0L,k#1L] Batched: true, DataFilters: [], Format: Parquet, Location: PrunedInMemoryFileIndex[file:/user/hive/warehouse/iteblog_tab1/k=0, file:/user/hive/warehouse/ite..., PartitionFilters: [isnotnull(k#1L)], PushedFilters: [], ReadSchema: struct<id:bigint>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[1, bigint, true])), [id=#41]
+- *(1) Project [id#2L, k#3L]
+- *(1) Filter (isnotnull(id#2L) AND (id#2L < 2))
+- *(1) ColumnarToRow
+- FileScan parquet default.iteblog_tab2[id#2L,k#3L] Batched: true, DataFilters: [isnotnull(id#2L), (id#2L < 2)], Format: Parquet, Location: PrunedInMemoryFileIndex[file:/user/hive/warehouse/iteblog_tab2/k=0, file:/user/hive/warehouse/ite..., PartitionFilters: [isnotnull(k#3L)], PushedFilters: [IsNotNull(id), LessThan(id,2)], ReadSchema: struct<id:bigint>
DAG 图如下:
从上面的 物理执行计划和 DAG 执行图可以看出,这个就是正常的算子下推的结果。
如果我们使用 Apache Spark 3.0 并启用动态分区裁减功能,物理执行计划如下:
== Physical Plan ==
CollectLimit 21
+- *(2) Project [cast(id#0L as string) AS id#12, cast(k#1L as string) AS k#13, cast(id#2L as string) AS id#14, cast(k#3L as string) AS k#15]
+- *(2) BroadcastHashJoin [k#1L], [k#3L], Inner, BuildRight
:- *(2) ColumnarToRow
: +- FileScan parquet default.iteblog_tab1[id#0L,k#1L] Batched: true, DataFilters: [], Format: Parquet, Location: PrunedInMemoryFileIndex[file:/user/hive/warehouse/iteblog_tab1/k=0, file:/user/hive/warehouse/ite..., PartitionFilters: [isnotnull(k#1L), dynamicpruningexpression(k#1L IN dynamicpruning#20)], PushedFilters: [], ReadSchema: struct<id:bigint>
: +- SubqueryBroadcast dynamicpruning#20, 0, [k#3L], [id=#96]
: +- ReusedExchange [id#2L, k#3L], BroadcastExchange HashedRelationBroadcastMode(List(input[1, bigint, true])), [id=#72]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[1, bigint, true])), [id=#72]
+- *(1) Project [id#2L, k#3L]
+- *(1) Filter (isnotnull(id#2L) AND (id#2L < 2))
+- *(1) ColumnarToRow
+- FileScan parquet default.iteblog_tab2[id#2L,k#3L] Batched: true, DataFilters: [isnotnull(id#2L), (id#2L < 2)], Format: Parquet, Location: PrunedInMemoryFileIndex[file:/user/hive/warehouse/iteblog_tab2/k=0, file:/user/hive/warehouse/ite..., PartitionFilters: [isnotnull(k#3L)], PushedFilters: [IsNotNull(id), LessThan(id,2)], ReadSchema: struct<id:bigint>
DAG 图如下:
可以看出,iteblog_tab1 表的扫描相比上面那个多了一个分区过滤(PartitionFilters),在一些情况下性能提升能达到 2 - 18 倍。
本博客文章除特别声明,全部都是原创!原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Apache Spark 3.0 动态分区裁剪(Dynamic Partition Pruning)使用】(https://www.iteblog.com/archives/8590.html)



