通信模型架构图
master 端代码
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
// 需要导入这2个包 封装一些属性。
class MasterActor extends Actor {
//在开始之前调用一次
override def preStart(): Unit = {
}
//用于接收消息
override def receive: Receive = {
case "started" => {
println("Master has been started!")
//进入这个分支,说明这个Master线程已经启动完成
}
case "connecting" => {
println("Master has been get connect from Worker!")
println("a Worker Node has been register!")
//返回消息给Worker
sender() ! "connected"
Thread.sleep(1000)
}
case "stoped" => {
}
}
}
object Demo01MasterActor {
def main(args: Array[String]) {
//设置MasterIP和端口
val masterHost = "localhost"
val masterPort = "1234"
//端口和IP封装到akka架构,获取一个属性配置文件
val conStr =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$masterHost"
|akka.remote.netty.tcp.port = "$masterPort"
""".stripMargin
val config = ConfigFactory.parseString(conStr)
val masterActorSystem = ActorSystem("MasterActorSystem", config)
val masterActor = masterActorSystem.actorOf(Props[MasterActor], "MasterActor")
masterActor ! "started"
masterActorSystem.awaitTermination();
}
}
worker端代码
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
class WorkerActor extends Actor {
var masterURL: ActorSelection = null
//启动Actor之前执行,做初始化工作
override def preStart(): Unit = {
//配置访问Master的URL
//MasterIP:localhost
//MasterPort:8888(根据Master配置)
//Master的 ActorSystem对象:MasterActorSystem、MasterActor
masterURL = context.actorSelection("akka.tcp://MasterActorSystem@localhost:8888/user/MasterActor")
}
override def receive: Receive = {
case "started" => {
println("Worker has been started!")
//进入这个分支,说明这个Worker线程已经启动完成
//可以去向Master注册
//请求和Master建立连接
masterURL ! "connecting"
}
case "connected" => {
println("Worker 收到来自Master确认信息!")
}
case "stoped" => {
}
}
}
object Demo01WorkerActor {
def main(args: Array[String]) {
//初始化MastereIP和端口、WorkerIP和端口
// val masterHost = args(0)
// val masterPort = args(1)
// val workerHost = args(2)
// val workePort = args(3)
val masterHost = "localhost"
val masterPort = "8888"
val workerHost = "localhost"
val workePort = "8889"
//端口和IP封装到akka架构,获取一个属性配置文件
val conStr =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$workerHost"
|akka.remote.netty.tcp.port = "$workePort"
""".stripMargin
val config = ConfigFactory.parseString(conStr)
val workerActorSystem = ActorSystem("WorkerActorSystem", config)
val workerActor = workerActorSystem.actorOf(Props[WorkerActor], "WorkerActor")
workerActor ! "started"
workerActorSystem.awaitTermination();
}
}
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。