欢迎关注大数据技术架构与案例微信公众号:过往记忆大数据
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
过往记忆大数据

Apache Spark 2.4 内置的 Avro 数据源介绍

Apache Avro 是一种流行的数据序列化格式。它广泛用于 Apache Spark 和 Apache Hadoop 生态系统,尤其适用于基于 Kafka 的数据管道。从 Apache Spark 2.4 版本开始Spark 为读取和写入 Avro 数据提供内置支持。新的内置 spark-avro 模块最初来自 Databricks 的开源项目Avro Data Source for Apache Spark。除此之外,它还提供以下功能:

  • 新函数 from_avro()to_avro() 用于在 DataFrame 中读取和写入 Avro 数据,而不仅仅是文件。
  • 支持 Avro 逻辑类型(logical types),包括 Decimal,Timestamp 和 Date类型。
  • 2倍读取吞吐量提高和10%写入吞吐量提升。

本文将通过示例介绍上面的每个功能。

加载和保存函数

在 Apache Spark 2.4 中,为了读写 Avro 格式的数据,你只需在 DataFrameReaderDataFrameWriter 中将文件格式指定为“avro”即可。其用法和其他数据源用法很类似。如下所示:

val iteblogDF = spark.read.format("avro").load("examples/src/main/resources/iteblog.avro")
iteblogDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")

from_avro() 和 to_avro() 的使用

为了进一步简化数据转换流程(transformation pipeline),社区引入了两个新的内置函数:from_avro()to_avro()。Avro 通常用于序列化/反序列化基于 Apache Kafka 的数据管道中的消息或数据,在读取或写入 Kafka 时,将 Avro records 作为列将非常有用。每个 Kafka 键值记录都会增加一些元数据,例如 Kafka 的摄取时间戳,Kafka 的偏移量等。

在以下三种场景,from_avro()to_avro() 函数将非常有用:

  • 当使用 Spark 从 Kafka 中读取 Avro 格式的数据,可以使用 from_avro() 函数来抽取你要的数据,清理数据并对其进行转换。
  • 当你想要将 structs 格式的数据转换为 Avro 二进制记录,然后将它们发送到 Kafka 或写入到文件,你可以使用 to_avro()
  • 如果你需要将多个列重新编码为单个列,请使用to_avro().

目前这两个函数仅在 Scala 和 Java 语言中可用。from_avroto_avro 函数的使用除了需要人为指定 Avro schema,其他的和使用 from_jsonto_json 函数一样,下面是这两个函数的使用示例。

在代码里面指定 Avro 模式

import org.apache.spark.sql.avro._
import org.apache.avro.SchemaBuilder

val servers = "www.iteblog.com:9092"
// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))

// Convert structured data to binary from string (key column) and
// int (value column) and save them to a Kafka topic.
iteblogDF
  .select(
    to_avro($"key").as("key"),
    to_avro($"value").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .save()

通过 Schema Registry 服务提供 Avro 模式

如果我们有 Schema Registry 服务,那么我们就不需要在代码里面指定 Avro 模式了,如下:

import org.apache.spark.sql.avro._

// Read a Kafka topic "t", assuming the key and value are already
// registered in Schema Registry as subjects "t-key" and "t-value" of type
// string and int. The binary key and value columns are turned into string
// and int type with Avro and Schema Registry. The schema of the resulting DataFrame
// is: <key: string, value: int>.
val schemaRegistryAddr = "https://www.iteblog.com"
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr).as("value"))

  // Given that key and value columns are registered in Schema Registry, convert
  // structured data of key and value columns to Avro binary data by reading the schema
  // info from the Schema Registry. The converted data is saved to Kafka as a Kafka topic "t".
  iteblogDF
    .select(
      to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
      to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .save()

通过文件设置 Avro 模式

我们还可以将 Avro 模式写入到文件里面,然后在代码里面读取模式文件:

import org.apache.spark.sql.avro._

// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "iteblog1")
  .load()

// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
val output = df
  .select(from_avro('value, jsonFormatSchema) as 'user)
  .where("user.favorite_color == \"red\"")
  .select(to_avro($"user.name") as 'value)

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "iteblog2")
  .start()

与 Databricks spark-avro的兼容性

因为 Spark 内置对读写 Avro 数据的支持是从 Spark 2.4 才引入的,所以在这些版本之前,可能有用户已经使用了 Databricks 开源的 spark-avro。但是不用急,内置的 spark-avro 模块和这个是完全兼容的。我们仅仅需要将之前引入的 com.databricks.spark.avro 修改成 org.apache.spark.sql.avro._ 即可。

性能测试

基于 SPARK-24800 的优化,内置 Avro 数据源读写 Avro 文件的性能得到很大提升。社区在这方面进行了相关的基准测试,结果表明,在1百万行的数据(包含 Int/Double/String/Map/Array/Struct 等各种数据格式)测试中,读取的性能提升了2倍,写的性能提升了8%。基准测试的代码可参见 这里,测试比较如下:


如果想及时了解Spark、Hadoop或者Hbase相关的文章,欢迎关注微信公共帐号:iteblog_hadoop

结论

内置的 spark-avro 模块为 Spark SQL 和 Structured Streaming 提供了更好的用户体验以及 IO 性能。

参考文档:Apache Avro as a Built-in Data Source in Apache Spark 2.4

本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Apache Spark 2.4 内置的 Avro 数据源介绍】(https://www.iteblog.com/archives/2476.html)
喜欢 (9)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!