当前位置: 首页 > news >正文

衡水城乡建设局网站首页蛋糕店网站设计模板

衡水城乡建设局网站首页,蛋糕店网站设计模板,世界500强企业中国有多少家,做公司 网站建设价格在Flink中状态主要分为三种: Operator State(算子状态)Keyed State(键控状态)Broadcast State(广播状态) 这里简单介绍一下Operator State的使用,说到使用State就必然要使用到Flink的容错机制也就是Checkpoint.具体内容见代码注解 数据源 这里选用Socket作为Source输入,便于…在Flink中状态主要分为三种: Operator State(算子状态)Keyed State(键控状态)Broadcast State(广播状态) 这里简单介绍一下Operator State的使用,说到使用State就必然要使用到Flink的容错机制也就是Checkpoint.具体内容见代码注解 数据源 这里选用Socket作为Source输入,便于测试➜ ~ nc -lk 8888 a b c k k k状态算子代码/** * Description TODO 自定义状态MapFunc **/ // 状态算子必须要实现对应的算子接口和CheckpointFunction接口 class StateMapFunc implements MapFunctionString, String, CheckpointedFunction{private ListStateString strListState;/*** Param o* return String* Description TODO map方法的正常处理逻辑**/Overridepublic String map(String s) throws Exception {// 模拟Task失败if (s.equals(k) RandomUtils.nextInt(0, 5) 3) {throw new Exception(Task 异常);}// 将数据添加到状态存储器中strListState.add(s);IterableString strings strListState.get();StringBuilder builder new StringBuilder();for (String string : strings) {builder.append(string);}return builder.toString();}/*** Param functionSnapshotContext* return void* Description TODO 系统对状态数据做快照(持久化)会调用此方法, 用户使用此方法在持久化前对状态数据可以做一些操控**/Overridepublic void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {System.out.println(快照生成, checkpointId: functionSnapshotContext.getCheckpointId());}/*** Param functionInitializationContext* return void* Description TODO 算子任务在启动前会调用此方法,未用户状态数据进行初始化**/Overridepublic void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {// 获取算子状态存储器OperatorStateStore operatorStateStore functionInitializationContext.getOperatorStateStore();/*** ListStateDescriptor状态描述* 参数1:一个自定义名称* 参数2:存储的数据类型**/ListStateDescriptorString stateDescriptor new ListStateDescriptor(demo, String.class);/*** 算子状态存储器, 只提供ListSate的形式(和Java中的List不是一回事)来存储状态数据* getListSate方法,会在Task失败后,task自动重启时,会帮助用户加载最近一次的快照数据,如果是job重启则不会加载**/strListState operatorStateStore.getListState(stateDescriptor);} }要注意代码注释中的内容,getListState只作用于Task的自动重启,如果是整个Job重启时不生效的,如果是想Job重启后从重启前的State获取数据需要在Job提交时就指定checkpoint镜像文件.业务代码 public class FlinkOperatorState {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度1env.setParallelism(1);// 开启Checkpoint, 8秒一个周期并开启一次性语义env.enableCheckpointing(8000, CheckpointingMode.EXACTLY_ONCE);// 指定checkpoint持久化路径env.getCheckpointConfig().setCheckpointStorage(file:///Users/xxx/data/testData/checkpoint);// 开启Task级别故障自动failover,通过fixedDelayRestart设置Task重启上限和重启间隔,这里设置的重启次数为2次,一旦Task重启次数超过这个次数,整个job也会停止env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(5)));// 获取Socket数据源DataStreamSourceString socketSource env.socketTextStream(localhost, 8888);// 将自定义的StateOperator传入SingleOutputStreamOperatorString map socketSource.map(new StateMapFunc());// 打印结果map.print();env.execute(Operator State);} }具体的代码模板和API的介绍大概就这些内容,具体实践要根据业务逻辑而定.
http://www.hkea.cn/news/14412891/

相关文章:

  • 代理加盟微信网站建设上海闵行刚刚发生的
  • 找人做效果土去那网站找厦门网站制
  • 全屏网站怎么做如何进行网站备案
  • 平潭建设局网站首页济南网站设计公司富
  • 做图片网站 解决版权微信小程序广告投放价格表
  • 义网站建设推荐郑国华页面开发
  • 百度收录哪些网站公司网站模板内容
  • 网站开发需求文档怎么写宁波免费建网站
  • 电子商务网站开发应遵循的基本原则wordpress如何搬家
  • 如何建设招聘网站网站促销计算
  • 河北省住房城乡建设网站wordpress 属于多个栏目
  • 做一整套网站需要什么c 做网站用什么框架
  • 网站建设丨选择金手指排名15东莞手机网站制作
  • 一个公司如何把网站做好合肥网站建设zgkr
  • 兰州网站建设方法苏州新区网站制作建设推
  • 住房和城乡建设报名网站江门网站制作方案
  • 网站建设服务采购方案模板甘肃省城乡与建设厅网站首页
  • 如何给网站加引导页wordpress如何改页面模板
  • 网站外链海南建设银行官方网站
  • 宿州哪家做网站不做页面设置标签wordpress
  • 广州外贸建网站网页设计与制作教程专题分析
  • 培训餐饮网站建设抖音同城推广怎么弄
  • app案例网站广州公司注册核名
  • 如何做弹幕网站wordpress菜单怎么设置目录册
  • 包头网站公司wordpress主题 插件
  • 网站建设策划基本流程图化妆网站模板
  • 个人网站建设优化网站跳转链接生成
  • 景区门户网站建设北京注册公司代理
  • html网站开发心得湛江网站制作
  • 文化网站建设机关网站建设创新