当前位置: 首页 > news >正文

网站伪静态全站伪静态马鞍山专业网站制作公司

网站伪静态全站伪静态,马鞍山专业网站制作公司,网站建设与网页设计总结,o2o网站建设公司排名广播流是什么#xff1f; 将一条数据广播到所有的节点。使用 dataStream.broadCast() 广播流使用场景#xff1f; 一般用于动态加载配置项。比如lol#xff0c;每天不断有人再投诉举报#xff0c;客服根本忙不过来#xff0c;腾讯内部做了一个判断#xff0c;只有vip3…广播流是什么 将一条数据广播到所有的节点。使用 dataStream.broadCast() 广播流使用场景 一般用于动态加载配置项。比如lol每天不断有人再投诉举报客服根本忙不过来腾讯内部做了一个判断只有vip3以上的客户的投诉才会有人工一对一回复过了一段时间大家都发现vip3才有人工都开始充钱到vip3此时人还是很多于是只有vip4上的客户才能人工回复 vip3-vip4 这种判断标准在不断的变化。此时就需要广播流。因为此时数据只有1条需要多个节点都收到这个变化的数据。 广播流怎么用 一般通过connect合流去操作 a connect b.broadcast 。a是主流也就是数据流b是配置变化流 不多说直接上demo开箱即用 package com.chenchi.broadcast;import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.BroadcastStream; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.CoProcessFunction; import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.util.Collector;import java.util.HashMap; import java.util.Random;public class BroadCastStreamDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamPattern patternDataStream env.addSource(new ChangeSource());DataStreamUser userDataStream env.addSource(new CustomerSource());userDataStream.print(user);patternDataStream.print(pattern);//test1 直接合流 不广播。只会在一个节点更新。 用于特殊需求 // userDataStream // .keyBy(user - user.userId) // .connect(patternDataStream) // .process(new CustomerSimpleProcess()) // .print();//test2// 定义广播状态的描述器创建广播流 如何保存需要的广播数据呢 这个案例是通过map保留变化数据 // userDataStream // .keyBy(user - user.userId) // .connect(patternDataStream.broadcast()) // .process(new CustomerSimpleProcess()) // .print();//test3MapStateDescriptorVoid, Pattern bcStateDescriptor new MapStateDescriptor(patterns, Types.VOID, Types.POJO(Pattern.class));//通过描述器 更新BroadcastStreamPattern broadcast patternDataStream.broadcast(bcStateDescriptor);userDataStream.keyBy(user - user.userId).connect(broadcast).process(new CustomerBroadCastProcess()).print();env.execute();}private static class CustomerBroadCastProcess extends KeyedBroadcastProcessFunctionInteger, User, Pattern, String {Overridepublic void processElement(User user, KeyedBroadcastProcessFunctionInteger, User, Pattern, String.ReadOnlyContext readOnlyContext, CollectorString collector) throws Exception {Integer userVip user.getVip();//获取广播流的数据 不是通过map保存Pattern pattern readOnlyContext.getBroadcastState(new MapStateDescriptor(patterns, Types.VOID, Types.POJO(Pattern.class))).get(null);if (pattern!null){Integer patternVip pattern.vip;String result 当前系统需要的vip等级 patternVip ,用户id user.userId ,vip userVip;if (userVip patternVip){resultresult符合要求;}else {resultresult不符合要求;}collector.collect(result);}else {System.out.println(pattern is null );}}Overridepublic void processBroadcastElement(Pattern pattern, KeyedBroadcastProcessFunctionInteger,User, Pattern, String.Context context, CollectorString collector) throws Exception {BroadcastStateVoid, Pattern bcState context.getBroadcastState(new MapStateDescriptor(patterns, Types.VOID, Types.POJO(Pattern.class)));// 将广播状态更新为当前的patternbcState.put(null, pattern);}}public static class CustomerSimpleProcess extends CoProcessFunctionUser, Pattern, String {ValueStateInteger vip; //这个是保留主流的state的。 不是保留广播流的stateHashMapString,Integer vipMap;Overridepublic void open(Configuration parameters) throws Exception {vip getRuntimeContext().getState(new ValueStateDescriptor(vip, Integer.class));vipMapnew HashMapString,Integer();super.open(parameters);}Overridepublic void processElement1(User user, CoProcessFunctionUser, Pattern, String.Context context, CollectorString collector) throws Exception {Integer userVip user.getVip();Integer patternVip vipMap.getOrDefault(vip, 0);String result 当前系统需要的vip等级 patternVip ,用户id user.userId ,vip userVip;if (userVippatternVip){resultresult符合要求;}else {resultresult不符合要求;}collector.collect(result);}Overridepublic void processElement2(Pattern pattern, CoProcessFunctionUser, Pattern, String.Context context, CollectorString collector) throws Exception {vipMap.put(vip,pattern.vip);}}public static class User {public Integer userId;public Integer vip;public User() {}public User(Integer userId, Integer vip) {this.userId userId;this.vip vip;}public Integer getUserId() {return userId;}public void setUserId(Integer userId) {this.userId userId;}public Integer getVip() {return vip;}public void setVip(Integer vip) {this.vip vip;}Overridepublic String toString() {return Action{ userId userId , vip vip \ };}}// 定义行为模式POJO类包含先后发生的两个行为public static class Pattern {public Integer vip;public Pattern() {}public Pattern(Integer vip) {this.vip vip;}Overridepublic String toString() {return Pattern{ vip vip \ };}}private static class CustomerSource implements SourceFunctionUser {boolean run true;Overridepublic void run(SourceContextUser sourceContext) throws Exception {while (true) {Integer userId new Random().nextInt(1000);Integer vip new Random().nextInt(10);sourceContext.collect(new User(userId, vip));Thread.sleep(1000);}}Overridepublic void cancel() {run false;}}private static class ChangeSource implements SourceFunctionPattern {boolean run true;Overridepublic void run(SourceContextPattern sourceContext) throws Exception {int i 1;while (true) {sourceContext.collect(new Pattern(i));Thread.sleep(5000);}}Overridepublic void cancel() {run false;}}}demo思想以上述vip做例子获取用户不断投诉的id和vip等级 数据库保存可以享受人工服务的vip等级该等级可以自行调整(我是随着时间变化主键增大)。 test1 不广播 注意看pattern:4 print vip2的消息但是不代表是task4收到的消息我们看到1输出了vip2 但是task10 task9都还是vip0 说明流没有广播除非此处并行度设置为1 test2 map保存变化数据 test3通过描述器获取数据 和test2 一样不过要注意因为两个流的数据有先后可能还没有pattern就来了user信息所以建议先初始化或者先添加pattern流。
http://www.hkea.cn/news/14282383/

相关文章:

  • 随州网站建设便宜网站建设容易吗
  • 手机创建网站的软件南阳做网站的公
  • 接网站 建设如何做导航网站
  • 湛江大型网站模板建设排版设计说明
  • 三位数的域名网站购买wordpress现有模板
  • 做公司 网站建设价格低外贸 wordpress
  • 用电脑做网站服务器可视化编辑器wordpress
  • 定制网站开发方案ppt有口碑的企业网站建设
  • 网站域名被抢注做商标深圳网站建设手机网站建设
  • 网站建设的一般流程是晋城市 制作网站
  • 温岭 网站建设计算机软件开发培训
  • 网站建设报价 下载wordpress数据库里查看密码
  • 椒江建设网保障性阳光工程网站微信小程序如何生成二维码
  • wordpress简单主题优化型网站建设的基本要求
  • 长沙做php的网站建设电商网站开发设计方法
  • 如何在网上注册公司网站安阳流调报告
  • 茂名网站建设优化seo专做自驾游的网站
  • html5 服装网站微信小程序的代码
  • 晚上做设计挣钱的网站重庆网站制
  • 给别人做网站的公司自己做的网站验证码出不来怎么
  • 如何查询网站icp备案大连网站建设意动科技公司
  • 海南爱心扶贫网站是哪个公司做的怎么在网站上做下载
  • 做网站不挣钱个人网页模板html源代码
  • 网站建设与管理用什么软件有哪些内容郑州模板建站平台
  • 网站建设基本内容建筑作品集网站代做
  • 电脑做网站软件南充房管局网站查询房产
  • 东莞网站优化关键词费用网站内做关键词连接
  • 网站下拉单设计欣赏简述网站开发的具体流程
  • 上海网站建设规划延庆网站建设
  • 做视频网站怎么赚钱儿童创意产品设计