凤岗镇网站仿做,上海市建设工程咨询网,网站开发前端应用程序,网站制作的步骤不包括哪些背景
在大数据的实时处理中#xff0c;实时的大屏展示已经成了一个很重要的展示项#xff0c;比如最有名的双十一大屏实时销售总价展示。除了这个#xff0c;还有一些其他场景的应用#xff0c;比如我们在我们的后台系统实时的展示我们网站当前的pv、uv等等#xff0c;其…背景
在大数据的实时处理中实时的大屏展示已经成了一个很重要的展示项比如最有名的双十一大屏实时销售总价展示。除了这个还有一些其他场景的应用比如我们在我们的后台系统实时的展示我们网站当前的pv、uv等等其实做法都是类似的。
今天我们就做一个最简单的模拟电商统计大屏的小例子我们抽取一下最简单的需求。
实时计算出当天零点截止到当前时间的销售总额计算出各个分类的销售top3每秒钟更新一次统计结果
实例讲解
构造数据
首先我们通过自定义source 模拟订单的生成生成了一个Tuple2,第一个元素是分类第二个元素表示这个分类下产生的订单金额金额我们通过随机生成. /*** 模拟生成某一个分类下的订单生成*/public static class MySource implements SourceFunctionTuple2String,Double{private volatile boolean isRunning true;private Random random new Random();String category[] {女装, 男装,图书, 家电,洗护, 美妆,运动, 游戏,户外, 家具,乐器, 办公};Overridepublic void run(SourceContextTuple2String,Double ctx) throws Exception{while (isRunning){Thread.sleep(10);//某一个分类String c category[(int) (Math.random() * (category.length - 1))];//某一个分类下产生了price的成交订单double price random.nextDouble() * 100;ctx.collect(Tuple2.of(c, price));}}Overridepublic void cancel(){isRunning false;}}复制代码
构造统计结果类 public static class CategoryPojo{// 分类名称private String category;// 改分类总销售额private double totalPrice;// 截止到当前时间的时间private String dateTime;getter and setter ........}复制代码
定义窗口和触发器
DataStreamCategoryPojo result dataStream.keyBy(0).window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8))).trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1))).aggregate(new PriceAggregate(),new WindowResult());复制代码
首先我们定义一个窗口期是一天的滚动窗口然后设置一个1秒钟的触发器之后进行聚合计算.
集合计算 private static class PriceAggregateimplements AggregateFunctionTuple2String,Double,Double,Double{Overridepublic Double createAccumulator(){return 0D;}Overridepublic Double add(Tuple2String,Double value, Double accumulator){return accumulator value.f1;}Overridepublic Double getResult(Double accumulator){return accumulator;}Overridepublic Double merge(Double a, Double b){return a b;}}复制代码
聚合计算也比较简单其实就是对price的简单sum操作
收集窗口结果数据
private static class WindowResultimplements WindowFunctionDouble,CategoryPojo,Tuple,TimeWindow{SimpleDateFormat simpleDateFormat new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);Overridepublic void apply(Tuple key,TimeWindow window,IterableDouble input,CollectorCategoryPojo out) throws Exception{CategoryPojo categoryPojo new CategoryPojo();categoryPojo.setCategory(((Tuple1String) key).f0);BigDecimal bg new BigDecimal(input.iterator().next());double p bg.setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();categoryPojo.setTotalPrice(p);categoryPojo.setDateTime(simpleDateFormat.format(new Date()));out.collect(categoryPojo);}}复制代码
我们最聚合的结果进行简单的封装封装成CategoryPojo类以便后续处理
使用聚合窗口的结果 result.keyBy(dateTime).window(TumblingProcessingTimeWindows.of(Time.seconds(1))).process(new WindowResultProcess());复制代码
接下来我们要使用上面聚合的结果所以我们使用上面的window聚合结果流又定义了时间是1秒的滚动窗口.
如何使用窗口的结果可以参考flink的官网[1]
结果统计
接下来我们做最后的结果统计在这里我们会把各个分类的总价加起来就是全站的总销量金额然后我们同时使用优先级队列计算出分类销售的Top3打印出结果在生产过程中我们可以把这个结果数据发到hbase或者redis等外部存储以供前端的实时页面展示。
private static class WindowResultProcessextends ProcessWindowFunctionCategoryPojo,Object,Tuple,TimeWindow{Overridepublic void process(Tuple tuple,Context context,IterableCategoryPojo elements,CollectorObject out) throws Exception{String date ((Tuple1String) tuple).f0;QueueCategoryPojo queue new PriorityQueue(3,(o1, o2)-o1.getTotalPrice() o2.getTotalPrice() ? 1 : -1);double price 0D;IteratorCategoryPojo iterator elements.iterator();int s 0;while (iterator.hasNext()){CategoryPojo categoryPojo iterator.next();if (queue.size() 3){queue.add(categoryPojo);} else {CategoryPojo tmp queue.peek();if (categoryPojo.getTotalPrice() tmp.getTotalPrice()){queue.poll();queue.add(categoryPojo);}}price categoryPojo.getTotalPrice();}ListString list queue.stream().sorted((o1, o2)-o1.getTotalPrice() o2.getTotalPrice() ? 1 : -1).map(f-(分类 f.getCategory() 销售额 f.getTotalPrice() )).collect(Collectors.toList());System.out.println(时间 date 总价 : price top3 StringUtils.join(list, ,));System.out.println(-------------);}}复制代码
示例运行结果 3 CategoryPojo{category户外, totalPrice734.45, dateTime2020-06-13 22:55:34}
2 CategoryPojo{category游戏, totalPrice862.86, dateTime2020-06-13 22:55:34}
4 CategoryPojo{category洗护, totalPrice926.83, dateTime2020-06-13 22:55:34}
3 CategoryPojo{category运动, totalPrice744.98, dateTime2020-06-13 22:55:34}
2 CategoryPojo{category乐器, totalPrice648.81, dateTime2020-06-13 22:55:34}
4 CategoryPojo{category图书, totalPrice1010.12, dateTime2020-06-13 22:55:34}
1 CategoryPojo{category家具, totalPrice880.35, dateTime2020-06-13 22:55:34}
3 CategoryPojo{category家电, totalPrice1225.34, dateTime2020-06-13 22:55:34}
2 CategoryPojo{category男装, totalPrice796.06, dateTime2020-06-13 22:55:34}
1 CategoryPojo{category女装, totalPrice1018.88, dateTime2020-06-13 22:55:34}
1 CategoryPojo{category美妆, totalPrice768.37, dateTime2020-06-13 22:55:34}
时间 2020-06-13 22:55:34 总价 : 9617.050000000001 top3 (分类家电 销售额1225.34),(分类女装 销售额1018.88),(分类图书 销售额1010.12)复制代码