欢迎关注Hadoop、Spark、Flink、Hive、Hbase、Flume等大数据资料分享微信公共账号:iteblog_hadoop
  1. 文章总数:978
  2. 浏览总数:11,958,241
  3. 评论:3937
  4. 分类目录:106 个
  5. 注册用户数:6120
  6. 最后更新:2018年12月15日
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
iteblog_hadoop
大数据技术博客公众号bigdata_ai
大数据猿:
bigdata_ai

Spark Streaming kafka实现数据零丢失的几种方式

  在使用Spark streaming消费kafka数据时,程序异常中断的情况下发现会有数据丢失的风险,本文简单介绍如何解决这些问题。

  在问题开始之前先解释下流处理中的几种可靠性语义:

  1、At most once - 每条数据最多被处理一次(0次或1次),这种语义下会出现数据丢失的问题;
  2、At least once - 每条数据最少被处理一次 (1次或更多),这个不会出现数据丢失,但是会出现数据重复;
  3、Exactly once - 每条数据只会被处理一次,没有数据会丢失,并且没有数据会被多次处理,这种语义是大家最想要的,但是也是最难实现的。


如果想及时了解Spark、Hadoop或者Hbase相关的文章,欢迎关注微信公共帐号:iteblog_hadoop

Kafka高级API

  如果不做容错,将会带来数据丢失,因为Receiver一直在接收数据,在其没有处理的时候(已通知zk数据接收到),Executor突然挂掉(或是driver挂掉通知executor关闭),缓存在内存中的数据就会丢失。因为这个问题,Spark1.2开始加入了WAL(Write ahead log)开启 WAL,将receiver获取数据的存储级别修改为StorageLevel.MEMORY_AND_DISK_SER,使用代码片段如下:

/*
 * User: 过往记忆
 * Date: 2016年07月26日
 * Time: 下午23:16
 * bolg: https://www.iteblog.com
 * 本文地址:https://www.iteblog.com/archives/1716
 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 * 过往记忆博客微信公共帐号:iteblog_hadoop
*/
val conf = new SparkConf()
conf.set("spark.streaming.receiver.writeAheadLog.enable","true")
val sc= new SparkContext(conf)
val ssc = new StreamingContext(sc,Seconds(5))
ssc.checkpoint("checkpoint")
val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)

但是开启WAL后,依旧存在数据丢失问题,即使按官方说的设置了WAL,依旧会有数据丢失,这是为什么?因为在任务中断时receiver也被强行终止了,将会造成数据丢失,提示如下:

ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver
WARN BlockGenerator: Cannot stop BlockGenerator as its not in the Active state [state = StoppedAll]
WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer queue interrupted.

在Streaming程序的最后添加代码,只有在确认所有receiver都关闭的情况下才终止程序。我们可以调用StreamingContext的stop方法,其原型如下:

def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit

可以如下使用:

sys.addShutdownHook({
  ssc.stop(true,true)
)})

WAL带来的问题

WAL实现的是At-least-once语义。如果在写入到外部存储的数据还没有将offset更新到zookeeper就挂掉,这些数据将会被反复消费。同时,因为需要把数据写入到可靠的外部系统,这会牺牲系统的整个吞吐量。

Kafka Direct API

  Kafka direct API 的运行方式,将不再使用receiver来读取数据,也不用使用WAL机制。同时保证了exactly-once语义,不会在WAL中消费重复数据。不过需要自己完成将offset写入zk的过程。调用方式可以参见下面:

/*
 * User: 过往记忆
 * Date: 2016年07月26日
 * Time: 下午23:16
 * bolg: https://www.iteblog.com
 * 本文地址:https://www.iteblog.com/archives/1716
 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 * 过往记忆博客微信公共帐号:iteblog_hadoop
*/
messages.foreachRDD(rdd=>{
   val message = rdd.map(_._2)  
   //对数据进行一些操作
   message.map(method)
   //更新zk上的offset (自己实现)
   updateZKOffsets(rdd)
})

  更详细地关于Spark Kafka可靠性保证可以参见《Spark Streaming和Kafka整合是如何保证数据零丢失》

本博客文章除特别声明,全部都是原创!
转载本文请加上:转载自过往记忆(https://www.iteblog.com/)
本文链接: 【Spark Streaming kafka实现数据零丢失的几种方式】(https://www.iteblog.com/archives/1716.html)
喜欢 (15)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!
(3)个小伙伴在吐槽
  1. 博主,咨询点问题:问题1: Kafka Direct API方式 偏移是存储在checkpoint中对吧(是否手动存储到zookeeper跟spark消费进度无关对吧),我本地跑的代码 每次kill掉之后,再次重启都会存在一个批次数据的重复消费?这事什么问题?问题2:今天跑任务时候发现,我从昨天的checkpoint中恢复ssc之后,打印DStream,会把昨天消费的数据也打印出来!?(我的目的其实就是想从checkpoint恢复ssc之后继续消费而已)?
    欢乐豆2016-12-02 19:49 回复
  2. 即使是自己维护offset,那么在数据处理完后将offset写入zk前出问题了,这批数据还是会被重新消费的吧?
    spoofer2016-08-18 08:23 回复
    • 那肯定啊,要保证数据只被存储一次,需要自己做很多处理的,光靠Spark还是不行。
      w3970907702016-08-18 09:27 回复