欢迎关注大数据技术架构与案例微信公众号:过往记忆大数据
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
过往记忆大数据

Apache Spark 3.1 中 Structured Streaming 方面的改进

Apache Spark 3.1.x 版本发布到现在已经过了两个多月了,这个版本继续保持使得 Spark 更快,更容易和更智能的目标,Spark 3.1 的主要目标如下:

  • 提升了 Python 的可用性;
  • 加强了 ANSI SQL 兼容性;
  • 加强了查询优化;
  • Shuffle hash join 性能提升;
  • History Server 支持 structured streaming

更多详情请参见这里。在这篇博文中,我们总结了3.1版本中 Spark Streaming 的显著改进,包括新的流式表(streaming table)API、支持 stream-stream join 和多个 UI 增强。此外,模式验证(schema validation)和对 Apache Kafka 数据源的改进提供了更好的可用性。此外,FileStream source/sink 也进行了各种增强,以提高读/写性能。

新的流式表 API

启动 structured stream 时,连续数据流被认为是无界表(unbounded table)。 因此,Table APIs 提供了一种更自然、更方便的方法来处理流查询。 在 Spark 3.1 中,社区添加了对 DataStreamReader 和 DataStreamWriter 的支持。 我们现在可以直接以表的形式使用这个 API 读取和写入流式 DataFrames。 请参见下面的示例:

# Create a streaming DataFrame
src = spark.readStream.format("rate").option("rowPerSecond", 10).load()

# Write the streaming DataFrame to a table
src.writeStream.option("checkpointLocation", checkpointLoc1).toTable("myTable")

# Check the table result
spark.read.table("myTable").show(truncate=30)
+-----------------------+-----+
|timestamp              |value|
+-----------------------+-----+
|2021-01-19 07:45:23.122|42   |
|2021-01-19 07:45:23.222|43   |
|2021-01-19 07:45:23.322|44   |
...

此外,通过这些新功能,用户可以转换源数据集并写入到一张新表:

# Write to a new table with transformation
spark.readStream.table("myTable").select("value") \
  .writeStream.option("checkpointLocation", checkpointLoc2) \
  .format("parquet").toTable("newTable")

# Check the table result
spark.read.table("newTable").show()
+-----+
|value|
+-----+
| 1214|
| 1215|
| 1216|
...

Databricks 推荐在 streaming table APIs 中使用 Delta Lake 格式,因为这种格式将带来以下好处:

  • 并发压缩由低延迟场景产生的小文件;
  • 多个流作业(或并发批处理作业)支持“仅且一次”(exactly-once)处理;
  • 当使用文件作为流的源时,可以有效地发现哪些文件是新的。

stream-stream 支持更多 Join 类型

在 Spark 3.1 之前,stream-stream join 只支持 inner、left outer 以及 right outer joins。在最新的版本中,社区实现了完整的 full outer 以及 left semi stream-stream join,使 Structured Streaming 支持更多的场景。

Kafka 数据源性能提升

在 Spark 3.1 中,社区已经将 Kafka 依赖升级到 2.6.0 (SPARK-32568),这使得用户可以迁移到 Kafka offsets retrieval 新的 API(AdminClient.listOffsets)。它解决了使用旧版本时 Kafka 连接器无限等待的问题 (SPARK-28367)。

模式校验

模式是 Structured Streaming 查询的基本信息。在 Spark 3.1 中,社区为用户输入的模式和内部存储的模式添加了模式验证逻辑:

在查询重启中引入状态模式验证(SPARK-27237)

通过此更新,键和值的模式将存储在 stream 启动时的模式文件(schema files)中。然后,在重新启动查询时,根据现有的键和值模式验证新的键和值模式的兼容性。当字段的数量相同且每个字段的数据类型相同时,状态模式被认为是“兼容的”。注意,这里不会检查字段名,因为 Spark 允许重命名。

这将阻止使用不兼容状态模式的查询运行,从而减少不确定性行为的概率,并提供在错误的时候更多的信息。

为流状态存储引入模式验证(SPARK-31894)

以前,Structured Streaming 直接将检查点(用UnsafeRow表示)放到 StateStore 中,而不需要任何模式验证。当升级到新的 Spark 版本时,检查点文件将被重用。如果没有模式验证,任何与聚合函数相关的更改或 bug 修复都可能导致随机异常,甚至产生错误的结果(参见 SPARK-28067)。现在 Spark 将检验检查点里面的模式,并在迁移过程中重用检查点时抛出 InvalidUnsafeRowException。

Structured Streaming UI 方面的加强

社区在 Spark 3.0 中引入了新的 Structured Streaming UI。在 Spark 3.1 中,社区在 Structured Streaming UI 中添加了对历史记录服务器的支持(Structured Streaming UI(),以及更多关于 streaming 运行时状态的信息,具体如下:

Structured Streaming UI 中的状态信息 (SPARK-33223)

状态信息中添加了四个度量信息:

  1. Aggregated Number Of Total State Rows
  2. Aggregated Number Of Updated State Rows
  3. Aggregated State Memory Used In Bytes
  4. Aggregated Number Of State Rows Dropped By Watermark

有了这些指标,我们就可以了解状态存储的整体情况。而且根据这些信息我们还可以评估是否需要扩容。


如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:过往记忆大数据

Structured Streaming UI 中 Watermark gap 信息 (SPARK-33224)

Watermark 是状态查询中用户需要跟踪的主要指标之一。它定义了附加模式(append mode)的输出“何时”发出,因此知道 wall clock 和水印(输入数据)之间的差距对于设置输出期望非常有帮助。


如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:过往记忆大数据

Structured Streaming UI 中自定义指标信息(SPARK-33287)

下面显示了在配置 spark.sql.streaming.ui.enabledCustomMetricList 中设置的自定义度量信息:


如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:过往记忆大数据

FileStreamSource/Sink 方面的加强

FileStreamSource/Sink 主要有以下几个方面的加强。

Cache fetched list of files beyond maxFilesPerTrigger as unread files (SPARK-30866)

以前,当设置了 maxFilesPerTrigger 配置时,FileStreamSource 将获取所有可用的文件,根据配置处理有限数量的文件,并在每个微批处理时忽略其他文件。通过这个改进,它将缓存以前批次中获取的文件,并在接下来的批次中处理它们。

Streamline the logic on file stream source and sink metadata log (SPARK-30462)

在此更改之前,每当需要 FileStreamSource/Sink 中的元数据时,元数据日志中的所有信息都被反序列化到 Spark 驱动程序的内存中。通过这个更改,Spark 将尽可能以流式(streamlined)的方式读取和处理元数据日志。

Provide a new option to have retention on output files (SPARK-27188)

FileStreamSink 中有一个新选项用于配置元数据日志文件的保留,这有助于限制长时间运行的 Structured Streaming 查询的元数据日志文件大小的增长。

未来的计划

在下一个主要版本中,社区将继续关注 Spark Structured Streaming 的新功能、性能和可用性改进。如果大家在使用过程中有任何方面的问题可以直接到社区反馈。

本文翻译自:What’s New in Apache Spark™ 3.1 Release for Structured Streaming

本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Apache Spark 3.1 中 Structured Streaming 方面的改进】(https://www.iteblog.com/archives/9958.html)
喜欢 (2)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!