文章目录
背景
在默认情况下,Spark Streaming 通过 receivers (或者是 Direct 方式) 以生产者生产数据的速率接收数据。当 batch processing time > batch interval 的时候,也就是每个批次数据处理的时间要比 Spark Streaming 批处理间隔时间长;越来越多的数据被接收,但是数据的处理速度没有跟上,导致系统开始出现数据堆积,可能进一步导致 Executor 端出现 OOM 问题而出现失败的情况。
而在 Spark 1.5 版本之前,为了解决这个问题,对于 Receiver-based 数据接收器,我们可以通过配置 spark.streaming.receiver.maxRate
参数来限制每个 receiver 每秒最大可以接收的记录的数据;对于 Direct Approach 的数据接收,我们可以通过配置 spark.streaming.kafka.maxRatePerPartition
参数来限制每次作业中每个 Kafka 分区最多读取的记录条数。这种方法虽然可以通过限制接收速率,来适配当前的处理能力,但这种方式存在以下几个问题:
- 我们需要事先估计好集群的处理速度以及消息数据的产生速度;
- 这两种方式需要人工参与,修改完相关参数之后,我们需要手动重启 Spark Streaming 应用程序;
- 如果当前集群的处理能力高于我们配置的 maxRate,而且 producer 产生的数据高于 maxRate,这会导致集群资源利用率低下,而且也会导致数据不能够及时处理。
反压机制
那么有没有可能不需要人工干预,Spark Streaming 系统自动处理这些问题呢?当然有了!Spark 1.5 引入了反压(Back Pressure)机制,其通过动态收集系统的一些数据来自动地适配集群数据处理能力。详细的记录请参见 SPARK-7398 里面的说明。
Spark Streaming 1.5 以前的体系结构
在 Spark 1.5 版本之前,Spark Streaming 的体系结构如下所示:
- 数据是源源不断的通过 receiver 接收,当数据被接收后,其将这些数据存储在 Block Manager 中;为了不丢失数据,其还将数据备份到其他的 Block Manager 中;
- Receiver Tracker 收到被存储的 Block IDs,然后其内部会维护一个时间到这些 block IDs 的关系;
- Job Generator 会每隔 batchInterval 的时间收到一个事件,其会生成一个 JobSet;
- Job Scheduler 运行上面生成的 JobSet。
Spark Streaming 1.5 之后的体系结构
- 为了实现自动调节数据的传输速率,在原有的架构上新增了一个名为
RateController
的组件,这个组件继承自StreamingListener
,其监听所有作业的onBatchCompleted
事件,并且基于processingDelay
、schedulingDelay
、当前 Batch 处理的记录条数以及处理完成事件来估算出一个速率;这个速率主要用于更新流每秒能够处理的最大记录的条数。速率估算器(RateEstimator
)可以又多种实现,不过目前的 Spark 2.2 只实现了基于 PID 的速率估算器。 - InputDStreams 内部的
RateController
里面会存下计算好的最大速率,这个速率会在处理完onBatchCompleted
事件之后将计算好的速率推送到ReceiverSupervisorImpl
,这样接收器就知道下一步应该接收多少数据了。 - 如果用户配置了
spark.streaming.receiver.maxRate
或spark.streaming.kafka.maxRatePerPartition
,那么最后到底接收多少数据取决于三者的最小值。也就是说每个接收器或者每个 Kafka 分区每秒处理的数据不会超过spark.streaming.receiver.maxRate
或spark.streaming.kafka.maxRatePerPartition
的值。
详细的过程如下图所示:
Spark Streaming 反压机制的使用
在 Spark 启用反压机制很简单,只需要将 spark.streaming.backpressure.enabled
设置为 true
即可,这个参数的默认值为 false。反压机制还涉及以下几个参数,包括文档中没有列出来的:
- spark.streaming.backpressure.initialRate: 启用反压机制时每个接收器接收第一批数据的初始最大速率。默认值没有设置。
- spark.streaming.backpressure.rateEstimator:速率估算器类,默认值为 pid ,目前 Spark 只支持这个,大家可以根据自己的需要实现。
- spark.streaming.backpressure.pid.proportional:用于响应错误的权重(最后批次和当前批次之间的更改)。默认值为1,只能设置成非负值。weight for response to "error" (change between last batch and this batch)
- spark.streaming.backpressure.pid.integral:错误积累的响应权重,具有抑制作用(有效阻尼)。默认值为 0.2 ,只能设置成非负值。weight for the response to the accumulation of error. This has a dampening effect.
- spark.streaming.backpressure.pid.derived:对错误趋势的响应权重。 这可能会引起 batch size 的波动,可以帮助快速增加/减少容量。默认值为0,只能设置成非负值。weight for the response to the trend in error. This can cause arbitrary/noise-induced fluctuations in batch size, but can also help react quickly to increased/reduced capacity.
- spark.streaming.backpressure.pid.minRate:可以估算的最低费率是多少。默认值为 100,只能设置成非负值。
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Spark Streaming 反压(Back Pressure)机制介绍】(https://www.iteblog.com/archives/2323.html)
涨涨姿势
学习了
看看原理,涨涨姿势
看看原理,涨涨姿势
看看原理,涨涨姿势
想评论支持下结果折腾半天 这智商就不该上网啊...
不好意思,本博客有些功能使用不是很友好。
看看原理,涨涨姿势
学习,涨涨经验
学习,涨涨经验
学习一下,感觉不错
了解一下
学习
学习了,还有个问题,怎么解决spark streaming长期运行,内存越占越多,最后导致内存吃完的问题?
请问 你的问题解决了么 我写的项目遇到这个问题了
你确定你程序写的没问题?正常情况下不会出现这种问题的。
上次描述说的有点问题,我分配给集群的dirver 4g,executor每个2g,storage Memory分配到了4.3g,在sparkUI 中storage Memory一直增长,RDD blockes的数量也一直在增长,项目运行到2天18小时的时候报了一个Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: Java heap space异常;个人觉得这个时候可能也是storage Memory增长到了4.3g,程序没反应过来得不到内存就报这个异常?但是spark中的内存模型中,程序的内存和缓存的内存是分开的啊,缓存的话我设置的是可以在磁盘里面的保存的,遇到这个问题一脸懵逼,新人很多地方不懂,跪求解答!<>_<>!
你确定你程序写的没问题?正常情况下不会出现这种问题的。
学习
学习
学习
学习
之前的监听了长任务,没有去想到动态调整这个Kafka入口流量,学习一下
一直关注博主
这个网站东博客还是很有含金量的
感谢支持本博客,同时还请多多分享很博客。
学习
学习一下
正在学习
学习学习
第一次听说,了解下。
学习中
学习学习
希望能介绍一些远原理方面的
学习学习!
之前没听说过反压机制,有机会找资料深入了解下.
来学一下
反压 还是很重要的一个特性,好好学习吧
两个场景,根据情况决定是否开启它
还真不知道有这个,赶快学习一下。
学习
学习,看能用上吗
学习下
学习学习,支持支持!
学习一下,感谢
学习
学习
学习
学习
专业
学习一下反压机制
学习下
学习一下!
学习一下反压机制
好像之前看你写过这篇文章..
看看