代码实现:
pom.xml
<!-- Build-wide settings; the version properties are referenced from the dependency list via ${...} placeholders. -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- NOTE(review): Akka 2.4.x is documented as requiring Java 8 or later; confirm that the 1.7 source/target levels below are intended. -->
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<!-- Version used for the scala-library dependency. -->
<scala.version>2.11.8</scala.version>
<!-- Version used for the scala-actors dependency. -->
<scala.actors.version>2.11.8</scala.actors.version>
<!-- Version shared by the akka-actor and akka-remote dependencies. -->
<akka.version>2.4.17</akka.version>
</properties>
<!-- Libraries used by the ResourceManager / NodeManager example. -->
<dependencies>
<!-- Scala standard library. -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- Akka actors; the _2.11 suffix presumably has to match the Scala line declared in scala.version (verify when upgrading). -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>${akka.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-actors -->
<!-- NOTE(review): not referenced by the Scala code shown in this article; confirm whether it is still needed. -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-actors</artifactId>
<version>${scala.actors.version}</version>
</dependency>
<!-- Akka remoting, enabled by the "akka.remote.RemoteActorRefProvider" setting in both main methods. -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.11</artifactId>
<version>${akka.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<!-- NOTE(review): not referenced by the Scala code shown in this article; confirm whether it is still needed. -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.5</version>
</dependency>
</dependencies>
用于存放相应的 ActorSystem 和 Actor 的名称
/**
 * Names shared by the resource manager and the node managers.
 *
 * The resource-manager names are also used by `MyNodeManager` to build the
 * remote actor path of the resource manager, so both sides must agree on them.
 */
object Constant {
  /** Name of the resource manager's actor system. */
  val RMRS: String = "MyResourceManagerActorSystem"

  /** Name of the resource-manager actor. */
  val RMA: String = "MyResourceManagerActor"

  /** Name of a node manager's actor system. */
  val NMAS: String = "MyNodeManagerActorSystem"

  /** Name of the node-manager actor. */
  val NMA: String = "MyNodeManagerActor"
}
用于存放各 Actor 的 receive 方法中进行模式匹配的消息类型
/**
 * Registration request, sent node manager -> resource manager.
 *
 * @param nodemanagerid identifier chosen by the node manager
 * @param memory        memory figure reported by the node manager
 * @param cpu           CPU figure reported by the node manager
 */
case class RegisterNodeManager(nodemanagerid: String, memory: Int, cpu: Int)

/**
 * Registration acknowledgement, sent resource manager -> node manager.
 *
 * @param resourcemanagerhostname address string of the resource manager
 */
case class RegisteredNodeManager(resourcemanagerhostname: String)

/**
 * Heartbeat, sent node manager -> resource manager.
 *
 * @param nodemanagerid identifier of the node manager sending the heartbeat
 */
case class Heartbeat(nodemanagerid: String)
/**
 * Bookkeeping record for one node manager.
 *
 * @param nodemanagerid identifier of the node manager
 * @param memory        memory figure reported at registration
 * @param cpu           CPU figure reported at registration
 */
class NodeManagerInfo(val nodemanagerid: String, val memory: Int, val cpu: Int) {
  /** Timestamp (milliseconds) of the most recent heartbeat; starts at 0 until it is set. */
  var lastHeartBeatTime: Long = 0L
}
// Timer message the resource manager schedules to itself; handling it evicts node managers whose heartbeat is too old.
case object CheckTimeOut
// Timer message the node manager schedules to itself; handling it sends one Heartbeat to the resource manager.
case object SendMessage
MyResourceManager
/**
 * Resource-manager actor: accepts node-manager registrations, tracks their
 * heartbeats and periodically evicts node managers that stopped reporting.
 *
 * @param hostname host name reported back to node managers on registration
 * @param port     port reported back to node managers on registration
 */
class MyResourceManager(var hostname: String, var port: Int) extends Actor {

  // Registered node managers keyed by id. Keying by id is what de-duplicates
  // registrations: NodeManagerInfo has reference equality, so a separate set of
  // infos could not detect a second registration of the same id.
  private val id2nodemanagerinfo = new mutable.HashMap[String, NodeManagerInfo]()

  // A node manager is considered dead after 15 seconds without a heartbeat.
  private val heartbeatTimeoutMillis = 15000L

  // Handle of the periodic CheckTimeOut timer, kept so it can be cancelled.
  private var timeoutCheck: Option[akka.actor.Cancellable] = None

  /** Starts the periodic timeout check (immediately, then every five seconds). */
  override def preStart(): Unit = {
    import scala.concurrent.duration._
    import context.dispatcher
    // The scheduled message is delivered to this actor's own receive.
    timeoutCheck = Some(
      context.system.scheduler.schedule(0.millis, 5000.millis, self, CheckTimeOut))
  }

  /**
   * Cancels the timeout check so the timer neither outlives the actor nor gets
   * duplicated when preStart runs again after a restart.
   */
  override def postStop(): Unit = {
    timeoutCheck.foreach(_.cancel())
    timeoutCheck = None
  }

  override def receive: Receive = {
    // Registration request coming from a node manager.
    case RegisterNodeManager(nodemanagerid, memory, cpu) =>
      val info = new NodeManagerInfo(nodemanagerid, memory, cpu)
      // Count the registration itself as the first sign of life; otherwise the
      // default timestamp of 0 lets a CheckTimeOut that runs before the first
      // heartbeat evict the node immediately.
      info.lastHeartBeatTime = System.currentTimeMillis()
      // put() replaces any previous entry for the same id.
      id2nodemanagerinfo.put(nodemanagerid, info)
      // Acknowledge the registration to the node manager.
      sender() ! RegisteredNodeManager(hostname + ":" + port)

    // Heartbeat from a node manager: refresh its timestamp.
    case Heartbeat(nodemanagerid) =>
      id2nodemanagerinfo.get(nodemanagerid) match {
        case Some(info) =>
          info.lastHeartBeatTime = System.currentTimeMillis()
        case None =>
          // Unknown or already evicted node: ignore it instead of throwing,
          // which would restart this actor and drop every registration.
          println(s"Ignoring heartbeat from unregistered NodeManager $nodemanagerid")
      }

    // Periodic check: evict node managers whose last heartbeat is too old.
    case CheckTimeOut =>
      val now = System.currentTimeMillis()
      // Collect the ids first so the map is not modified while it is traversed.
      val expiredIds = id2nodemanagerinfo.collect {
        case (id, info) if now - info.lastHeartBeatTime > heartbeatTimeoutMillis => id
      }.toList
      id2nodemanagerinfo --= expiredIds
      println("当前注册成功的节点数" + id2nodemanagerinfo.size)
  }
}
/** Entry point that boots the resource manager's actor system. */
object MyResourceManager {

  /**
   * Starts a remoting-enabled actor system and creates the resource-manager
   * actor inside it.
   *
   * @param args `args(0)` is the host name to bind and `args(1)` the TCP port,
   *             for example `localhost 9999`
   */
  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1).toInt

    // Remoting settings: remote actor-ref provider plus the address to bind.
    val remotingSettings =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname =${host}
         |akka.remote.netty.tcp.port=${port}
         |""".stripMargin

    val system: ActorSystem =
      ActorSystem(Constant.RMRS, ConfigFactory.parseString(remotingSettings))
    system.actorOf(Props(new MyResourceManager(host, port)), Constant.RMA)
  }
}
MyNodeManager
/**
 * Node-manager actor: registers with the resource manager when it starts and,
 * once the registration is acknowledged, sends a heartbeat every four seconds.
 *
 * @param resourcemanagerhostname host name of the resource manager
 * @param resourcemanagerport     port of the resource manager
 * @param memory                  memory figure reported at registration
 * @param cpu                     CPU figure reported at registration
 */
class MyNodeManager(val resourcemanagerhostname: String, val resourcemanagerport: Int, val memory: Int, val cpu: Int) extends Actor {

  /** Identifier of this node manager; generated in preStart. */
  var nodemanagerid: String = _

  /** Selection pointing at the remote resource-manager actor; set in preStart. */
  var rmRef: ActorSelection = _

  // Handle of the periodic heartbeat timer, kept so it can be cancelled.
  private var heartbeatTask: Option[akka.actor.Cancellable] = None

  /** Resolves the resource manager and sends it the registration request. */
  override def preStart(): Unit = {
    // Remote path: akka.tcp://(actor system name)@(remote host):(remote port)/user/(actor name)
    rmRef = context.actorSelection(
      s"akka.tcp://${Constant.RMRS}@${resourcemanagerhostname}:${resourcemanagerport}/user/${Constant.RMA}")
    // Generate a unique id for this node manager.
    nodemanagerid = UUID.randomUUID().toString
    rmRef ! RegisterNodeManager(nodemanagerid, memory, cpu)
  }

  /** Stops the heartbeat timer so it does not keep firing after the actor stops. */
  override def postStop(): Unit = cancelHeartbeat()

  override def receive: Receive = {
    // The resource manager acknowledged the registration: start the heartbeats.
    case RegisteredNodeManager(masterURL) =>
      println(masterURL)
      import scala.concurrent.duration._
      import context.dispatcher
      // Drop any timer left over from an earlier acknowledgement so that at
      // most one heartbeat timer is ever active.
      cancelHeartbeat()
      // schedule(initialDelay, interval, receiver, message): first tick
      // immediately, then every four seconds, delivered to this actor.
      heartbeatTask = Some(
        context.system.scheduler.schedule(0.millis, 4000.millis, self, SendMessage))

    // Timer tick: send one heartbeat to the resource manager.
    case SendMessage =>
      rmRef ! Heartbeat(nodemanagerid)
      println(Thread.currentThread().getId)
  }

  // Cancels the heartbeat timer if one is running.
  private def cancelHeartbeat(): Unit = {
    heartbeatTask.foreach(_.cancel())
    heartbeatTask = None
  }
}
/** Entry point that boots a node manager's actor system. */
object MyNodeManager {

  /**
   * Starts a remoting-enabled actor system and creates the node-manager actor
   * inside it.
   *
   * @param args in order: own host name, resource-manager host name,
   *             resource-manager port, memory, cores, own port
   *             (example: `localhost localhost 9999 1000 5 8888`)
   */
  def main(args: Array[String]): Unit = {
    val ownHost = args(0)
    val rmHost = args(1)
    val rmPort = args(2).toInt
    val memory = args(3).toInt
    val cores = args(4).toInt
    val ownPort = args(5).toInt

    // Remoting settings: remote actor-ref provider plus the address to bind.
    val remotingSettings =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname =${ownHost}
         |akka.remote.netty.tcp.port=${ownPort}
         |""".stripMargin

    val system: ActorSystem =
      ActorSystem(Constant.NMAS, ConfigFactory.parseString(remotingSettings))
    system.actorOf(Props(new MyNodeManager(rmHost, rmPort, memory, cores)), Constant.NMA)
  }
}
最后启动运行
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。