《Akka学习笔记:ACTORS介绍》
《Akka学习笔记:Actor消息传递(1)》
《Akka学习笔记:Actor消息传递(2)》
《Akka学习笔记:日志》
《Akka学习笔记:测试Actors》
《Akka学习笔记:Actor消息处理-请求和响应(1) 》
《Akka学习笔记:Actor消息处理-请求和响应(2) 》
《Akka学习笔记:ActorSystem(配置)》
《Akka学习笔记:ActorSystem(调度)》
调度
正如你在ActorSystem中的API看到的,如下:
//Light-weight scheduler for running asynchronous tasks after some deadline in the future. def scheduler : Scheduler
在 ActorSystem 中有大量的方法调用scheduler,而scheduler返回的是Scheduler。Scheduler中有大量的schedule方法,利用他们我们可以在Actor环境下做大量的有趣的事情。
A、SCHEDULE SOMETHING TO EXECUTE ONCE
在我们的Student-Teacher例子里面,假如在我们的测试用例程序中StudentActor想在接收到InitSignal消息后的5秒中之后才发送消息给Teacher。我们的代码看起来像这样的:
class StudentDelayedActor (teacherActorRef:ActorRef) extends Actor with ActorLogging {
def receive = {
case InitSignal=> {
import context.dispatcher
context.system.scheduler.scheduleOnce(5 seconds, teacherActorRef, QuoteRequest)
//teacherActorRef!QuoteRequest
}
...
...
}
}
1、测试用例
我们来写个测试用例来测试这个:
"A delayed student" must {
"fire the QuoteRequest after 5 seconds when an InitSignal is sent to it" in {
import me.rerun.akkanotes.messaging.protocols.StudentProtocol._
val teacherRef = system.actorOf(Props[TeacherActor], "teacherActorDelayed")
val studentRef = system.actorOf(Props(new StudentDelayedActor(teacherRef)),
"studentDelayedActor")
EventFilter.info (start="Printing from Student Actor", occurrences=1).intercept{
studentRef!InitSignal
}
}
}
2、将Eventfilter interception的超时时间增大
EventFilter在等待消息在 EventStream 中出现的默认的超时时间是3秒。我们将它增加到7秒来测试我们的程序, 我们可以通过filter-leeway 配置属性实现。
class RequestResponseTest extends TestKit(ActorSystem("TestUniversityMessageSystem",
ConfigFactory.parseString("""
akka{
loggers = ["akka.testkit.TestEventListener"]
test{
filter-leeway = 7s
}
}
""")))
with WordSpecLike
with MustMatchers
with BeforeAndAfterAll
with ImplicitSender {
...
...
B. SCHEDULE SOMETHING TO EXECUTE REPEATEDLY
为了能够重复地运行任务,你可以用Scheduler的schedule 方法。最常用的schedule方法将定期地给Actor发送消息,它接收四个参数:
1、在第一次运行的时候需要等待多少时间;
2、子序列循序的频率;
3、我们想发送消息的目标ActorRef ;
4、消息
case InitSignal=> {
import context.dispatcher
context.system.scheduler.schedule(0 seconds, 5 seconds, teacherActorRef, QuoteRequest)
//teacherActorRef!QuoteRequest
}
琐事
在这里引入import context.dispatcher非常重要。schedule方法需要一个很重要的隐形参数ExecutionContext,查看schedule 方法的实现就知道原因很明显
final def schedule(
initialDelay: FiniteDuration,
interval: FiniteDuration,
receiver: ActorRef,
message: Any)(implicit executor: ExecutionContext,
sender: ActorRef = Actor.noSender): Cancellable =
schedule(initialDelay, interval, new Runnable {
def run = {
receiver ! message
if (receiver.isTerminated)
throw new SchedulerException("timer active for terminated actor")
}
})
schedule 方法仅仅在Runnable中包装了tell,而它最后被我们传进来的ExecutionContext所调用。为了使得ExecutionContext 在这个范围内隐式可用,我们利用了上下文中可用的隐式dispatcher。可以从 ActorCell.scala (Context)代码里面看到
/** * Returns the dispatcher (MessageDispatcher) that is used for this Actor. * Importing this member will place an implicit ExecutionContext in scope. */ implicit def dispatcher: ExecutionContextExecutor本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Akka学习笔记:ActorSystem(调度)】(https://www.iteblog.com/archives/1166.html)


