欢迎关注大数据技术架构与案例微信公众号:过往记忆大数据
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
过往记忆大数据

Spark SQL 查询 Parquet 文件的性能提升 30%,字节是如何做到的?

本文来自11月举办的 Data + AI Summit 2020 (原 Spark+AI Summit),主题为《Improving Spark SQL Performance by 30%: How We Optimize Parquet Filter Pushdown and Parquet Reader》的分享,作者为字节跳动的孙科和郭俊。相关 PPT 可以关注 Java与大数据架构 公众号并回复 9912 获取。

Parquet 是一种非常流行的列式存储格式。Spark 的算子下推(pushdown filters)可以利用 Parquet 文件的统计数据(比如最大最小值统计)来过滤无用数据。另一方面,Spark 用户可以启用 Spark Parquet 的向量化读取器(vectorized reader)来批量读取 parquet 文件。这些特性大大提高了 Spark 的性能,节省了 CPU 和 IO。Parquet 是字节跳动数据仓库的默认数据格式。在实践中,字节团队的同学发现 parquet 的下推过滤器的工作效果很差,其读取了大量没用的统计数据,这些统计数据对过滤 Parquet 的 row groups 无效(当 ETL 作业写入 Parquet 文件时列数据是无序的)。

在过去一年时间里,自己在 Spark 中添加了一系列的优化措施来提升 Parquet 的下推性能。我们开发了一个名为 LocalSort 的特性,在写 Parquet 文件时通过对一些列添加一个排序步骤,从而可以利用这些统计数据明显区分 Parquet Row groups,并提高压缩比(根据历史查询自动进行,不需要修改ETL作业)。此外,我们开发了一个名为 Prewhere 的特性。Prewhere parquet reader 从下推过滤器中选择低开销的列,以批处理方式来读取这些列的数据,并使用下推过滤器过滤数据,同时跳过其他不需要的列。这些努力的直接结果是,我们实现了平均 30% 的查询改进,40%的存储改进,而开销只有5%。

这篇文章将深入介绍 LocalSort 和 Prewhere,同时介绍 LocalSort/Prewhere 都有哪些用户场景,最后也会介绍一些基于历史查询自动建议对列进行排序的相关工作。

本次分享的主题就下面三个:

  • Spark SQL 在字节跳动的使用;
  • Spark 是如何读取 Parquet 文件的;
  • 字节跳动是优化 Parquet Filter Pushdown 和 Parquet Reader 的

字节跳动的 Spark SQL 使用情况

Improving
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop

这部分内容其实在上一篇文章有提过 《物化列:字节为解决 Spark 嵌套列查询性能低下的优化》。在字节超过 98% 的 ETL 作业是用 Spark SQL 进行的。Parquet 是数据仓库的默认文件格式,Parquet 向量化读取默认也是启用的通过 spark.sql.parquet.enableVectorizedReader 参数启用。

Spark 是如何读取 Parquet 文件的

首先我们来回滚一下 Parquet 文件的格式。下面是从 Parquet 官方网站拷贝过来的 Parquet 文件格式。

Improving Spark SQL Performance by 30%: How We Optimize Parquet Filter Pushdown and Parquet Reader
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop

从上图可以看出 Parquet 格式的文件包含多个 Row Group 和一个 Footer。Parquet 文件里面包含了许多有用的信息,比如统计信息(Statistics),Spark 和 Parquet 可以使用 Footer 和 Row Group 里面的统计信息来进行过滤下推。比如某个 Row Group 的 id 列最大值为 10,当我们查询 id > 20 的时候,就可以利用统计信息过滤掉这个 Row Group。

由于 Parquet 格式的特点,所以在 Parquet 中做列裁减(column pruning)是非常容易实现的;同时 Parquet 里面对数据的压缩也是非常容易做的。
接下来我们来看下 Spark SQL 中读取 Parquet 文件的过程。

Improving Spark SQL Performance by 30%: How We Optimize Parquet Filter Pushdown and Parquet Reader
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop

整个过程的入口点是 DataSourceScanExecution,然后是 ParquetFileFormat,最后是 VectorizedParquetReactorReader。VectorizedParquetReactorReader 是读取 Parquet 文件非常重要的类,VectorizedParquetReactorReader 可以通过把算子下推转换成 ParquetFilters 来过滤掉一些无用的 Row Group。

同时,这个类为每个目标列构建 column Readers,而这些 column Readers 是一起以批量的形式读取数据的。比如我们想读取三列的数据,VectorizedParquetReactorReader 会为我们构造三个 column Readers。

Improving Spark SQL Performance by 30%: How We Optimize Parquet Filter Pushdown and Parquet Reader
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop

假设我们的查询 SQL 为 select * fromtable_name where date = '***' and category = 'test'。在这个例子中,date 这列是分区列,而 category 是 predicate column。

上图中表格里面假设是 Parquet 文件里面每个 Row Group 分布情况。由于 RowGroup1 中 category 的最小值为 a1,最大值为 z1,所以在执行上面的 SQL 查询时需要把这个 Row Group 的数据读出来,因为 category = 'test' 包含在其中。同理,RowGroup2、RowGroup3 也需要读出来。所以在这种情况下 Spark 需要把 Parquet 文件中的三个 Row Group 都读出来,因为这三个 Row Group 的统计信息是过滤不了数据的。

在一些生产环境下,Parquet filter pushdown 不能很好的工作,因为 Parquet 中的 predicate columns 是乱序的。所以如果对一些比较常用的过滤列进行排序是非常有用的,以此来减少数据读取的 IO。

下面我们来看下另外一个例子。假设我们的查询 SQL 为 select col1 fromtable_name where date = ‘***’ and col2 = ‘test’。

Improving Spark SQL Performance by 30%: How We Optimize Parquet Filter Pushdown and Parquet Reader
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop

在这个例子中,RowGroup1 是被过滤下推过滤了;col3 是被列裁剪过滤了。所以在向量化读取的时候 col1 和 col2 是被读取的。在很多情况下,如果 col2=‘test’ 过滤的数据非常高,那么 col1 的大部分数据都是不必要读取的。所以在查询时首先通过 filter column(col2=‘test’)来读取和过滤数据,然后再读取其他列是非常值得的做法。

字节跳动是优化 Parquet Filter Pushdown 和 Parquet Reader 的

Improving Spark SQL Performance by 30%: How We Optimize Parquet Filter Pushdown and Parquet Reader
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop

首先字节的同学发现 Parquet 文件中 RowGroup 的统计信息是不能简单过滤数据的,所以他们希望提高 Parquet 统计信息的准确性。对于用户而言,他们的目标也是很明确的:低负载;所以过滤整个数据是非常昂贵的,甚至是不可能的。所以只能对某些列进行排序,但是新增的排序不能要求用户对现有的 ETL 做任何修改。

基于上面的原因,字节开发出 LocalSort 功能,也就是在 InsertIntoHiveTable 节点之前添加一个 SortExec 节点。通过这个特性,Spark 可以对 Parquet 中的某些列进行排序,以此来提高 Parquet 文件中统计信息的准确性。

Improving Spark SQL Performance by 30%: How We Optimize Parquet Filter Pushdown and Parquet Reader
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop

上图是添加了 LocalSort 功能之后 Spark 的执行计划对比,可以看到,InsertIntoHiveTable 之前添加了 SortExec 节点。

那么问题来了,哪些列应该排序?字节的策略是分析历史的查询,选择那些在过滤条件中经常使用的列进行排序。同时也可以通过在表的属性里配置排序列,Spark SQL 会自动读取这些属性。所有的排序过程都是自动的,不需要用户做任何改变,从而满足用户的需求。

Improving Spark SQL Performance by 30%: How We Optimize Parquet Filter Pushdown and Parquet Reader
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop

有了 LocalSort 之后,RowGroup 的信息就是准确的。比如上面 RowGroup1 中 category 最小值为 a1;最大值为 g1。根据上面的准确统计信息,在执行 select col1 fromtable_name where date = ‘***’ and col2 = ‘test’ 查询时,只需要读取 RowGroup2。这就使得我们可以读取更少的数据,由于相同的数据存放在一起,所以进一步导致 Parquet 文件的存储大小也变小了。而这些好处仅仅需要付出 5% 的代价,所以还是很值得的。

上面就是 Parquet filter pushdown 的优化细节。下面我们来看下 Parquet Reader 相关的优化。

Improving Spark SQL Performance by 30%: How We Optimize Parquet Filter Pushdown and Parquet Reader
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop

我们发现 Spark 读取了很多无用的数据,所以想尽可能的读取上一点的数据。为了解决这个问题,字节的同学开发了名为 Prewhere 的功能,这个思想来自 ClickHouse。在 Prewhere 中,首先批量读取过滤的列,如果过滤的列没有匹配,那么其他列直接跳过。那么字节他们是如何实现的呢?

Improving Spark SQL Performance by 30%: How We Optimize Parquet Filter Pushdown and Parquet Reader
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop

字节团队他们把 Parquet reader 拆分成两个 Reader:FilterReader 和 NonFilterReader。FilterReader 用于过滤列;而 NonFilterReader 用于其他列的读取。

整个过程如下:首先使用 FilterReader 来批量读取数据,然后对读取的数据使用过滤条件,如果这些数据没有符合过滤条件,则继续使用 FilterReader 读取数据。如果有数据符合过滤条件,则使用 NonFilterReader 批量的读取需要的数据同时跳过那些无用的数据,最后读取出来的数据会和 FilterReader 读取的数据进行 union 操作。

Improving Spark SQL Performance by 30%: How We Optimize Parquet Filter Pushdown and Parquet Reader
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop

经过这个操作,一些场景下可以直接跳过 Parquet page,甚至是直接跳过读取 Parquet RowGroup。

Improving Spark SQL Performance by 30%: How We Optimize Parquet Filter Pushdown and Parquet Reader
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop

在字节,生产环境下支持的过滤列类型有 ByteType、ShortType、IntegerType、 LongType、FloatType、 DoubleType 以及 StringType。支持的类型有 > 、>=、<、<=、=、in、isnull 以及 isnotnull。通过这个特点 Parquet 文件的读取性能在很多场景下提升了 13%。

本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Spark SQL 查询 Parquet 文件的性能提升 30%,字节是如何做到的?】(https://www.iteblog.com/archives/9912.html)
喜欢 (4)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!
(2)个小伙伴在吐槽
  1. 为啥不考虑用orc 格式呢 有什么说法吗 大神

    SuperMe2020-12-22 10:29 回复
    • 应该是 Parquet 在 spark 里面的优话比较多吧。orc 应该也可以做类似的优化。

      w3970907702020-12-25 13:22 回复