温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Scala Actor多线程怎么理解

发布时间:2021-12-09 09:06:58 来源:亿速云 阅读:139 作者:iii 栏目:编程语言

本篇内容介绍了“Scala Actor多线程怎么理解”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

Scala Actor是Scala里多线程的基础,核心思想是用消息传递来进行线程间的信息共享和同步。

Scala Actor线程模型可以这样理解:所有Actor共享一个线程池,总的线程个数可以配置,也可以根据CPU个数决定;当一个Actor启动之后,Scala分配一个线程给它使用,如果使用receive模型,这个线程就一直为该Actor所有,如果使用react模型,Scala执行完react方法后抛出异常,则该线程就可以被其它Actor使用。

下面看一些核心代码。

 def start(): Actor = synchronized {    // Reset various flags.    //    // Note that we do *not* reset `trapExit`. The reason is that    // users should be able to set the field in the constructor    // and before `act` is called.     exitReason = 'normal    exiting = false   shouldExit = false    scheduler execute {      ActorGC.newActor(Actor.this)      (new Reaction(Actor.this)).run()    }     this }

其中Reaction实现Runnable接口,scheduler基本相当于是一个线程池,所以调用start方法之后会有一个线程来为该Actor服务。

使用receive模型。

def receive[R](f: PartialFunction[Any, R]): R = {   assert(Actor.self == this, "receive from channel belonging to other actor")   this.synchronized {     if (shouldExit) exit() // links     val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))     if (null eq qel) {       waitingFor = f.isDefinedAt       isSuspended = true      suspendActor()     } else {       received = Some(qel.msg)       sessions = qel.session :: sessions     }     waitingFor = waitingForNone     isSuspended = false  }   val result = f(received.get)   sessions = sessions.tail   result

如果当前mailbox里面没有可以处理的消息,调用suspendActor,该方法会调用wait;如果有消息,这调用PartialFunction进行处理。

使用react模型。

def react(f: PartialFunction[Any, Unit]): Nothing = {   assert(Actor.self == this, "react on channel belonging to other actor")   this.synchronized {     if (shouldExit) exit() // links     val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))     if (null eq qel) {       waitingFor = f.isDefinedAt       continuation = f       isDetached = true    } else {       sessions = List(qel.session)       scheduleActor(f, qel.msg)     }     throw new SuspendActorException   }

如果当前mailbox没有可以处理的消息,设置waitingFor和continuation,这两个变量会在接收到消息的时候使用;如果有消息,则调用scheduleActor,该方法会在线程池里选择一个新的线程来处理,具体的处理方法也是由PartialFunction决定。不管是哪条路径,react都会立即返回,或者说是立即抛出异常,结束该线程的执行,这样该线程就可以被其它Actor使用。

再来看看接收消息的处理代码。

def send(msg: Any, replyTo: OutputChannel[Any]) = synchronized {   if (waitingFor(msg)) {     received = Some(msg)      if (isSuspended)       sessions = replyTo :: sessions     else      sessions = List(replyTo)      waitingFor = waitingForNone      if (!onTimeout.isEmpty) {       onTimeout.get.cancel()       onTimeout = None     }      if (isSuspended)       resumeActor()     else // assert continuation != null       scheduler.execute(new Reaction(this, continuation, msg))   } else {     mailbox.append(msg, replyTo)   }

如果当前没有在等待消息或者接收到的消息不能处理,就丢到mailbox里去;相反,则进行消息的处理。这里对于receive模型和react模型就有了分支:如果isSuspended为true,表示是receive模型,并且线程在wait,就调用resumeActor,该方法会调用notify;否则就是react模型,同样在线程池里选择一个线程进行处理。

“Scala Actor多线程怎么理解”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI