欢迎关注大数据技术架构与案例微信公众号:过往记忆大数据
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
过往记忆大数据

Spark Streaming和Kafka整合开发指南(二)

  在本博客的《Spark Streaming和Kafka整合开发指南(一)》文章中介绍了如何使用基于Receiver的方法使用Spark Streaming从Kafka中接收数据。本文将介绍如何使用Spark 1.3.0引入的Direct API从Kafka中读数据。

  和基于Receiver接收数据不一样,这种方式定期地从Kafka的topic+partition中查询最新的偏移量,再根据定义的偏移量范围在每个batch里面处理数据。当作业需要处理的数据来临时,spark通过调用Kafka的简单消费者API读取一定范围的数据。这个特性目前还处于试验阶段,而且仅仅在Scala和Java语言中提供相应的API。

  和基于Receiver方式相比,这种方式主要有一些几个优点:
  (1)、简化并行。我们不需要创建多个Kafka 输入流,然后union他们。而使用directStream,Spark Streaming将会创建和Kafka分区一样的RDD分区个数,而且会从Kafka并行地读取数据,也就是说Spark分区将会和Kafka分区有一一对应的关系,这对我们来说很容易理解和使用;

  (2)、高效。第一种实现零数据丢失是通过将数据预先保存在WAL中,这将会复制一遍数据,这种方式实际上很不高效,因为这导致了数据被拷贝两次:一次是被Kafka复制;另一次是写到WAL中。但是本文介绍的方法因为没有Receiver,从而消除了这个问题,所以不需要WAL日志;

  (3)、恰好一次语义(Exactly-once semantics)。《Spark Streaming和Kafka整合开发指南(一)》文章中通过使用Kafka高层次的API把偏移量写入Zookeeper中,这是读取Kafka中数据的传统方法。虽然这种方法可以保证零数据丢失,但是还是存在一些情况导致数据会丢失,因为在失败情况下通过Spark Streaming读取偏移量和Zookeeper中存储的偏移量可能不一致。而本文提到的方法是通过Kafka低层次的API,并没有使用到Zookeeper,偏移量仅仅被Spark Streaming保存在Checkpoint中。这就消除了Spark Streaming和Zookeeper中偏移量的不一致,而且可以保证每个记录仅仅被Spark Streaming读取一次,即使是出现故障。

  但是本方法唯一的坏处就是没有更新Zookeeper中的偏移量,所以基于Zookeeper的Kafka监控工具将会无法显示消费的状况。然而你可以通过Spark提供的API手动地将偏移量写入到Zookeeper中。如何使用呢?其实和方法一差不多


如果想及时了解Spark、Hadoop或者Hbase相关的文章,欢迎关注微信公共帐号:iteblog_hadoop

  1、引入依赖。

  对于Scala和Java项目,你可以在你的pom.xml文件引入以下依赖:

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>1.3.0</version>
    </dependency>

  如果你是使用SBT,可以这么引入:

libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.3.0"

  2、编程

  在Streaming应用程序代码中,引入KafkaUtils ,并创建DStream输入流:

 import org.apache.spark.streaming.kafka._

 val directKafkaStream = KafkaUtils.createDirectStream[
     [key class], [value class], [key decoder class], [value decoder class] ](
     streamingContext, [map of Kafka parameters], [set of topics to consume])

  在 Kafka parameters参数中,你必须指定 metadata.broker.list或者bootstrap.servers参数。在默认情况下,Spark Streaming将会使用最大的偏移量来读取Kafka每个分区的数据。如果你配置了auto.offset.reset为smallest,那么它将会从最小的偏移量开始消费。

  当然,你也可以使用KafkaUtils.createDirectStream的另一个版本从任意的位置消费数据。如果你想回去每个batch中Kafka的偏移量,你可以如下操作:

 directKafkaStream.foreachRDD { rdd => 
     val offsetRanges = rdd.asInstanceOf[HasOffsetRanges]
     // offsetRanges.length = # of Kafka partitions being consumed
     ...
 }

你可以通过这种方式来手动地更新Zookeeper里面的偏移量,使得基于Zookeeper偏移量的Kafka监控工具可以使用。

  还有一点需要注意,因为这里介绍的方法没有使用到Receiver,所以Spark中关于spark.streaming.receiver.*相关的配置参数将不会对创建DStreams 有影响。我们可以使用spark.streaming.kafka.*参数进行配置。

  3、部署

  对应任何的Spark 应用,我们都是用spark-submit来启动你的应用程序,对于Scala和Java用户,如果你使用的是SBT或者是Maven,你可以将spark-streaming-kafka_2.10及其依赖打包进应用程序的Jar文件中,并确保spark-core_2.10和 spark-streaming_2.10标记为provided,因为它们在Spark 安装包中已经存在:

<dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.10</artifactId>
          <version>1.3.0</version>
          <scope>provided</scope>
</dependency>

<dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.10</artifactId>
          <version>1.3.0</version>
          <scope>provided</scope>
</dependency>

然后使用spark-submit来启动你的应用程序。

  本文翻译自:https://spark.apache.org/docs/latest/streaming-kafka-integration.html
本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Spark Streaming和Kafka整合开发指南(二)】(https://www.iteblog.com/archives/1326.html)
喜欢 (26)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!
(1)个小伙伴在吐槽
  1. 1.direct方式结合kafka上看消费总延迟情况
    2.顺便存一份到zk,然后再基于zk的方式监控运行?

    海东青2020-12-29 15:29 回复