欢迎关注大数据技术架构与案例微信公众号:过往记忆大数据
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
过往记忆大数据

为什么Spark Streaming + Kafka很难保证exactly once?

Streaming job 的调度与执行

  我们先来看看如下 job 调度执行流程图:


如果想及时了解Spark、Hadoop或者Hbase相关的文章,欢迎关注微信公共帐号:iteblog_hadoop

为什么很难保证 exactly once

  上面这张流程图最主要想说明的就是,job 的提交执行是异步的,与 checkpoint 操作并不是原子操作。这样的机制会引起数据重复消费问题:

  为了简化问题容易理解,我们假设一个 batch 只生成一个 job,并且 spark.streaming.concurrentJobs 值为1,该值代表 jobExecutor 线程池中线程的个数,也即可以同时执行的 job 的个数。

  假设,batch duration 为2s,一个 batch 的总共处理时间为1s,此时,一个 batch 开始了,第一步生成了一个 job,假设花了0.1s,然后把该 job 丢到了 jobExecutor 线程池中等待调度执行,由于 checkpoint 操作和 job 在线程池中执行是异步的,在0.2s 的时候,checkpoint 操作完成并且此时开始了 job 的执行。

  注意,这个时候 checkpoint 完成了并且该 job 在 checkpoint 中的状态是未完成的,随后在第1s 的时候 job 完成了,那么在这个 batch 结束的时候 job 已经完成了但该 job 在 checkpoint 中的状态是未完成的。

  在下一个 batch 运行到 checkpoint 之前就挂了(比如在拉取数据的时候挂了、OOM 挂了等等异常情况),driver 随后从 checkpoint 中恢复,那么上述的 job 依然是未执行的,根据使用的 api 不同,对于这个 job 会再次拉取数据或从 wal 中恢复数据重新执行该 job,那么这种情况下该 job 的数据就就会被重复处理。比如这时记次的操作,那么次数就会比真实的多。

  如果一个 batch 有多个 job 并且spark.streaming.concurrentJobs大于1,那么这种情况就会更加严重,因为这种情况下就会有多个 job 已经完成但在 checkpoint 中还是未完成状态,在 driver 重启后这些 job 对应的数据会被重复消费处理。

  另一种会导致数据重复消费的情况主要是由于 Spark 处理的数据单位是 partition 引起的。比如在处理某 partition 的数据到一半的时候,由于数据内容或格式会引起抛异常,此时 task 失败,Spark 会调度另一个同样的 task 执行,那么此时引起 task 失败的那条数据之前的该 partition 数据就会被重复处理,虽然这个 task 被再次调度依然会失败。若是失败还好,如果某些特殊的情况,新的 task 执行成功了,那么我们就很难发现数据被重复消费处理了。

如何保证 exactly once

  至于如何才能保证 exactly once,其实要根据具体情况而定。总体来说,可以考虑以下几点:

 1、业务是否不能容忍即使是极少量的数据差错,如果是那么考虑 exactly once。如果可以容忍,那就没必要非实现 exactly once 不可
 2、即使重复处理极小部分数据会不会对最终结果产生影响。若不会,那重复处理就重复吧,比如排重统计
 3、若一定要保证 exactly once,应该考虑将对 partition 处理和 checkpoint或自己实现类似 checkpoint 功能的操作做成原子的操作;并且对 partition 整批数据进行类似事物的处理

本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【为什么Spark Streaming + Kafka很难保证exactly once?】(https://www.iteblog.com/archives/1795.html)
喜欢 (12)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!
(5)个小伙伴在吐槽
  1. 写了详细的原理: http://blog.csdn.net/cymvp/article/details/52605987

    沙皮猪2016-09-21 19:16 回复
  2. 作者,你对Kafka解决Spark Streaming的exactly once的理解有误。
    Exactly once的实现,需要整体系统三模块的保证: 输入源 --> Spark Streaming计算 ---> 输出结果;
    而Kafka的direct API, 解决的是"属于源"的exactly once.
    Spark Streaming部分的exactly once, 使用WAL保证(注意我没有提checkpoint和replica, 因为这两个failover机制,并不是专门解决exactly once这个问题的),所以你文中提到的checkpoint,也并不在点上。
    具体WAL如何保证exactly once,需要你了解源码的逻辑,我简单说下:
    1 Spark Straming部分,对输入源数据的处理,分为两部分:
    (1) addBlock: 将输入数据保存转化为block,并保存;
    (2) allocateBlockToBatch(抱歉具体方法名我忘了,但意思是这个): 将当前所有未处理的block,分给batch,然后删除所有buffer中的block;
    2 用分给本次batch的所有数据进行job计算;
    其中第一步的2个小步骤,都会进行WAL操作, 简单说,WAL的保存数据是这样:
    addblock1 --> addblock2 --> 将所有block分给batch然后删除所有block --> addblock3 --> block4....
    这样任何一个阶段driver崩溃再恢复的时候,根据WAL,就可以恢复当时的数据;
    但这里仍无法解决一个问题,就是如果job运行一半(比如已经写了一部分到结果数据库中),driver崩溃,再恢复时,这个job会重新运行,但是上一次运行一半的job已经写了部分到数据库了。
    解决这个问题,就需要"输出结果"的操作,是幂等的,这就不是Spark Streaming解决的问题了,需要应用程序自己来保证。
    总结一下:
    输入源对于Exactly Once需要实现的是: 崩溃前一部分数据已经作为block1输入到Spark了, 那么崩溃后恢复,这部分数据不能再输入到spark中;
    Spark Streaming对于Exactly Once需要实现的是: 持久化输入数据,和分给Batch job的数据, 这两部分的持久化一步都不能少,这其实也是本质上造成必须有Kafka Direct API的原因,因为流入数据和将数据分给Batch,是分离的两个步骤,没有事务化;
    输出对于Exactly Once需要实现的是: In order to achieve exactly-once semantics for output of your results, your output operation that saves the data to an external data store must be either idempotent, or an atomic transaction that saves results and offsets (see Semantics of output operations in the main programming guide for further information).

    发现了么,这全程和checkpoint完全没有什么关系。

    沙皮猪2016-09-21 11:01 回复
    • Exactly once的实现,需要整体系统三模块的保证: 输入源 --> Spark Streaming计算 ---> 输出结果; 这句话没错。
      但是Spark Streaming部分的exactly once, 使用WAL保证这个是不对的,使用WAL机制只能做到at-least once semantics,要做到exactly once你得结合Kafka Direct API。
      你上面谈到Kafka Direct API和Spark的WAL机制,这两个能一起用?WAL机制是保证接收到的数据不会在Exectuor挂掉的时候丢失,并不能保证exactly once;而且就算能保证exactly once语义,你不结合checkpoint中的信息你能知道哪个batch是分配到哪个Job?

      w3970907702016-09-22 09:42 回复
      • 如果输入源解决了"输入源"端的exactly once, 那么单纯的WAL, 就可以保证exactly once.
        之所以不需要Kafka Direct API和WAL一起使用, 是因为Kafka Direct API实现了"输入源"和Spark Streaming计算框架这2部分的exactly once,所以不需要WAL了。
        真正的数据是保存在Executor端的WAL操作中的,而数据逻辑及顺序是保存在Driver的WAL中的.
        CheckPoint的信息其实基本都是环境信息:
        val master = ssc.sc.master
        val framework = ssc.sc.appName
        val jars = ssc.sc.jars
        val graph = ssc.graph
        val checkpointDir = ssc.checkpointDir
        val checkpointDuration = ssc.checkpointDuration
        val pendingTimes = ssc.scheduler.getPendingTimes().toArray
        val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
        val sparkConfPairs = ssc.conf.getAll

        沙皮猪2016-09-26 23:43 回复
      • 而且其实是job分配给batch,每一个outputDStream,都对应一个job. 这个job是Sreaming概念的job,非RDD的job.

        沙皮猪2016-09-26 23:49 回复