Spark中使用的分布式多线程框架是Akka,它是Scala的一个多线程类库。Akka也实现了类似Scala Actor的模型,其核心概念同样是Actor。Scala Actor模型在2.1.0的时候还在使用,但在2.1.1的时候已经被废弃了,Spark转而使用Akka来替代Scala Actor,不过两者的概念和原理仍然是相同的。所以学习Scala Actor对我们学习Akka、Spark还是有所帮助的。
之所以学习Scala Actor和Akka,是因为这样在阅读Spark源码时才能看懂:Spark在底层消息传递机制上大量使用了Akka的传送机制。
package cn.xpleaf.bigdata.p5.myactor
import scala.actors.Actor
* 学习scala actor的基本操作
* 和java中的Runnable Thread几乎一致
* 第一步:编写一个类,扩展特质trait Actor(scala 的actor)
* 第二步:复写其中的act方法
* 第三步:创建该actor的对象,调用该对象的start()方法,启动该线程
* 第四步:通过scala的操作符"!",发送消息
* 第五步:结束的话,调用close即可
* 模拟单向打招呼
// Entry point of the one-way greeting demo: builds a MyFirstActor and sends it three String messages.
object ActorOps {
def main(args: Array[String]): Unit = {
// NOTE(review): no start() call on the actor is visible in this excerpt — confirm the actor is started before messages are expected to be handled.
val mFActor = new MyFirstActor()
// Send the messages with the "!" operator
mFActor ! "小美,睡了吗?"
mFActor ! "我去洗澡了~"
mFActor ! "呵呵"
// Actor that prints every String message it receives.
class MyFirstActor extends Actor {
override def act(): Unit = {
// Loop forever so the actor keeps handling one message after another.
while(true) {
// Take the next message from the mailbox that matches one of the cases below.
receive {
case str: String => println(str)
小美,睡了吗? 我去洗澡了~ 呵呵
package cn.xpleaf.bigdata.p5.myactor
import scala.actors.Actor
// Entry point of the case-class messaging demo: sends one Greeting and one WorkContent to a GreetingActor.
object GreetingActor {
def main(args: Array[String]): Unit = {
// NOTE(review): no start() call on the actor is visible in this excerpt — confirm the actor is started before messages are expected to be handled.
val ga = new GreetingActor
ga ! Greeting("小美")
ga ! WorkContent("装系统")
// Message carrying the name of the person to greet.
case class Greeting(name:String)
// Message carrying the topic the actor should talk about.
case class WorkContent(content:String)
// Actor that pattern-matches on the message type and prints a line for each Greeting or WorkContent.
class GreetingActor extends Actor {
override def act(): Unit = {
// Loop forever so the actor keeps handling one message after another.
while(true) {
receive {
// Destructure the case class to get at its payload.
case Greeting(name) => println(s"Hello, $name")
case WorkContent(content) => println(s"Let's talk about sth. with $content")
Hello, 小美
Let's talk about sth. with 装系统
package cn.xpleaf.bigdata.p5.myactor
import scala.actors.Actor
* actor之线程间,互相通信
* studentActor
* 向老师问了一个问题
* teacherActor
* 向学生做回应
* 通信的协议:
* 请求,使用Request(内容)来表示
* 响应,使用Response(内容)来表示
// Entry point of the two-way demo: wires a StudentActor to a TeacherActor and hands the student a question.
object _03CommunicationActorOps {
def main(args: Array[String]): Unit = {
// NOTE(review): no start() calls are visible in this excerpt — confirm both actors are started.
val teacherActor = new TeacherActor()
// The student needs the teacher reference so it can forward the question.
val studentActor = new StudentActor(teacherActor)
studentActor ! Request("老李啊,scala学习为什么这么难啊")
// Protocol message for a question; req holds the question text.
case class Request(req:String)
// Protocol message for an answer; resp holds the answer text.
case class Response(resp:String)
// Student side of the conversation: forwards each Request to the teacher and also accepts Response messages.
class StudentActor(teacherActor: TeacherActor) extends Actor {
override def act(): Unit = {
// Loop forever so the actor keeps handling one message after another.
while(true) {
receive {
case Request(req) => {
// Print the question, then pass it on to the teacher
println("学生向老师说:" + req)
teacherActor ! Request(req)
// NOTE(review): the body of the Response case is not visible in this excerpt.
case Response(resp) => {
// Teacher side of the conversation: answers every Request with a fixed Response.
class TeacherActor() extends Actor {
override def act(): Unit = {
// Loop forever so the actor keeps handling one message after another.
while (true) {
receive {
case Request(req) => { // a request from the student has arrived
// Reply to whichever actor sent the request.
sender ! Response("这个问题,需要如此搞定~")
学生向老师说:老李啊,scala学习为什么这么难啊 这个问题,需要如此搞定~ 高!
// "!?" sends the message and waits for the reply (synchronous send in scala.actors).
val response= activeActor !? activeMessage
// "!!" sends the message and returns a future for the reply.
val futureResponse = activeActor !! activeMessage
// NOTE(review): presumably this applies the future to obtain the reply; `future` is not defined in this excerpt — confirm whether `futureResponse` was intended.
val activeReply = future()
package cn.xpleaf.bigdata.p5.myakka.p1
import akka.actor.{Actor, ActorSystem, Props}
import cn.xpleaf.bigdata.p5.myakka.MessageProtocol.{QuoteRequest, QuoteResponse}
import scala.util.Random
* 基于AKKA Actor的单向通信案例
* 学生向老师发送请求
// Entry point of the one-way Akka demo: creates a TeacherActor and sends it a single QuoteRequest.
object _01StudentActorOps {
def main(args: Array[String]): Unit = {
// Step 1: build the ActorSystem
val actorSystem = ActorSystem("StudentActorSystem")
// Step 2: have the ActorSystem create the TeacherActor and return its ActorRef handle
val teacherActorRef = actorSystem.actorOf(Props[TeacherActor])
// Step 3: send the message
teacherActorRef ! QuoteRequest()
// Step 4: shut down
// NOTE(review): the shutdown call itself is not visible in this excerpt.
// Akka actor that picks a random quote for every QuoteRequest it receives.
class TeacherActor extends Actor {
// Fixed pool of quotes to choose from.
val quotes = List(
"Moderation is for cowards",
"Anything worth doing is worth overdoing",
"The trouble is you think you have time",
"You never gonna know if you never even try")
override def receive = {
case QuoteRequest() => {
// Pick a uniformly random index into the quote list.
val random = new Random()
val randomIndex = random.nextInt(quotes.size)
val randomQuote = quotes(randomIndex)
// Wrap the chosen quote in the response message type.
// NOTE(review): what is done with `response` afterwards is not visible in this excerpt.
val response = QuoteResponse(randomQuote)
package cn.xpleaf.bigdata.p5.myakka
* akka actor通信协议
// Message types exchanged between the Akka actors in these examples.
// NOTE(review): the closing brace of this object is not visible. Other files import Header, Shutdown, Start and Stop
// directly from the package cn.xpleaf.bigdata.p5.myakka, so some of the definitions below may live outside the object — confirm.
object MessageProtocol {
// Request for a quote; carries no payload.
case class QuoteRequest()
// Reply carrying the quote text.
case class QuoteResponse(resp: String)
// Signal sent to the student actor to make it start the conversation.
case class InitSign()
// Payload-free signals used by the remote client/server example.
object Start extends Serializable
object Stop extends Serializable
// Common shape for messages that carry an id.
trait Message {
val id: String
// Asks the remote side to shut down; waitSecs is only logged in the visible handler.
case class Shutdown(waitSecs: Int) extends Serializable
case class Heartbeat(id: String, magic: Int) extends Message with Serializable
case class Header(id: String, len: Int, encrypted: Boolean) extends Message with Serializable
case class Packet(id: String, seq: Long, content: String) extends Message with Serializable
QuoteResponse(Anything worth doing is worth overdoing)
package cn.xpleaf.bigdata.p5.myakka.p2
import akka.actor.Actor
import cn.xpleaf.bigdata.p5.myakka.MessageProtocol.{QuoteRequest, QuoteResponse}
import scala.util.Random
* Teacher Actor
// Akka actor that answers every QuoteRequest with a randomly chosen quote sent back to the requester.
class TeacherActor extends Actor {
// Fixed pool of quotes to choose from.
val quotes = List(
"Moderation is for cowards",
"Anything worth doing is worth overdoing",
"The trouble is you think you have time",
"You never gonna know if you never even try")
override def receive = {
case QuoteRequest() => {
// Pick a uniformly random index into the quote list.
val random = new Random()
val randomIndex = random.nextInt(quotes.size)
val randomQuote = quotes(randomIndex)
val response = QuoteResponse(randomQuote)
// println(response)
// Reply to the actor that sent the request.
sender ! response
package cn.xpleaf.bigdata.p5.myakka.p2
import akka.actor.{Actor, ActorLogging, ActorRef}
import cn.xpleaf.bigdata.p5.myakka.MessageProtocol.{InitSign, QuoteRequest, QuoteResponse}
* Student Actor
* 当学生接收到InitSign信号之后,便向老师发送一条Request请求的消息
// Akka actor for the student: on InitSign it asks the teacher for a quote, and it also accepts the QuoteResponse.
class StudentActor(teacherActorRef:ActorRef) extends Actor with ActorLogging {
override def receive = {
// This matches the InitSign companion object (no parentheses), which is what the driver sends.
case InitSign => {
teacherActorRef ! QuoteRequest()
// println("student send request")
// NOTE(review): the body of the QuoteResponse case is not visible in this excerpt.
case QuoteResponse(resp) => {
package cn.xpleaf.bigdata.p5.myakka.p2
import akka.actor.{ActorSystem, Props}
import cn.xpleaf.bigdata.p5.myakka.MessageProtocol.InitSign
// Entry point of the two-way Akka demo: creates the teacher and student actors and kicks off the exchange.
object DriverApp {
def main(args: Array[String]): Unit = {
val actorSystem = ActorSystem("teacherStudentSystem")
// ActorRef handle for the teacher
val teacherActorRef = actorSystem.actorOf(Props[TeacherActor], "teacherActor")
// ActorRef handle for the student; the creator expression passes the teacher reference to the constructor
val studentActorRef = actorSystem.actorOf(Props[StudentActor](new StudentActor(teacherActorRef)), "studentActor")
// Tell the student to start the conversation.
studentActorRef ! InitSign
[INFO] [04/24/2018 10:02:19.932] [teacherStudentSystem-akka.actor.default-dispatcher-2] [akka://teacherStudentSystem/user/studentActor] Anything worth doing is worth overdoing
# Settings for the server-side actor system: remote actor-ref provider plus a Netty TCP transport.
MyRemoteServerSideActor {
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
# NOTE(review): hostname is empty in this excerpt — confirm the intended bind address.
hostname = ""
port = 2552
# Settings for the client-side actor system; only the provider setting is visible in this excerpt.
MyRemoteClientSideActor {
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
package cn.xpleaf.bigdata.p5.myakka.p3
import akka.actor.{Actor, ActorLogging}
import cn.xpleaf.bigdata.p5.myakka.{Header, Shutdown, Start, Stop}
// Server-side actor: logs Start, Stop and Header messages and shuts the actor system down on Shutdown.
class RemoteActor extends Actor with ActorLogging {
def receive = {
case Start => { // handle the Start message
log.info("Remote Server Start ==>RECV Start event : " + Start)
case Stop => { // handle the Stop message
log.info("Remote Server Stop ==>RECV Stop event: " + Stop)
case Shutdown(waitSecs) => { // handle the Shutdown message
log.info("Remote Server Shutdown ==>Wait to shutdown: waitSecs=" + waitSecs)
log.info("Remote Server Shutdown ==>Shutdown this system.")
// NOTE(review): ActorSystem.shutdown is believed to be replaced by terminate() in newer Akka releases — confirm against the Akka version in use.
context.system.shutdown // stop the current ActorSystem
case Header(id, len, encrypted) => log.info("Remote Server => RECV header: " + (id, len, encrypted)) // handle the Header message
// Any other message is ignored without logging.
case _ =>
package cn.xpleaf.bigdata.p5.myakka.p3
import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory
// Server entry point: starts the "remote-system" actor system and registers RemoteActor under the name "remoteActor".
object AkkaServerApplication extends App {
// Create the ActorSystem named remote-system; its settings are read from the application.conf configuration file
// NOTE(review): the second argument of this call is not visible in this excerpt.
val system = ActorSystem("remote-system",
val log = system.log
log.info("===>Remote server actor started: " + system)
// Create an actor named remoteActor; actorOf returns an ActorRef, which is not needed here
system.actorOf(Props[RemoteActor], "remoteActor")
package cn.xpleaf.bigdata.p5.myakka.p3
import akka.actor.SupervisorStrategy.Stop
import akka.actor.{Actor, ActorLogging}
import cn.xpleaf.bigdata.p5.myakka.{Header, Start}
// Client-side actor that tracks connection state and relays commands to the remote server actor.
// NOTE(review): this file imports akka.actor.SupervisorStrategy.Stop and only Header/Start from the protocol package,
// so `case Stop` below matches the supervisor directive rather than the protocol's Stop object — confirm this is intended.
class ClientActor extends Actor with ActorLogging {
// akka.<protocol>://<actor system>@<hostname>:<port>/<actor path>
// NOTE(review): host, port and actor path are missing from the literal in this excerpt.
val path = "akka.tcp://remote-system@" // path of the remote actor; used to look up a reference to it
val remoteServerRef = context.actorSelection(path) // selection pointing at the remote actor; messages can be sent to the remote actor through it
// Connection-state flags updated by the handlers below.
@volatile var connected = false
@volatile var stop = false
def receive = {
case Start => { // Start announces that business communication with the remote actor is about to begin; the remote side can use it to initialise whatever it needs
// Only flip to connected (and log it) once.
if (!connected) {
connected = true
log.info("ClientActor==> Actor connected: " + this)
case Stop => {
stop = true
connected = false
log.info("ClientActor=> Stopped")
case header: Header => {
log.info("ClientActor=> Header")
case (seq, result) => log.info("RESULT: seq=" + seq + ", result=" + result) // receives the result of the remote actor handling a Packet message
// Fallback: log anything that did not match the cases above.
case m => log.info("Unknown message: " + m)
// Logs the command and sends it to the remote actor; on an exception the connected flag is cleared.
private def send(cmd: Serializable): Unit = {
log.info("Send command to server: " + cmd)
try {
remoteServerRef ! cmd // send a message to the remote actor; it must be serializable because it travels over the network
} catch {
case e: Exception => {
connected = false
log.info("Try to connect by sending Start command...")
package cn.xpleaf.bigdata.p5.myakka.p3
import akka.actor.{ActorSystem, Props}
import cn.xpleaf.bigdata.p5.myakka.{Header, Start}
import com.typesafe.config.ConfigFactory
// Client entry point: starts the "client-system" actor system, creates ClientActor and sends it Start and a Header.
object AkkaClientApplication extends App {
// Create the ActorSystem from the application.conf configuration file
// NOTE(review): the second argument of this call is not visible in this excerpt.
val system = ActorSystem("client-system",
val log = system.log
val clientActor = system.actorOf(Props[ClientActor], "clientActor") // get a reference to the ClientActor
clientActor ! Start // send a Start message as the first handshake with the remote actor (relayed by the local ClientActor)
clientActor ! Header("What's your name: Can you tell me ", 20, encrypted = false) // send a Header message to the remote actor (relayed by the local ClientActor)
[INFO] [04/24/2018 09:39:49.271] [main] [Remoting] Starting remoting
[INFO] [04/24/2018 09:39:49.508] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://remote-system@]
[INFO] [04/24/2018 09:39:49.509] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://remote-system@]
[INFO] [04/24/2018 09:39:49.517] [main] [ActorSystem(remote-system)] ===>Remote server actor started: akka://remote-system
[INFO] [04/24/2018 09:46:01.872] [remote-system-akka.actor.default-dispatcher-3] [akka.tcp://remote-system@] Remote Server Start ==>RECV Start event : cn.xpleaf.bigdata.p5.myakka.Start$@325737b3
[INFO] [04/24/2018 09:46:03.501] [remote-system-akka.actor.default-dispatcher-3] [akka.tcp://remote-system@] Remote Server => RECV header: (What's your name: Can you tell me ,20,false)
[INFO] [04/24/2018 09:46:01.274] [main] [Remoting] Starting remoting
[INFO] [04/24/2018 09:46:01.479] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://client-system@]
[INFO] [04/24/2018 09:46:01.480] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://client-system@]
[INFO] [04/24/2018 09:46:01.493] [client-system-akka.actor.default-dispatcher-4] [akka.tcp://client-system@] Send command to server: cn.xpleaf.bigdata.p5.myakka.Start$@4f00805d
[INFO] [04/24/2018 09:46:01.496] [client-system-akka.actor.default-dispatcher-4] [akka.tcp://client-system@] ClientActor==> Actor connected: cn.xpleaf.bigdata.p5.myakka.p3.ClientActor@5a85b576
[INFO] [04/24/2018 09:46:03.490] [client-system-akka.actor.default-dispatcher-2] [akka.tcp://client-system@] ClientActor=> Header
[INFO] [04/24/2018 09:46:03.491] [client-system-akka.actor.default-dispatcher-2] [akka.tcp://client-system@] Send command to server: Header(What's your name: Can you tell me ,20,false)
