网站后期维护流程,润滑油东莞网站建设技术支持,wordpress修改阅读量,百度网址大全简单版MR源码解析
new Job(): 读取本地文件, xml配置job.start(): 启动线程job的run():线程方法 runTasks(): 传入对应的接口#xff0c;启动map或者reduceMapTask类的run(): 设置map阶段的参数#xff0c;初始化任务#xff0c;创建上下文对象 创建读取器LineRecordReader判断是…MR源码解析
new Job(): 读取本地文件, xml配置job.start(): 启动线程job的run():线程方法 runTasks(): 传入对应的接口启动map或者reduceMapTask类的run(): 设置map阶段的参数初始化任务创建上下文对象 创建读取器LineRecordReader判断是否压缩 compressFactory如果没有压缩使用seek方法mapTask的write()进行溢写mapper类的init()方法设置溢写百分比和缓冲区大小collector收集器进行map阶段数据类型检查和分数数量检查keySerializer 进行数据的序列化调用自己写的bean对象kvmeta.put(): 写入环形缓冲区mapPhase结束 数据量达到缓冲区的80%对索引进行快速排序input.close():关闭输入关闭输出并同时将缓冲区数据按照分区写入磁盘。 如果开启了combine进行数据合并 mergePart归并分区combine第二次合并如果溢写次数小于3就不合并了collector.close()关闭环形缓冲区 reduceTask的run方法 submit: 5个reduce并行提交cLeanTask初始化shuffle类map的排序recuce中的归并排序Merger合并器两次归并排序先内存归并后磁盘归并抓取数据可以从本地或者网络中抓取sort :归并排序reduce阶段 创建上下文对象调用reducer的run方法real.write(): LineRecordWrite写入HDFS
使用MR来进行拷贝去重
拷贝values写入上下文时需要迭代遍历去重values写入上下文时不遍历
使用MR来实现join操作 实现TableBean类四个属性空参构造器get-set方法 write():序列化 out.writeUTF():该方法有换行不会连在一起 readFields(): 反序列化 实现mapper类 setup() 使用context上下文对象获取InputSplit类强制类型转换为FileSplit类getPath().getName()获取文件名称 map() 切分split封装context写出
public class TableMapper extends MapperLongWritable, Text, Text,TableBean {private String filename;private Text outK;private TableBean outV;//初始化,每个文件开始一次maptask并进行一次初始化//获取到文件的名称Overrideprotected void setup(MapperLongWritable, Text, Text, TableBean.Context context) throws IOException, InterruptedException {//拿到切片信息FileSplit split (FileSplit) context.getInputSplit();filename split.getPath().getName();outK new Text();outV new TableBean();}Overrideprotected void map(LongWritable key, Text value, MapperLongWritable, Text, Text, TableBean.Context context) throws IOException, InterruptedException {//1. 获取一行String line value.toString();//2.判断是哪个文件的if(filename.contains(order)){//处理的是订单表String[] split line.split(\t);//封装outK.set(split[1]);//pid作为keyoutV.setId(split[0]);outV.setPid(split[1]);outV.setAmount(Integer.parseInt(split[2]));outV.setTableName(order);outV.setPname();}else{//处理的是商品表String[] split line.split( );
// System.out.println( Arrays.toString(split) );
// System.out.println( split[1] );//封装outK.set(split[0]);//pid作为keyoutV.setId();outV.setPid(split[0]);outV.setAmount(0);outV.setTableName(pd);outV.setPname(split[1]);}//写出context.write(outK, outV);}
}实现reduce类 为了分辨map传递过来的数据是哪个表给bean对象添加一个表名属性在mapper类中给对应表的抓取过程中添加标记在获取到value时不能直接使用等于号进行赋值values是Iterable集合比较特殊属性赋值工具类BeanUtils.copyProperties(dest, src);
public class TableReducer extends ReducerText, TableBean, TableBean, NullWritable {private ArrayListTableBean orderBeans;private TableBean pdBean;Overrideprotected void setup(ReducerText, TableBean, TableBean, NullWritable.Context context) throws IOException, InterruptedException {//1.创建集合orderBeans new ArrayList();pdBean new TableBean();}Overrideprotected void reduce(Text key, IterableTableBean values, ReducerText, TableBean, TableBean, NullWritable.Context context) throws IOException, InterruptedException {orderBeans.clear();//清空集合//2.遍历赋值for (TableBean value : values) {if (order.equals(value.getTableName())) {TableBean temp new TableBean();try {BeanUtils.copyProperties(temp,value);} catch (IllegalAccessException e) {throw new RuntimeException(e);} catch (InvocationTargetException e) {throw new RuntimeException(e);}orderBeans.add(temp);} else {//商品表try {BeanUtils.copyProperties(pdBean, value);} catch (IllegalAccessException e) {throw new RuntimeException(e);} catch (InvocationTargetException e) {throw new RuntimeException(e);}}}//循环遍历orderBeans赋值pdnamefor (TableBean orderBean : orderBeans) {orderBean.setPname(pdBean.getPname());context.write(orderBean,NullWritable.get());}}
}总结这种写法在reduce阶段创建了对象和集合这些方式都是比较消耗资源的容易造成数据倾斜问题。
MR在环形缓冲区快排时倒排索引反向溢写会导致数据反向输出类似栈结构的的先进后出。