当前位置: 首页 > news >正文

佛山新网站建设效果下载手机微信

佛山新网站建设效果,下载手机微信,电子商务网站建设与安全,网站建设的论文参考文献3.7.1Reduce Join 1、工作原理 Map端的主要工作#xff1a;为来自不同表或文件的key/value对#xff0c;打标签以区别不同来源的记录。然后用连接字段作为key#xff0c;其余部分和新加的标志作为value#xff0c;最后进行输出。 Reduce端的主要工作#xff1a;在Reduc…3.7.1Reduce Join 1、工作原理 Map端的主要工作为来自不同表或文件的key/value对打标签以区别不同来源的记录。然后用连接字段作为key其余部分和新加的标志作为value最后进行输出。 Reduce端的主要工作在Reduce端以连接字段作为key的分组已经完成我们只需要在每一个分组当中将那些来源于不同文件的记录在Map阶段已经达标分开最后进行合并就ok了。 3.7.2Reduce Join案例实操 1、需求 将商品信息表中数据根据商品pid合并到订单数据表中。 1输入 order.txt 1001 01 1 1002 02 2 1003 03 3 1004 01 4 1005 02 5 1006 03 6pd.txt 01 小米 02 华为 03 格力2输出 1001 小米 1 1001 小米 1 1002 华为 2 1002 华为 2 1003 格力 3 1003 格力 32、需求分析 通过将关联条件作为Map输出的key将两表满足Join条件的数据并携带数据所来源的文件信息发往同一个ReduceTask在Reduce中进行数据的串联如图4-20所示。 3、代码实现 1创建商品和订合并后的Bean类 package com.cuiyf41.join;import org.apache.hadoop.io.Writable;import java.io.DataInput; import java.io.DataOutput; import java.io.IOException;public class TableBean implements Writable {private String order_id; // 订单idprivate String p_id; // 产品idprivate int amount; // 产品数量private String pname; // 产品名称private String flag; // 表的标记public TableBean() {super();}public TableBean(String order_id, String p_id, int amount, String pname, String flag) {super();this.order_id order_id;this.p_id p_id;this.amount amount;this.pname pname;this.flag flag;}Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(order_id);out.writeUTF(p_id);out.writeInt(amount);out.writeUTF(pname);out.writeUTF(flag);}Overridepublic void readFields(DataInput in) throws IOException {order_id in.readUTF();p_id in.readUTF();amount in.readInt();pname in.readUTF();flag in.readUTF();}Overridepublic String toString() {return order_id \t pname \t amount;}public String getOrder_id() {return order_id;}public void setOrder_id(String order_id) {this.order_id order_id;}public String getP_id() {return p_id;}public void setP_id(String p_id) {this.p_id p_id;}public int getAmount() {return amount;}public void setAmount(int amount) {this.amount amount;}public String getPname() {return pname;}public void setPname(String pname) {this.pname pname;}public String getFlag() {return flag;}public void setFlag(String flag) {this.flag flag;} }2编写TableMapper类 package com.cuiyf41.join;import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;public class TableMapper extends MapperLongWritable, Text, Text, TableBean {String name;TableBean bean new TableBean();Text k new Text();Overrideprotected void setup(MapperLongWritable, Text, Text, TableBean.Context context) throws IOException, InterruptedException {// 1 获取输入文件切片FileSplit split (FileSplit) context.getInputSplit();// 2 获取输入文件名称name split.getPath().getName();}Overrideprotected void map(LongWritable key, Text value, MapperLongWritable, Text, Text, TableBean.Context context) throws IOException, InterruptedException {// 1 获取输入数据String line value.toString();// 2切割String[] fields line.split(\t);// 3 不同文件分别处理if (name.startsWith(order)) {// 订单表处理// 3.1 封装bean对象bean.setOrder_id(fields[0]);bean.setP_id(fields[1]);bean.setAmount(Integer.parseInt(fields[2]));bean.setPname();bean.setFlag(order);k.set(fields[1]);} else {// 产品表处理// 3.2 封装bean对象bean.setP_id(fields[0]);bean.setPname(fields[1]);bean.setFlag(pd);bean.setAmount(0);bean.setOrder_id();k.set(fields[0]);}// 4 写出context.write(k, bean);} }3编写TableReducer类 package com.cuiyf41.join;import org.apache.commons.beanutils.BeanUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException; import java.util.ArrayList;public class TableReducer extends ReducerText,TableBean, TableBean, NullWritable {Overrideprotected void reduce(Text key, IterableTableBean values, ReducerText, TableBean, TableBean, NullWritable.Context context) throws IOException, InterruptedException {// 1准备存储订单的集合ArrayListTableBean orderBeans new ArrayList();// 2 准备bean对象TableBean pdBean new TableBean();for (TableBean bean : values) {if (order.equals(bean.getFlag())) {// 订单表// 拷贝传递过来的每条订单数据到集合中TableBean tmpBean new TableBean();try {BeanUtils.copyProperties(tmpBean, bean);} catch (Exception e) {e.printStackTrace();}orderBeans.add(tmpBean);} else {// 产品表try {// 拷贝传递过来的产品表到内存中BeanUtils.copyProperties(pdBean, bean);} catch (Exception e) {e.printStackTrace();}}}// 3 表的拼接for(TableBean bean:orderBeans) {bean.setPname(pdBean.getPname());// 4 数据写出去context.write(bean, NullWritable.get());}} }4编写TableDriver类 package com.cuiyf41.join;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class TableDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {// 0 根据自己电脑路径重新配置args new String[]{e:/input/inputtable,e:/output1};// 1 获取配置信息或者job对象实例Configuration conf new Configuration();Job job Job.getInstance(conf);// 2 指定本程序的jar包所在的本地路径job.setJarByClass(TableDriver.class);// 3 指定本业务job要使用的Mapper/Reducer业务类job.setMapperClass(TableMapper.class);job.setReducerClass(TableReducer.class);// 4 指定Mapper输出数据的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(TableBean.class);// 5 指定最终输出的数据的kv类型job.setOutputKeyClass(TableBean.class);job.setOutputValueClass(NullWritable.class);// 6 指定job的输入原始文件所在目录Path input new Path(args[0]);Path output new Path(args[1]);// 如果输出路径存在则进行删除FileSystem fs FileSystem.get(conf);if (fs.exists(output)) {fs.delete(output,true);}FileInputFormat.setInputPaths(job, input);FileOutputFormat.setOutputPath(job, output);// 7 将job中配置的相关参数以及job所用的java类所在的jar包 提交给yarn去运行boolean result job.waitForCompletion(true);System.exit(result ? 0 : 1);} }4、测试 运行程序查看结果 1001 小米 1 1001 小米 1 1002 华为 2 1002 华为 2 1003 格力 3 1003 格力 35、总结 缺点这种方式中合并的操作是在Reduce阶段完成的Reduce端的处理压力太大Map节点的运算负载则很低资源利用率不高且在Reduce杰顿极易产生数据倾斜。 解决方案Map端实现数据合并。 Mapper// 1 获取输入数据 3.7.3Map Join 一、概述 1使用场景 Map Join适用于一张表十分小、一张表很大的场景。 2优点 思考在Reduce端处理过多的表非常容易产生数据倾斜。怎么办 在Map端缓存多张表提前处理业务逻辑这样增加Map端业务减少Reduce端数据的压力尽可能的减少数据倾斜。 3具体办法采用DistributedCache 1在Mapper的setup阶段将文件读取到缓存集合中。 2在驱动函数中加载缓存。 // 缓存普通文件到Task运行节点。 job.addCacheFile(new URI(file://e:/cache/pd.txt))二、Map Join案例实操 1、需求 将商品信息表中数据根据商品pid合并到订单数据表中。 1输入数据 order.txt 1001 01 1 1002 02 2 1003 03 3 1004 01 4 1005 02 5 1006 03 6pd.txt 01 小米 02 华为 03 格力2输出数据 2、需求分析 MapJoin适用于关联表中有小表的情形。 3、实现代码 1先在驱动模块中添加缓存文件 package com.cuiyf41.mapjoin;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException; import java.net.URI; import java.net.URISyntaxException;public class DistributedCacheDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {// 0 根据自己电脑路径重新配置args new String[]{e:/input/inputtable2, e:/output1};// 1 获取job信息Configuration configuration new Configuration();Job job Job.getInstance(configuration);// 2 设置加载jar包路径job.setJarByClass(DistributedCacheDriver.class);// 3 关联mapjob.setMapperClass(DistributedCacheMapper.class);// 4 设置最终输出数据类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);// 5 设置输入输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 6 加载缓存数据job.addCacheFile(new URI(file:///e:/input/inputcache/pd.txt));// 7 Map端Join的逻辑不需要Reduce阶段设置reduceTask数量为0job.setNumReduceTasks(0);// 8 提交boolean result job.waitForCompletion(true);System.exit(result ? 0 : 1);} }2读取缓存的文件数据 package com.cuiyf41.mapjoin;import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;import java.io.BufferedReader; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.util.HashMap; import java.util.Map;public class DistributedCacheMapper extends MapperLongWritable, Text, Text, NullWritable {MapString, String pdMap new HashMap();Text k new Text();Overrideprotected void setup(MapperLongWritable, Text, Text, NullWritable.Context context) throws IOException, InterruptedException {// 1 获取缓存的文件URI[] cacheFiles context.getCacheFiles();String path cacheFiles[0].getPath().toString();BufferedReader reader new BufferedReader(new InputStreamReader(new FileInputStream(path), UTF-8));String line;while(StringUtils.isNotEmpty(line reader.readLine())){// 2 切割String[] fields line.split(\t);// 3 缓存数据到集合pdMap.put(fields[0], fields[1]);}// 4 关流reader.close();}Overrideprotected void map(LongWritable key, Text value, MapperLongWritable, Text, Text, NullWritable.Context context) throws IOException, InterruptedException {// 1 获取一行String line value.toString();// 2 截取String[] fields line.split(\t);// 3 获取产品idString pId fields[1];// 4 获取商品名称String pdName pdMap.get(pId);// 5 拼接k.set(line \t pdName);// 6 写出context.write(k, NullWritable.get());} }
http://www.hkea.cn/news/14260617/

相关文章:

  • 谁有做爰网站号成都建筑设计公司排名
  • 能免费建手机网站吗物联网网站开发
  • 企业网站主页素描模板办文明网站做文明网民活动方案
  • 大连建设网站制作营销策划书模板
  • 建站网站 国外平台公司转型
  • 官方网站建设哪儿有网站seo知识
  • 网站建设最新外文翻译kj6699的seo综合查询
  • 用jquery做网站信息安全工程师含金量
  • t恤在线制作网站个人建设网站论文
  • 衡水网站制作公司wordpress男性主题
  • 建设一个网站需要注意哪些内容一级消防工程师考试报名
  • 重庆网站制作一般多少钱长沙房产网官网
  • 有什么ae做动图的网站做电容元器件的网站有哪些
  • 自己怎么做电影网站做企业官网的步骤
  • 中小企业网站制作方法备案网站名称与实际网站名称不一致
  • 国内网站建设网站排名百度平台app
  • excel做网站小米wifi设置网址入口网站
  • 百度地图 企业网站一个可以做行程的网站
  • 怎样注册网站建立网页国外订房网站怎么和做
  • 网站建设陆金手指科捷11网站文章不收录的原因
  • 电子商务网站的实施包括哪些步骤展厅设计说明
  • 上海微信小程序网站建设高端网站建设 n磐石网络
  • 湘潭企业网站建设 p磐石网络移动页面
  • 做网站文案用哪个软件番禺大石做网站
  • 网站界面设计需要首先做市场研究对吗广安网站seo
  • 图书馆门户网站建设网站怎么做域名实名认证吗
  • 电商网站为什么要提高网站友好度网站建设具备什么条件
  • 机关 网站 建设方案仿做唯品会网站
  • 登录深圳住房和建设局网站如何制作假网页
  • 做网站前段用什么软件品牌营销和品牌推广