做地方网站,微信营销方案,重庆网站供奉,wordpress置顶文章 调用ResourceManager 总结
一、概述
1、ResourceManager 管理 Flink 集群中的计算资源#xff0c;计算资源主要来自 TaskManager 组件。
2、如果集群采用 Native【本地模式】部署#xff0c;则 ResourceManager 会动态地向集群资源管理器申请 Container 并启动TaskManager计算资源主要来自 TaskManager 组件。
2、如果集群采用 Native【本地模式】部署则 ResourceManager 会动态地向集群资源管理器申请 Container 并启动TaskManager例如Hadoop Yarn、Kubernetes等。
3、ResourceManager主要接收来自 JobManager 的 SlotRequest 和 TaskManager 的 SlotReport。
二、分类
1、动态资源管理 和 不支持动态资源管理
1一类支持动态资源管理例如KubernetesResourceManager、YarnResourceManager及MesosResourceManager
支持动态资源管理的集群类型可以按需启动TaskManager资源根据Job所需的资源请求动态启动TaskManager节点这种资源管理方式不用担心资源浪费和资源动态伸缩的问题。
实现动态资源管理的ResourceManager需要继承ActiveResourceManager基本实现类。
2另一类不支持动态资源管理例如StandaloneResourceManager
2、分类图 三、核心服务
ResourceManagerRuntimeServices 中包含 SlotManager 和 JobLeaderldService 两个主要服务和 HeartbeatService 心跳服务。
1、SlotManager 管理整个集群的 Slot 计算资源并对 Slot 计算资源进行统一的分配和管理同时实现了对 TaskManager 信息的注册和管理。
2、JobLeaderldService 通过实现 jobLeaderldListeners 实时监听 JobManager 的运行状态以获取集群启动的作业对应的 JobLeaderld 信息防止出现 JobManager 无法连接的情况用于管理注册的 JobManager 节点包括对 JobManager 的注册和注销等操作。
3、HeartbeatService 主要通过 TaskManagerHeartbeatListener 和 JobManagerHeartbeatListener 两个监听器收集来自 TaskManager和 JobManager 的心跳信息以保证整个运行时中各个组件之间能够正常通信。
四、ResourceManager 的初始化和启动
DefaultDispatcherResourceManagerComponentFactory#create 方法
1、初始化 ResourceManager resourceManager resourceManagerFactory.createResourceManager(configuration,ResourceID.generate(),rpcService,highAvailabilityServices,heartbeatServices,fatalErrorHandler,new ClusterInformation(hostname, blobServer.getPort()),webMonitorEndpoint.getRestBaseUrl(),metricRegistry,hostname,ioExecutor);1创建 ResourceManagerRuntimeServices
1.创建 SlotManager
SlotMatchingStrategy 根据作业中给定的 ResourceProfile 匹配 Slot 计算资源。SlotMatchingStrategy主要分为两种类型
一种是LeastUtilizationSlotMatchingStrategy即按照利用率最低原则匹配Slot资源尽可能保证TaskExecutor上资源的使用率处于比较低的水平这种策略能够有效降低机器的负载。
另一种是AnyMatchingSlotMatchingStrategy即直接返回第一个匹配的Slot资源策略。
private static SlotManager createSlotManager(ResourceManagerRuntimeServicesConfiguration configuration,ScheduledExecutor scheduledExecutor,SlotManagerMetricGroup slotManagerMetricGroup) {final SlotManagerConfiguration slotManagerConfiguration configuration.getSlotManagerConfiguration();if (configuration.isEnableFineGrainedResourceManagement()) {return new FineGrainedSlotManager(scheduledExecutor,slotManagerConfiguration,slotManagerMetricGroup,new DefaultResourceTracker(),new FineGrainedTaskManagerTracker(),new DefaultSlotStatusSyncer(slotManagerConfiguration.getTaskManagerRequestTimeout()),new DefaultResourceAllocationStrategy(SlotManagerUtils.generateTaskManagerTotalResourceProfile(slotManagerConfiguration.getDefaultWorkerResourceSpec()),slotManagerConfiguration.getNumSlotsPerWorker()),Time.milliseconds(REQUIREMENTS_CHECK_DELAY_MS));} else if (configuration.isDeclarativeResourceManagementEnabled()) {return new DeclarativeSlotManager(scheduledExecutor,slotManagerConfiguration,slotManagerMetricGroup,new DefaultResourceTracker(),new DefaultSlotTracker());} else {return new SlotManagerImpl(scheduledExecutor, slotManagerConfiguration, slotManagerMetricGroup);}}2.创建 JobLeaderIdService
final JobLeaderIdService jobLeaderIdService new DefaultJobLeaderIdService(highAvailabilityServices, scheduledExecutor, configuration.getJobTimeout());2返回创建的 StandaloneResourceManager
return new StandaloneResourceManager(rpcService,resourceId,highAvailabilityServices,heartbeatServices,resourceManagerRuntimeServices.getSlotManager(),ResourceManagerPartitionTrackerImpl::new,resourceManagerRuntimeServices.getJobLeaderIdService(),clusterInformation,fatalErrorHandler,resourceManagerMetricGroup,standaloneClusterStartupPeriodTime,AkkaUtils.getTimeoutAsTime(configuration),ioExecutor);在 StandaloneResourceManager 构造方法中启动 RpcServer
this.rpcServer rpcService.startServer(this);2、启动 ResourceManager
resourceManager.start()-ResourceManager#onStartResourceManager#startResourceManagerServices
1获取 leaderElectionService
leaderElectionService highAvailabilityServices.getResourceManagerLeaderElectionService();2初始化 resourceManagerDriver【ActiveResourceManager需要】
resourceManagerDriver.initialize(this, new GatewayMainThreadExecutor(), ioExecutor);3启动 leader 竞选在 leader 节点启动服务
1.启动心跳服务
在ResourceManager中HeartbeatService的启动方法中包括了对taskManagerHeartbeatManager和jobManagerHeartbeatManager两个心跳管理服务的启动操作。
而心跳管理服务主要通过TaskManagerHeartbeatListener和JobManagerHeartbeatListener两个监听器收集来自TaskManager和JobManager的心跳信息以保证整个运行时中各个组件之间能够正常通信。
startHeartbeatServices();2.启动 slotManager 服务
通过scheduledExecutor线程池启动TaskManager周期性超时检查服务通过checkTaskManagerTimeouts()方法实现该检查防止TaskManager长时间掉线等问题。
启动单独的线程对提交的SlotRequest进行周期性超时检查防止Slot请求超时。
slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());4启动 jobLeaderIdService
jobLeaderIdService.start(new JobLeaderIdActionsImpl());五、总结
1、ResourceManager 通过 SlotManager 管理集群中的计算资源TaskManager 的 SlotReport响应 JobManager 的 SlotRequest
2、ResourceManager 通过 HeartBeatService 监听 JobManager 和 TaskManager 的心跳保证运行时各个组件间能够正常通信
3、ResourceManager 通过 JobLeaderldService 管理注册的 JobManager 节点包括对 JobManager 的注册和注销等操作