Yesterday's article (《Spark Streaming作业提交源码分析接收数据篇》) walked through the data-receiving part of Spark Streaming job submission; today we look at how Spark Streaming processes the data it has received from external sources.
When StreamingContext's start function is called, it calls JobScheduler's start function, and JobScheduler's start function in turn starts the ReceiverTracker and the JobGenerator.
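As a quick reminder of where this entry point comes from, below is a minimal driver-program sketch; the socket source, host/port, application name and the 2-second batch interval are only illustrative choices, and master/deploy settings are omitted. The point is that ssc.start() is the call that triggers JobScheduler.start and everything described in this article.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("iteblog-streaming-demo")
    // The batch interval given here becomes graph.batchDuration, used by JobGenerator's timer
    val ssc = new StreamingContext(conf, Seconds(2))

    // An illustrative receiver-based source; any input DStream would do
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).print()

    // start() -> JobScheduler.start() -> ReceiverTracker.start() and JobGenerator.start()
    ssc.start()
    ssc.awaitTermination()
  }
}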
When the JobGenerator is started, it calls either restart or startFirstTime, depending on whether this run is recovering from a Checkpoint:
/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventActor != null) return // generator has already been started

  eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
    def receive = {
      case event: JobGeneratorEvent => processEvent(event)
    }
  }), "JobGenerator")

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}
The startFirstTime function starts the DStreamGraph and the JobGenerator's timer thread respectively:
/** Starts the generator for the first time */
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventActor ! GenerateJobs(new Time(longTime)), "JobGenerator")
The JobGenerator's timer thread fires every ssc.graph.batchDuration.milliseconds, i.e. the batch interval we passed in when creating the StreamingContext, and generates jobs by sending a GenerateJobs event through Akka, which ends up calling the generateJobs method:
/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the
  // environment. Example: BlockRDDs are created in this thread, and it needs
  // to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has
  // been removed.
  SparkEnv.set(ssc.env)
  Try {
    // allocate received blocks to batch
    jobScheduler.receiverTracker.allocateBlocksToBatch(time)
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val receivedBlockInfos =
        jobScheduler.receiverTracker.getBlocksOfBatch(time).mapValues { _.toArray }
      jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventActor ! DoCheckpoint(time)
}
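To make the timer-plus-event flow above easier to follow, here is a heavily simplified, self-contained sketch. It is only a model of the mechanism (a scheduled executor standing in for RecurringTimer, a blocking queue standing in for the Akka actor's mailbox, and a stub generateJobs); it is not Spark's actual implementation, and the 2-second interval is an assumption.
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

// Hypothetical, simplified stand-in for JobGenerator's timer + event loop
object MiniJobGenerator {
  sealed trait Event
  case class GenerateJobs(time: Long) extends Event
  case class DoCheckpoint(time: Long) extends Event

  private val events = new LinkedBlockingQueue[Event]()
  private val batchDurationMs = 2000L // assumed batch interval

  // Stub standing in for allocateBlocksToBatch + graph.generateJobs + submitJobSet
  private def generateJobs(time: Long): Unit =
    println(s"generating jobs for batch $time")

  def main(args: Array[String]): Unit = {
    // Plays the role of RecurringTimer: post a GenerateJobs event every batch interval
    val timer = Executors.newSingleThreadScheduledExecutor()
    timer.scheduleAtFixedRate(new Runnable {
      def run(): Unit = events.put(GenerateJobs(System.currentTimeMillis()))
    }, 0L, batchDurationMs, TimeUnit.MILLISECONDS)

    // Plays the role of the JobGenerator event handler: dispatch events as they arrive
    for (_ <- 1 to 6) {
      events.take() match {
        case GenerateJobs(time) =>
          generateJobs(time)
          events.put(DoCheckpoint(time)) // mirror the DoCheckpoint sent after job generation
        case DoCheckpoint(time) =>
          println(s"checkpointing state for batch $time")
      }
    }
    timer.shutdownNow()
  }
}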
The call to jobScheduler.receiverTracker.allocateBlocksToBatch(time) inside generateJobs is important; it ultimately invokes the allocateBlocksToBatch function of ReceivedBlockTracker, which is defined as follows:
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    val streamIdToBlocks = streamIds.map { streamId =>
        (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))
    timeToAllocatedBlocks(batchTime) = allocatedBlocks
    lastAllocatedBatchTime = batchTime
    allocatedBlocks
  } else {
    // This situation occurs when:
    // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
    //    possibly processed batch job or half-processed batch job need to be processed
    //    again, so the batchTime will be equal to lastAllocatedBatchTime.
    // 2. Slow checkpointing makes recovered batch time older than WAL recovered
    //    lastAllocatedBatchTime.
    // This situation will only occurs in recovery time.
    logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
  }
}
Note getReceivedBlockQueue(streamId); its implementation is simply:
private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
  streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
}
Remember the process of reading data from Kafka and storing it, covered in the previous article? The metadata of those newly created Blocks ends up in streamIdToUnallocatedBlockQueues. Here all of the not-yet-processed blocks are pulled out of it and stored in timeToAllocatedBlocks (a mutable.HashMap[Time, AllocatedBlocks]), and then graph.generateJobs(time) is called to generate the Jobs.
When the Try succeeds (Success(jobs)), jobScheduler.receiverTracker.getBlocksOfBatch(time) is called to fetch those newly allocated blocks, which simply reads the entries from timeToAllocatedBlocks; finally jobScheduler's submitJobSet is called to submit the JobSet to the cluster for execution, and afterwards a Checkpoint is performed via the DoCheckpoint event.
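To tie the bookkeeping together, the following is a simplified, hypothetical model of the two maps involved (streamIdToUnallocatedBlockQueues and timeToAllocatedBlocks). It only illustrates how blocks queued by receivers are drained into a per-batch allocation and then looked up by batch time; it is not the real ReceivedBlockTracker, and block ids, stream ids and batch times are made up for illustration.
import scala.collection.mutable

// Hypothetical, simplified model of the block-allocation bookkeeping described above
object MiniBlockTracker {
  type BlockId = String

  // Blocks reported by receivers but not yet tied to any batch
  private val streamIdToUnallocatedBlockQueues =
    new mutable.HashMap[Int, mutable.Queue[BlockId]]()
  // Blocks that have been assigned to a batch, keyed by batch time
  private val timeToAllocatedBlocks = new mutable.HashMap[Long, Map[Int, Seq[BlockId]]]()
  private var lastAllocatedBatchTime = -1L

  // Called (in this model) whenever a receiver has stored a new block
  def addBlock(streamId: Int, blockId: BlockId): Unit =
    streamIdToUnallocatedBlockQueues
      .getOrElseUpdate(streamId, new mutable.Queue[BlockId]())
      .enqueue(blockId)

  // Drain every stream's unallocated blocks into the given batch
  def allocateBlocksToBatch(batchTime: Long): Unit =
    if (batchTime > lastAllocatedBatchTime) {
      val allocated = streamIdToUnallocatedBlockQueues.map { case (streamId, queue) =>
        (streamId, queue.dequeueAll(_ => true).toSeq)
      }.toMap
      timeToAllocatedBlocks(batchTime) = allocated
      lastAllocatedBatchTime = batchTime
    }

  // What the success branch of generateJobs would read for this batch
  def getBlocksOfBatch(batchTime: Long): Map[Int, Seq[BlockId]] =
    timeToAllocatedBlocks.getOrElse(batchTime, Map.empty)

  def main(args: Array[String]): Unit = {
    addBlock(0, "input-0-001")
    addBlock(0, "input-0-002")
    allocateBlocksToBatch(1000L)
    println(getBlocksOfBatch(1000L)) // both blocks are now allocated to batch 1000
  }
}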
That wraps up the source code walk-through of how a Spark Streaming job receives data from an external source, stores it in memory, and finally slices it into jobs. If you want to keep up with the latest Spark news, please follow this blog.

