欢迎关注Hadoop、Spark、Flink、Hive、Hbase、Flume等大数据资料分享微信公共账号:iteblog_hadoop
  1. 文章总数:978
  2. 浏览总数:11,981,057
  3. 评论:3939
  4. 分类目录:106 个
  5. 注册用户数:6130
  6. 最后更新:2018年12月15日
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
iteblog_hadoop
大数据技术博客公众号bigdata_ai
大数据猿:
bigdata_ai

Spark Streaming作业提交源码分析数据处理篇

  在昨天的文章中介绍了Spark Streaming作业提交的数据接收部分的源码(《Spark Streaming作业提交源码分析接收数据篇》),今天来介绍Spark Streaming中如何处理这些从外部接收到的数据。

  在调用StreamingContext的start函数的时候,会调用JobScheduler的start函数。而JobScheduler的start函数会启动ReceiverTrackerjobGenerator

  在启动jobGenerator的时候,系统会根据这次是从Checkpoint恢复与否分别调用restart和startFirstTime函数。

/** Start generation of jobs */
def start(): Unit = synchronized {
    if (eventActor != null) return // generator has already been started

    eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
      def receive = {
        case event: JobGeneratorEvent =>  processEvent(event)
      }
    }), "JobGenerator")
    if (ssc.isCheckpointPresent) {
      restart()
    } else {
      startFirstTime()
    }
}

  startFirstTime函数会分别启动DStreamGraphJobGenerator线程

/**
 * User: 过往记忆
 * Date: 15-04-30
 * Time: 上午07:16
 * bolg: https://www.iteblog.com
 * 本文地址:https://www.iteblog.com/archives/1336
 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 * 过往记忆博客微信公共帐号:iteblog_hadoop
 *
 *
 * Starts the generator for the first time */
private def startFirstTime() {
    val startTime = new Time(timer.getStartTime())
    graph.start(startTime - graph.batchDuration)
    timer.start(startTime.milliseconds)
    logInfo("Started JobGenerator at " + startTime)
}

  private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventActor ! GenerateJobs(new Time(longTime)), "JobGenerator")

  JobGenerator线程会每隔ssc.graph.batchDuration.milliseconds的时间生成Jobs,这个时间就是我们初始化StreamingContext的时候传进来的,生成Jobs是通过Akka调用generateJobs方法:

/**
 * User: 过往记忆
 * Date: 15-04-30
 * Time: 上午07:16
 * bolg: https://www.iteblog.com
 * 本文地址:https://www.iteblog.com/archives/1336
 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 * 过往记忆博客微信公共帐号:iteblog_hadoop
 *
 *
 * Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
    // Set the sparkEnv in this thread, so that job generation code can access the
    // environment Example: BlockRDDs are created in this thread, and it needs 
    // to access BlockManager 
    // Update: This is probably redundant after threadlocal stuff in sparkEnv has
    // been removed.
    S parkEnv.set(ssc.env)
    Try {
      // allocate received blocks to batch
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) 
      graph.generateJobs(time) // generate jobs using allocated block
    } match {
      case Success(jobs) =>
        val receivedBlockInfos =
          jobScheduler.receiverTracker.getBlocksOfBatch(time).mapValues { _.toArray }
        jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
    }
    eventActor ! DoCheckpoint(time)
}

在generateJobs方法中的jobScheduler.receiverTracker.allocateBlocksToBatch(time)很重要,其最终调用的是allocateBlocksToBatch函数,其定义如下:

def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
      val streamIdToBlocks = streamIds.map { streamId =>
          (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
      }.toMap
      val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
      writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))
      timeToAllocatedBlocks(batchTime) = allocatedBlocks
      lastAllocatedBatchTime = batchTime
      allocatedBlocks
    } else {
      // This situation occurs when:
      // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
      // possibly processed batch job or half-processed batch job need to be processed 
      // again, so the batchTime will be equal to lastAllocatedBatchTime.
      // 2. Slow checkpointing makes recovered batch time older than WAL recovered
      // lastAllocatedBatchTime.
      // This situation will only occurs in recovery time.
      logInfo(s"Possibly processed batch $batchTime need to be processed again 
            in WAL recovery")
    }
}

注意getReceivedBlockQueue(streamId),它的实现就是

private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
    streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
}

  还记得我们介绍从Kafka中读取数据并存储的过程吗?最终那些新生成的Block信息就是存储在streamIdToUnallocatedBlockQueues里面的,通过这个获取到所有那些没有处理的block并存储在timeToAllocatedBlocks(mutable.HashMap[Time, AllocatedBlocks])中,然后调用graph.generateJobs(time)函数生成Jobs。

  当Success(jobs) 成立时,系统会通过调用jobScheduler.receiverTracker.getBlocksOfBatch(time)获取那些新的block,这也就是获取timeToAllocatedBlocks中的信息,最后调用jobScheduler的submitJobSet函数将JobSet提交到集群进行计算,计算完之后会进行Checkpoint操作。

  好了,整个Spark Streaming作业从外部数据源接收数据并存储到内存,最后分割成作业的源码部分就弄完了。如果想关站Spark最新的资讯请关注本博客。

本博客文章除特别声明,全部都是原创!
转载本文请加上:转载自过往记忆(https://www.iteblog.com/)
本文链接: 【Spark Streaming作业提交源码分析数据处理篇】(https://www.iteblog.com/archives/1336.html)
喜欢 (8)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!
(2)个小伙伴在吐槽
  1. 早上刚写的啊!
    吴江122015-04-30 09:52 回复
    • 不是,是昨天晚上下班写的,今天早上发的。
      w3970907702015-04-30 10:25 回复