欢迎关注大数据技术架构与案例微信公众号:过往记忆大数据
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
过往记忆大数据

Akka学习笔记:ActorSystem(调度)

调度

  正如你在ActorSystem中的API看到的,如下:

//Light-weight scheduler for running asynchronous tasks after some deadline in the future.
def   scheduler : Scheduler

  在 ActorSystem 中有大量的方法调用scheduler,而scheduler返回的是Scheduler。Scheduler中有大量的schedule方法,利用他们我们可以在Actor环境下做大量的有趣的事情。

A、SCHEDULE SOMETHING TO EXECUTE ONCE


如果想及时了解Spark、Hadoop或者Hbase相关的文章,欢迎关注微信公共帐号:iteblog_hadoop

  在我们的Student-Teacher例子里面,假如在我们的测试用例程序中StudentActor想在接收到InitSignal消息后的5秒中之后才发送消息给Teacher。我们的代码看起来像这样的:

class StudentDelayedActor (teacherActorRef:ActorRef) extends Actor with ActorLogging {

  def receive = {
    case InitSignal=> {
      import context.dispatcher
      context.system.scheduler.scheduleOnce(5 seconds, teacherActorRef, QuoteRequest)
      //teacherActorRef!QuoteRequest
    }
    ...
    ...
  }
}

  1、测试用例
  我们来写个测试用例来测试这个:

"A delayed student" must {

   "fire the QuoteRequest after 5 seconds when an InitSignal is sent to it" in {

     import me.rerun.akkanotes.messaging.protocols.StudentProtocol._

     val teacherRef = system.actorOf(Props[TeacherActor], "teacherActorDelayed")
     val studentRef = system.actorOf(Props(new StudentDelayedActor(teacherRef)),
                                                          "studentDelayedActor")

     EventFilter.info (start="Printing from Student Actor", occurrences=1).intercept{
          studentRef!InitSignal
     }
   }
}

  2、将Eventfilter interception的超时时间增大
EventFilter在等待消息在 EventStream 中出现的默认的超时时间是3秒。我们将它增加到7秒来测试我们的程序, 我们可以通过filter-leeway 配置属性实现。

class RequestResponseTest extends TestKit(ActorSystem("TestUniversityMessageSystem", 
                                  ConfigFactory.parseString("""  
                                            akka{
                                              loggers = ["akka.testkit.TestEventListener"]
                                              test{
                                                  filter-leeway = 7s
                                              }
                                            }
                                    """)))
  with WordSpecLike
  with MustMatchers
  with BeforeAndAfterAll 
  with ImplicitSender {
  ...
  ...

B. SCHEDULE SOMETHING TO EXECUTE REPEATEDLY

  为了能够重复地运行任务,你可以用Scheduler的schedule 方法。最常用的schedule方法将定期地给Actor发送消息,它接收四个参数:
  1、在第一次运行的时候需要等待多少时间;
  2、子序列循序的频率;
  3、我们想发送消息的目标ActorRef ;
  4、消息

case InitSignal=> {  
      import context.dispatcher
      context.system.scheduler.schedule(0 seconds, 5 seconds, teacherActorRef, QuoteRequest)
      //teacherActorRef!QuoteRequest
    }

  琐事
  在这里引入import context.dispatcher非常重要。schedule方法需要一个很重要的隐形参数ExecutionContext,查看schedule 方法的实现就知道原因很明显

final def schedule(  
    initialDelay: FiniteDuration,
    interval: FiniteDuration,
    receiver: ActorRef,
    message: Any)(implicit executor: ExecutionContext,
                  sender: ActorRef = Actor.noSender): Cancellable =
    schedule(initialDelay, interval, new Runnable {
      def run = {
        receiver ! message
        if (receiver.isTerminated)
          throw new SchedulerException("timer active for terminated actor")
      }
    })

schedule 方法仅仅在Runnable中包装了tell,而它最后被我们传进来的ExecutionContext所调用。为了使得ExecutionContext 在这个范围内隐式可用,我们利用了上下文中可用的隐式dispatcher。可以从 ActorCell.scala (Context)代码里面看到

/**
   * Returns the dispatcher (MessageDispatcher) that is used for this Actor.
   * Importing this member will place an implicit ExecutionContext in scope.
   */
  implicit def dispatcher: ExecutionContextExecutor
本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Akka学习笔记:ActorSystem(调度)】(https://www.iteblog.com/archives/1166.html)
喜欢 (14)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!
(3)个小伙伴在吐槽