欢迎关注大数据技术架构与案例微信公众号:过往记忆大数据
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
过往记忆大数据

Structured Streaming和Kafka 0.8\0.9整合开发

  流式处理是大数据应用中的非常重要的一环,在Spark中Spark Streaming利用Spark的高效框架提供了基于micro-batch的流式处理框架,并在RDD之上抽象了流式操作API DStream供用户使用。

  随着流式处理需求的复杂化,用户希望在流式数据中引入较为复杂的查询和分析,传统的DStream API想要实现相应的功能就变得较为复杂,同时随着Spark的迭代,社区希望使用同一套Structured API来进行流式操作。因此在Spark 2.0中引入了Structured Streaming模块,它使得用户可以使用与批处理同样的Structured API对结构化的流式数据进行处理。

  官方文档就其描述如下:

Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data.The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive.

我们可以将流数据当做静态的数据来处理,因为是建立在Spark SQL引擎之上,所以我们可以直接在流数据上使用Dataset/DataFrame API来操作数据。而流数据存储的组件最常使用的是Kafka,但是如果我们去Spark官方文档可以了解到,Structured Streaming只支持Kafka 0.10或更高版本!而现实情况是,使用Kafka 0.8.x和Kafka 0.9.x版本的人很多,更遗憾的是Kafka 0.10并不兼容Kafka 0.8.x和Kafka 0.9.x版本,这也就是说我们不能直接通过官方提供的工具来在Kafka 0.8.x或Kafka 0.9.x版本上使用Structured Streaming!

  针对这个问题,有开发者提了个Issue:SPARK-17343,但是社区的人在SPARK-15406里面讨论决定不开发基于Kafka 0.8.x或Kafka 0.9.x的Structured Streaming相关API。可能这个功能大家太希望有了,所以有人重新开了个Issue来讨论是否要开发基于Kafka 0.8.x版本的API:SPARK-17344,还有人在http://apache-spark-developers-list.1001551.n3.nabble.com/Kafaka-0-8-0-9-in-Structured-Streaming-td19297.html里面提问,官方可能会考虑这个问题。

  不过值得大家高兴的是,Hortonworks公司的赛赛大牛写了个项目使得我们可以在Kafka 0.8上使用Spark Structured Streaming,项目地址:spark-kafka-0-8-sql。本文将介绍如何使用这个类库。

编译

  虽然赛赛告诉我们可以通过--packages com.hortonworks.spark:spark-kafka-0-8-sql_2.11:1.0方式来引入这个包,但是我在maven并没有找到这个包,可能是他们内部的Maven仓库。所以我们可以先到Github下载这个项目,然后使用Maven编译;编译完会在项目的targer目录下生成名为 spark-kafka-0-8-sql_2.11-1.0.jar 包,这就是我们需要的。

使用

为了简便,我直接在spark-shell下进行说明,使用如下:

[iteblog@www.iteblog.com ~]$ bin/spark-shell --master yarn-client --executor-memory 4g --num-executors 10 --queue iteblog --executor-cores 1  --jars spark-kafka-0-8-sql_2.11-1.0.jar

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.2
      /_/
         
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_77)
Type in expressions to have them evaluated.
Type :help for more information.

scala>  import spark.implicits
import spark.implicits

scala> import org.apache.spark.sql.streaming.ProcessingTime
import org.apache.spark.sql.streaming.ProcessingTime

scala> val reader = spark
            .readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "www.iteblog.com:9092")
            .option("startingoffset", "smallest")
            .option("topics", "iteblog")


scala> val kafka = reader.load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "CAST(partition AS STRING)", "CAST(offset AS STRING)")
      .writeStream
      .format("console")
      .trigger(ProcessingTime(2000L))
      .start()

-------------------------------------------
Batch: 797
-------------------------------------------
+--------------------+--------------------+--------------------+---------+------+
|                 key|               value|               topic|partition|offset|
+--------------------+--------------------+--------------------+---------+------+
|@4035ffc9bd01a8c3...|             iteblog|             iteblog|        6|450624|
|@68079e17f55331a5...|      iteblog_hadoop|             iteblog|        1|449856|
|@5ff109adb9192987...|             iteblog|             iteblog|        1|449857|
|@83416028d5234b0a...|      iteblog_hadoop|             iteblog|        1|449858|
|@2fcd3cf5f7e1f1e2...|      iteblog_hadoop|             iteblog|        0|451100|
|@40243bf7944ea821...|      iteblog_hadoop|             iteblog|        0|451101|
+--------------------+--------------------+--------------------+---------+------+

我们可以看到,一旦Kafka中有数据,就会在控制窗口输出结果。如果我们是在程序里面使用需要在最后加上 kafka.awaitTermination()

注意

  1、Kafka 0.8的模式是固定的,我们只能在查询里面获取到下面的属性:

StructType(Seq(
    StructField("key", BinaryType),
    StructField("value", BinaryType),
    StructField("topic", StringType),
    StructField("partition", IntegerType),
    StructField("offset", LongType)))

这也就是上面的代码(selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "CAST(partition AS STRING)", "CAST(offset AS STRING)"))里面使用到的,如果是其他Sources,会有其他的属性。

  2、如果是多个主题(topic),可以通过“,”来分割,如下:

scala> val reader = spark
            .readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "www.iteblog.com:9092")
            .option("startingoffset", "smallest")
            .option("topics", "iteblog,topic2,topic3")

注意:赛赛的文档上写需要使用冒号(":")分割是不对的,可能是笔误。
  3、所有和Kafka相关的属性需要使用 kafka. 前缀;
  4、目前这个只支持从 smallest 或者 largest 位置读取Kafka中的数据,不支持其他位置的offset。

本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Structured Streaming和Kafka 0.8.9整合开发】(https://www.iteblog.com/archives/1877.html)
喜欢 (13)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!