当前位置: 首页 > news >正文

视频网站用户增长怎么做营销策略有哪几种

视频网站用户增长怎么做,营销策略有哪几种,广西茶叶网站建设,地产网站建设案例使用Akka的Actor模拟Spark的Master和Worker工作机制 Spark的Master和Worker协调工作原理 在 Apache Spark 中,Master 和 Worker 之间通过心跳机制进行通信和保持活动状态。下面是 Master 和 Worker 之间心跳机制的工作流程: Worker 启动后&#xff0c…

使用Akka的Actor模拟Spark的Master和Worker工作机制

Spark的Master和Worker协调工作原理

在 Apache Spark 中,Master 和 Worker 之间通过心跳机制进行通信和保持活动状态。下面是 Master 和 Worker 之间心跳机制的工作流程:

  1. Worker 启动后,会向预先配置的 Master 节点发送注册请求。
  2. Master 接收到注册请求后,会为该 Worker 创建一个唯一的标识符(Worker ID)并将其信息保存在内存中。
  3. Master 向 Worker 发送包含 Master URL、Worker ID 等信息的注册响应。
  4. Worker 收到注册响应后,会启动一个定时器并开始周期性地向 Master 发送心跳消息。
  5. Worker 的心跳消息中包含当前的负载状况、可用资源等信息。
  6. Master 接收到心跳消息后,更新该 Worker 的最近心跳时间,并根据需要对集群进行动态调整,例如添加新的任务或删除故障的 Worker。
  7. 如果 Master 在一段时间内没有收到某个 Worker 的心跳消息,它将把该 Worker 标记为失效,并将其相应的资源标记为可用以供后续使用。

具体原理如下:

  • Worker 通过网络向 Master 发送心跳消息,通常使用基于 TCP 的长连接。这些心跳消息可以包含有关 Worker 健康状况、资源利用情况等的信息。
  • Master 使用一个内部的心跳管理组件来处理接收到的心跳消息,并维护每个 Worker 的状态。它根据心跳消息的频率和时间戳来判断 Worker 是否正常运行。
  • 如果 Master 在预定的时间内没有收到 Worker 的心跳消息,它会将该 Worker 标记为失效并触发一系列的故障处理机制,例如重新分配任务给其他可用的 Worker。
  • Worker 定期发送心跳消息,以确保在网络故障、Worker 故障或其他问题发生时能够及时通知 Master。

通过心跳机制,Master 能够实时监控 Worker 的状态,并根据需要进行集群的动态管理和资源调度,从而实现高可用性和容错性。

使用Akka的Actor模拟Spark的Master和Worker工作机制

  1. worker注册到Master, Master完成注册,并回复worker注册成功。
  2. worker定时发送心跳,并在Master接收到。
  3. Master接收到worker心跳后,要更新该worker的最近一次发送心跳的时间。
  4. 给Master启动定时任务,定时检测注册的worker有哪些没有更新心跳,并将其从hashmap中删除。
  5. master worker 进行分布式部署(Linux系统)-》如何给maven项目打包->上传linux。
  • 创建SparkMaster类继承Actor特质,实现Receive方法,并定义对应的伴生对象,在伴生对象中创建SparkMaster-actor引用,并启动Actor发送消息。服务端Master对worker进行心跳监测,发现6秒内无法获取worker心跳,将异常的Worker的实例从HashMap中移除。若能正常获取到心跳,则获取心跳信息后更新心跳时间。定时保持心跳机制。

代码实现:

class SparkMaster extends Actor {//定义个hashMap,管理workers(所有worker的实例)val workers = mutable.Map[String, WorkerInfo]()override def receive: Receive = {case "start" => {println("master服务器启动了...")//这里开始。。self ! StartTimeOutWorker}case RegisterWorkerInfo(id, cpu, ram) => {//接收到worker注册信息if (!workers.contains(id)) {//创建WorkerInfo 对象val workerInfo = new WorkerInfo(id, cpu, ram)//加入到workersworkers += ((id, workerInfo))println("服务器的workers=" + workers)//回复一个消息,说注册成功sender() ! RegisteredWorkerInfo}}case HeartBeat(id) => {//更新对应的worker的心跳时间//1.从workers对应的HashMap中取出WorkerInfo,然后更新worker心跳时间val workerInfo = workers(id)workerInfo.lastHeartBeat = System.currentTimeMillis()println("master更新了 " + id + " 心跳时间...")}case StartTimeOutWorker => {println("开始了定时检测worker心跳的任务")import context.dispatcher//说明//1. 0 millis 不延时,立即执行定时器//2. 9000 millis 表示每隔3秒执行一次//3. self:表示发给自己//4. RemoveTimeOutWorker 发送的内容context.system.scheduler.schedule(0 millis, 9000 millis, self, RemoveTimeOutWorker)}//对RemoveTimeOutWorker消息处理//这里需求检测哪些worker心跳超时(now - lastHeartBeat > 6000),并从map中删除case RemoveTimeOutWorker => {//首先将所有的 workers 的 所有WorkerInfoval workerInfos = workers.valuesval nowTime = System.currentTimeMillis()//先把超时的所有workerInfo,删除即可workerInfos.filter(workerInfo => (nowTime - workerInfo.lastHeartBeat) > 6000).foreach(workerInfo => workers.remove(workerInfo.id))println("当前有 " + workers.size + " 个worker存活的")}}
}object SparkMaster {def main(args: Array[String]): Unit = {//这里我们分析出有3个host,port,sparkMasterActorif (args.length != 3) {println("请输入参数 host port sparkMasterActor名字")sys.exit()}val host = args(0)val port = args(1)val name = args(2)//先创建ActorSystemval config = ConfigFactory.parseString(s"""|akka.actor.provider="akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname=${host}|akka.remote.netty.tcp.port=${port}""".stripMargin)val sparkMasterSystem = ActorSystem("SparkMaster", config)//创建SparkMaster -actorval sparkMasterRef = sparkMasterSystem.actorOf(Props[SparkMaster], s"${name}")//启动SparkMastersparkMasterRef ! "start"}
}
  • 定义SparkWorker类继承Actor特质,实现Receive方法,在方法中实现向master发送注册信息的请求,获取到服务端Master注册成功的消息后,定义定时任务发送心跳包给Master。
class SparkWorker(masterHost:String,masterPort:Int,masterName:String) extends Actor{//masterProxy是Master的代理/引用refvar masterPorxy :ActorSelection = _val id = java.util.UUID.randomUUID().toStringoverride def preStart(): Unit = {println("preStart()调用")//初始化masterPorxymasterPorxy = context.actorSelection(s"akka.tcp://SparkMaster@${masterHost}:${masterPort}/user/${masterName}")println("masterProxy=" + masterPorxy)}override def receive:Receive = {case "start" => {println("worker启动了")//发出一个注册消息masterPorxy ! RegisterWorkerInfo(id, 16, 16 * 1024)}case RegisteredWorkerInfo => {println("workerid= " + id + " 注册成功~")//当注册成功后,就定义一个定时器,每隔一定时间,发送SendHeartBeat给自己import context.dispatcher//说明//1. 0 millis 不延时,立即执行定时器//2. 3000 millis 表示每隔3秒执行一次//3. self:表示发给自己//4. SendHeartBeat 发送的内容context.system.scheduler.schedule(0 millis, 3000 millis, self, SendHeartBeat)}case SendHeartBeat =>{println("worker = " + id + "给master发送心跳")masterPorxy ! HeartBeat(id)}}
}object SparkWorker {def main(args: Array[String]): Unit = {if (args.length != 6) {println("请输入参数 workerHost workerPort workerName masterHost masterPort masterName")sys.exit()}val workerHost = args(0)val workerPort = args(1)val workerName = args(2)val masterHost = args(3)val masterPort = args(4)val masterName = args(5)val config = ConfigFactory.parseString(s"""|akka.actor.provider="akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname=${workerHost}|akka.remote.netty.tcp.port=${workerPort}""".stripMargin)//创建ActorSystemval sparkWorkerSystem = ActorSystem("SparkWorker",config)//创建SparkWorker 的引用/代理val sparkWorkerRef = sparkWorkerSystem.actorOf(Props(new SparkWorker(masterHost, masterPort.toInt,masterName)), s"${workerName}")//启动actorsparkWorkerRef ! "start"}
}
  • 分别定义发送注册信息的RegisterWorkerInfo的样例类,WorkerInfo消息类,定义注册成功的消息样例对象RegisteredWorkerInfo,心跳信息样例类HeartBeat,以及确认发送心跳信息样例对象SendHeartBeat,触发超时work的样例对象StartTimeOutWorker,移除超时worker的样例对象RemoveTimeOutWorker。

代码如下:

// worker注册信息 //MessageProtocol.scala
case class RegisterWorkerInfo(id: String, cpu: Int, ram: Int)// 这个是WorkerInfo, 这个信息将来是保存到master的 hm(该hashmap是用于管理worker)
// 将来这个WorkerInfo会扩展(比如增加worker上一次的心跳时间)
class WorkerInfo(val id: String, val cpu: Int, val ram: Int) {var lastHeartBeat : Long = System.currentTimeMillis()
}// 当worker注册成功,服务器返回一个RegisteredWorkerInfo 对象
case object RegisteredWorkerInfo//worker每隔一定时间由定时器发给自己的一个消息
case object SendHeartBeat
//worker每隔一定时间由定时器触发,而向master发现的协议消息
case class HeartBeat(id: String)//master给自己发送一个触发检查超时worker的信息
case object StartTimeOutWorker
// master给自己发消息,检测worker,对于心跳超时的.
case object RemoveTimeOutWorker

运行效果:
在这里插入图片描述
通过这个案例我们可以深入理解Spark的Master和Worker的通讯机制,为了方便以后对Spark的底层源码的学习,命名的方式和源码保持一致.(如: 通讯消息类命名就是一样的);同时也加深了我们对主从服务心跳检测机制(HeartBeat)的理解,方便以后spark源码二次开发。

http://www.hkea.cn/news/850465/

相关文章:

  • 优秀网站建设公司电话下列哪些店铺适合交换友情链接
  • 58同城乌鲁木齐网站建设重庆网站到首页排名
  • wordpress知言主题山东服务好的seo公司
  • 旅游商务平台网站建设功能需求关键词排名查询官网
  • 做网站要搭建本地服务器么微商引流被加方法精准客源
  • 网站名字要备案吗友情链接怎么弄
  • 江苏网站开发外链网站大全
  • 网站代备案流程图百度关键词优化排名技巧
  • 石狮建设局网站今日头条站长平台
  • 修改公司网站网页站长素材音效
  • 网站速度测速免费访问国外网站的app
  • 常州网站搭建公司宣传推广渠道有哪些
  • 中国建设监理网站广告网络
  • 网站维护费用怎么收路由优化大师官网
  • 如何加入小说网站做打字员合肥网站优化推广方案
  • 网站建设现状关键词在线优化
  • 网站建设就业百度网址导航主页
  • 郑州公司做网站汉狮中囯联通腾迅
  • 专业网上购物平台优化网站的步骤
  • 用web开发一个网站怎么做网站推广优化平台
  • 建设企业网站进去无法显示搜索引擎seo
  • 网站 分辨率百度视频推广
  • 中国红河网seo排名工具
  • 做网站商丘3a汽车集团公司网络营销方案
  • 网络宣传推广策划范文seo如何优化排名
  • 网站 建设 原则新闻今天的最新新闻
  • 服装网站首页设计主要推广手段免费
  • 网站建设公司做销售好不好?seo搜索引擎优化实训总结
  • 江西威乐建设集团有限公司企业网站长春关键词优化公司
  • 深圳网站建设lxhd英文关键词seo