asp网站建设实录源码,公司网站建设带来的好处,绩溪做网站,高端大气企业网站模板#xff08;一#xff09;需求分析
计算每个大区当天金币收入排名前N的主播
背景#xff1a; 我们有一款直播APP#xff0c;已经在很多国家上线并运营了一段时间#xff0c;产品经理希望开发一个功能#xff0c;计算前N主播排行榜#xff0c;按天更新排名信息#xf…一需求分析
计算每个大区当天金币收入排名前N的主播
背景 我们有一款直播APP已经在很多国家上线并运营了一段时间产品经理希望开发一个功能计算前N主播排行榜按天更新排名信息统计的维度有多种其中有一个维度是针对主播当天直播的金币收入进行排名。 一个大区下面包含多个国家不同大区的运营策略是不一样的所以就把不同国家划分到不同大区里面方便运营。 那这个TopN主播排行榜在统计的时候就需要分大区统计了。 针对主播每天的开播数据我们已经有了以及直播间内用户的送礼记录也都是有的。那这样其实就可以统计主播当天的金币收入了主播一天可能会开播多次所以后期在统计主播当天收入的时候是需要把他当天所有直播中的金币收入都计算在内的。
分析 我们有两份数据数据都是json格式的
video_info.log 主播的开播记录其中包含主播的iduid、直播间idvid 、大区area、视频开播时长length、增加粉丝数量follow等信息gift_record.log 用户送礼记录其中包含送礼人iduid直播间idvid礼物idgood_id金币数 量gold 等信息.
其实就是按照当天主播所有开播的直播间内的收入汇总按大区分组统计每个大区内收入排名前N的主播。
二开发步骤
1首先获取两份数据中的核心字段使用fastjson包解析数据 主播开播记录主播IDuid直播间IDvid大区area (vid,(uid,area))用户送礼记录直播间IDvid金币数量gold(vid,gold) 这样的可以把这两份数据关联到一块就能获取到大区、主播、金币这些信息了使用直播间vid进行关联。
2对用户送礼记录数据进行聚合对相同vid的数据求和 因为用户可能在一次直播中给主播送多次礼物 (vid,gold_sum) 3把这两份数据join到一块vid作为join的key (vid,((uid,area),gold_sum)) 4使用map迭代join之后的数据最后获取到uid,area,gold_sum字段由于一个主播一天可能会开播多 次后面需要基于uid和area再做一次聚合所以把数据转换成这种格式uid和area是一一对应的一个人只能属于一个大区 ((uid,area),gold_sum) 5使用reduceByKey算子对数据进行聚合 ((uid,area),gold_sum_all) 6接下来对需要使用groupByKey对数据进行分组所以先使用map进行转换 map(area,(uid,gold_sum_all)) groupByKey: area,(uid,gold_sum_all),(uid,gold_sum_all),(uid,gold_sum_all) 7使用map迭代每个分组内的数据按金币数量倒序排序取前N个最终输出area、topN 这个TopN其实就是把前几名主播的id还有金币数量拼接成一个字符串(area,topN) 8使用foreach将结果打印到控制台多个字段使用制表符分割area topN
三环境依赖
dependency
groupIdorg.apache.spark/groupId
artifactIdspark-core_2.11/artifactId
version2.4.3/version
!--scopeprovided/scope--
/dependency
dependency
groupIdcom.alibaba/groupId
artifactIdfastjson/artifactId
version1.2.68/version
/dependency
四代码开发
object TopNScala {def main(args: Array[String]): Unit {val conf new SparkConf()conf.setAppName(TopNScala).setMaster(local)val sc new SparkContext(conf)//1首先获取两份数据中的核心字段使用fastjson包解析数据val videoInfoRDD sc.textFile(D:\\video_info.log)val giftRecordRDD sc.textFile(D:\\gift_record.log)//(vid,(uid,area))val videoInfoFieldRDD videoInfoRDD.map(line{val jsonObj JSON.parseObject(line)val vid jsonObj.getString(vid)val uid jsonObj.getString(uid)val area jsonObj.getString(area)(vid,(uid,area))})//(vid,gold)val giftRecordFieldRDD giftRecordRDD.map(line{val jsonObj JSON.parseObject(line)val vid jsonObj.getString(vid)val gold Integer.parseInt(jsonObj.getString(gold))(vid,gold)})//2对用户送礼记录数据进行聚合对相同vid的数据求和//(vid,gold_sum)val giftRecordFieldAggRDD giftRecordFieldRDD.reduceByKey(_ _)//3把这两份数据join到一块vid作为join的key//(vid,((uid,area),gold_sum))val joinRDD videoInfoFieldRDD.join(giftRecordFieldAggRDD)//4使用map迭代join之后的数据最后获取到uid、area、gold_sum字段//joinRDD: (vid,((uid,area),gold_sum))val joinMapRDD joinRDD.map(tup{//joinRDD: (vid,((uid,area),gold_sum))//获取uidval uid tup._2._1._1//获取areaval area tup._2._1._2//获取gold_sumval gold_sum tup._2._2((uid,area),gold_sum)})//5使用reduceByKey算子对数据进行聚合//((uid,area),gold_sum_all)val reduceRDD joinMapRDD.reduceByKey(_ _)//6接下来对需要使用groupByKey对数据进行分组所以先使用map进行转换//map(area,(uid,gold_sum_all))//groupByKey: area,(uid,gold_sum_all),(uid,gold_sum_all),(uid,gold_sum_all)val groupRDD reduceRDD.map(tup(tup._1._2,(tup._1._1,tup._2))).groupByKey()//7使用map迭代每个分组内的数据按照金币数量倒序排序取前N个最终输出area,t//(area,topN)val top3RDD groupRDD.map(tup{val area tup._1//toList把iterable转成list//sortBy排序默认是正序//reverse反转实现倒序效果//take(3)取前3个元素//mkString使用指定字符把集合转成字符串//uid:gold_sum_all,uid:gold_sum_all,uid:gold_sum_allval top3 tup._2.toList.sortBy(_._2).reverse.take(3).map(tuptup._1:tup._2).mkString(,)(area,top3)})//8使用foreach将结果打印到控制台多个字段使用制表符分割top3RDD.foreach(tupprintln(tup._1\ttup._2))sc.stop()}
}