好程序员大数据学习路线分享Actor学习笔记,在scala中她能实现很强大的功能,他是基于并发机制的一个事件模型
我们现在学的scala2.10.x版本就是之前的Actor
同步:在主程序上排队执行的任务,只有前一个任务执行完毕后,才能执行下一个任务
异步:指不进入主程序,而进入"任务对列"的任务,只有等主程序任务执行完毕,"任务对列"开始请求主程序,请求任务执行,该任务会进入主程序
java
共享变量 -- 加锁
会出现锁死问题
scala
Actor不共享数据
没有锁的概念
Actor通信之间需要message(通信)
Aactor执行顺序
1.首先调用start()方法启动Actor
2.调用start()方法后act()方法会被执行
3.Actor之间进行发送消息
Actor发送消息的三种方式
! -> 发送异步消息,没有返回值
!? -> 发送同步消息,有返回值,会有线程等待
!! -> 发送异步消息,有返回值,返回值类型Future[Any](用来获取异步操作结果)
Actor并行执行
//注意,这两个actor会并行执行,当其中一个for循环结束后,actor结束 object ActorDemo01 { def main(args: Array[String]): Unit = { MyActor1.start() MyActor2.start() } }
object MyActor1 extends Actor{ override def act(): Unit = { for (i <- 1 to 10){ println(s"actor => $i") Thread.sleep(2000) } }
object MyActor2 extends Actor{ override def act(): Unit = { for (i <- 1 to 5){ println(s"actor2 => $i") Thread.sleep(2000) } } } } |
用Actor不断接受消息
执行第一种方式,异步
object ActorDemo02 { def main(args: Array[String]): Unit = { val actor: MyActor = new MyActor actor.start()
//并行执行 actor ! "start" // !->异步 actor ! "stop" println("发送完成")
} }
class MyActor extends Actor{ override def act(): Unit = { while (true){ //死循环 receive { //接收 case "start" => { println("starting") Thread.sleep(1000) println("started") } case "stop" => { println("stopping") Thread.sleep(1000) println("stopped") } } } } } |
第二种方式:利用react来代替receive,也就是说react线程可复用,比receive更高效
object ActorDemo03 { def main(args: Array[String]): Unit = { val actor: MyActor3 = new MyActor3 actor.start() actor ! "start" actor ! "stop" println("成功了") } }
class MyActor3 extends Actor{ override def act(): Unit = { loop { react{ case "start" =>{ println("starting") Thread.sleep(1000) println("sarted") } case "stop" =>{ println("stoppting") Thread.sleep(1000) println("stopped") } } } } } |
结合样例类练习Actor发送消息
//创建样例类 case class AsyncMsg(id: Int, msg: String) case class SyncMsg(id: Int, msg: String) case class ReplyMsg(id: Int, msg: String)
object ActorDemo01 extends Actor { override def act(): Unit = { while (true) { receive { case "start" => println("starting...") case AsyncMsg(id, msg) => { println(s"id:$id,msg:$msg") sender ! ReplyMsg(1,"sucess") //接收到消息后返回响应消息 } case SyncMsg(id,msg) => { println(s"id:$id,msg:$msg") sender ! ReplyMsg(2,"sucess") } } } } }
object ActorTest{ def main(args: Array[String]): Unit = {
val actor: Actor = ActorDemo01.start()
// //异步发送消息,没有返回值 // actor ! AsyncMsg(3,"heihei") // println("异步消息发送完成,没有返回值")
// //同步发送消息,有返回值 // val text: Any = actor !? SyncMsg(4,"OK") // println(text) // println("同步消息发送成功")
//异步发送消息,有返回值,返回类型为Future[Any] val reply: Future[Any] = actor !! SyncMsg(5,"OK is 不存在的") Thread.sleep(2000) if (reply.isSet){ val applyMsg: Any = reply.apply() println(applyMsg) }else{ println("Nothing") } } } |
Actor并行化的wordcount
class Task extends Actor {
override def act(): Unit = { loop { react { case SubmitTask(fileName) => { val contents = Source.fromFile(new File(fileName)).mkString val arr = contents.split("\r\n") val result = arr.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).mapValues(_.length) //val result = arr.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2)) sender ! ResultTask(result) } case StopTask => { exit() } } } } }
object WorkCount { def main(args: Array[String]) { val files = Array("c://words.txt", "c://words.log")
val replaySet = new mutable.HashSet[Future[Any]] val resultList = new mutable.ListBuffer[ResultTask]
for(f <- files) { val t = new Task val replay = t.start() !! SubmitTask(f) replaySet += replay }
while(replaySet.size > 0){ val toCumpute = replaySet.filter(_.isSet) for(r <- toCumpute){ val result = r.apply() resultList += result.asInstanceOf[ResultTask] replaySet.remove(r) } Thread.sleep(100) } val finalResult = resultList.map(_.result).flatten.groupBy(_._1).mapValues(x => x.foldLeft(0)(_ + _._2)) println(finalResult) } }
case class SubmitTask(fileName: String) case object StopTask case class ResultTask(result: Map[String, Int]) |
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。