珠海专业的免费建站,东莞高端网站定制,做分销的网站,泰安房产网签西湖春晓【智能大数据分析】实验1 MapReduce实验#xff1a;单词计数 文章目录 【智能大数据分析】实验1 MapReduce实验#xff1a;单词计数一、实验目的二、实验要求三、实验原理1 MapReduce编程2 Java API解析 四、实验步骤1 启动Hadoop2 验证HDFS上没有wordcount的文件夹3 上传数据…【智能大数据分析】实验1 MapReduce实验单词计数 文章目录 【智能大数据分析】实验1 MapReduce实验单词计数一、实验目的二、实验要求三、实验原理1 MapReduce编程2 Java API解析 四、实验步骤1 启动Hadoop2 验证HDFS上没有wordcount的文件夹3 上传数据文件到HDFS4 编写MapReduce程序5 使用命令将代码打包6 在Hadoop集群上提交jar文件来运行MapReduce作业 在我之前的一篇博客中云计算中的大数据处理尝试HDFS和MapReduce的应用有过类似的操作具体不会的可以去这篇博客中看看。 一、实验目的
基于MapReduce思想编写WordCount程序。
二、实验要求
1.理解MapReduce编程思想
2.会编写MapReduce版本WordCount
3.会执行该程序
4.自行分析执行过程。
三、实验原理
MapReduce是一种计算模型简单的说就是将大批量的工作数据分解MAP执行然后再将结果合并成最终结果REDUCE。这样做的好处是可以在任务被分解后可以通过大量机器进行并行计算减少整个操作的时间。
适用范围数据量大但是数据种类小可以放入内存。
基本原理及要点将数据交给不同的机器去处理数据划分结果归约。
理解MapReduce和Yarn在新版Hadoop中Yarn作为一个资源管理调度框架是Hadoop下MapReduce程序运行的生存环境。其实MapRuduce除了可以运行Yarn框架下也可以运行在诸如MesosCorona之类的调度框架上使用不同的调度框架需要针对Hadoop做不同的适配。
一个完成的MapReduce程序在Yarn中执行过程如下
1ResourcManager JobClient向ResourcManager提交一个job。
2ResourcManager向Scheduler请求一个供MRAppMaster运行的container然后启动它。
3MRAppMaster启动起来后向ResourcManager注册。
4ResourcManagerJobClient向ResourcManager获取到MRAppMaster相关的信息然后直接与MRAppMaster进行通信。
5MRAppMaster算splits并为所有的map构造资源请求。
6MRAppMaster做一些必要的MR OutputCommitter的准备工作。
7MRAppMaster向RM(Scheduler)发起资源请求得到一组供map/reduce task运行的container然后与NodeManager一起对每一个container执行一些必要的任务包括资源本地化等。
8MRAppMaster 监视运行着的task 直到完成当task失败时申请新的container运行失败的task。
9当每个map/reduce task完成后MRAppMaster运行MR OutputCommitter的cleanup 代码也就是进行一些收尾工作。
10当所有的map/reduce完成后MRAppMaster运行OutputCommitter的必要的job commit或者abort APIs。
11MRAppMaster退出。
1 MapReduce编程
编写在Hadoop中依赖Yarn框架执行的MapReduce程序并不需要自己开发MRAppMaster和YARNRunner因为Hadoop已经默认提供通用的YARNRunner和MRAppMaster程序 大部分情况下只需要编写相应的Map处理和Reduce处理过程的业务程序即可。
编写一个MapReduce程序并不复杂关键点在于掌握分布式的编程思想和方法主要将计算过程分为以下五个步骤
1迭代。遍历输入数据并将之解析成key/value对。
2将输入key/value对映射(map)成另外一些key/value对。
3依据key对中间数据进行分组(grouping)。
4以组为单位对数据进行归约(reduce)。
5迭代。将最终产生的key/value对保存到输出文件中。
2 Java API解析
1InputFormat用于描述输入数据的格式常用的为TextInputFormat提供如下两个功能
数据切分 按照某个策略将输入数据切分成若干个split以便确定Map Task个数以及对应的split。
为Mapper提供数据给定某个split能将其解析成一个个key/value对。
2OutputFormat用于描述输出数据的格式它能够将用户提供的key/value对写入特定格式的文件中。
3Mapper/Reducer: Mapper/Reducer中封装了应用程序的数据处理逻辑。
4Writable:Hadoop自定义的序列化接口。实现该类的接口可以用作MapReduce过程中的value数据使用。
5WritableComparable在Writable基础上继承了Comparable接口实现该类的接口可以用作MapReduce过程中的key数据使用。因为key包含了比较排序的操作。
四、实验步骤
本实验主要分为确认前期准备编写MapReduce程序打包提交代码。查看运行结果这几个步骤详细如下
1 启动Hadoop 2 验证HDFS上没有wordcount的文件夹 此时HDFS上应该是没有wordcount文件夹。
3 上传数据文件到HDFS
wordcount.txt:
4 编写MapReduce程序
主要编写Map和Reduce类其中Map过程需要继承org.apache.hadoop.mapreduce包中Mapper类并重写其map方法Reduce过程需要继承org.apache.hadoop.mapreduce包中Reduce类并重写其reduce方法。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;import java.io.IOException;
import java.util.StringTokenizer;public class WordCount {public static class TokenizerMapper extends MapperObject, Text, Text, IntWritable {private final static IntWritable one new IntWritable(1);private Text word new Text();//map方法划分一行文本读一个单词写出一个单词,1public void map(Object key, Text value, Context context)throws IOException, InterruptedException {StringTokenizer itr new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {word.set(itr.nextToken());context.write(word, one);//写出单词,1}}}//定义reduce类对相同的单词把它们中的VList值全部相加public static class IntSumReducer extends ReducerText, IntWritable, Text, IntWritable {private IntWritable result new IntWritable();public void reduce(Text key, IterableIntWritable values,Context context)throws IOException, InterruptedException {int sum 0;for (IntWritable val : values) {sum val.get();//相当于Hello,1Hello,1将两个1相加}result.set(sum);context.write(key, result);//写出这个单词和这个单词出现次数单词单词出现次数}}public static void main(String[] args) throws Exception {//主方法函数入口Configuration conf new Configuration(); //实例化配置文件类Job job new Job(conf, WordCount); //实例化Job类job.setInputFormatClass(TextInputFormat.class); //指定使用默认输入格式类TextInputFormat.setInputPaths(job, args[0]); //设置待处理文件的位置job.setJarByClass(WordCount.class); //设置主类名job.setMapperClass(TokenizerMapper.class); //指定使用上述自定义Map类job.setCombinerClass(IntSumReducer.class); //指定开启Combiner函数job.setMapOutputKeyClass(Text.class); //指定Map类输出的K类型job.setMapOutputValueClass(IntWritable.class); //指定Map类输出的V类型job.setPartitionerClass(HashPartitioner.class); //指定使用默认的HashPartitioner类job.setReducerClass(IntSumReducer.class); //指定使用上述自定义Reduce类job.setNumReduceTasks(Integer.parseInt(args[2])); //指定Reduce个数job.setOutputKeyClass(Text.class); //指定Reduce类输出的,K类型job.setOutputValueClass(Text.class); //指定Reduce类输出的,V类型job.setOutputFormatClass(TextOutputFormat.class); //指定使用默认输出格式类TextOutputFormat.setOutputPath(job, new Path(args[1])); //设置输出结果文件位置System.exit(job.waitForCompletion(true) ? 0 : 1); //提交任务并监控任务状态}
}5 使用命令将代码打包
上述代码在编译运行的时候会进行报错
主要是在Hadoop版本3.x中Job构造函数已过时需要使用Job.getInstance构造函数。另外有一个潜在的问题是设置job.setOutputValueClass为Text.class但您的Reduce类输出类型是IntWritable这两者需要匹配。
下面是修改之后的代码
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.util.StringTokenizer;public class WordCount {public static class TokenizerMapper extends MapperObject, Text, Text, IntWritable {private final static IntWritable one new IntWritable(1);private Text word new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {word.set(itr.nextToken());context.write(word, one);}}}public static class IntSumReducer extends ReducerText, IntWritable, Text, IntWritable {private IntWritable result new IntWritable();public void reduce(Text key, IterableIntWritable values, Context context) throws IOException, InterruptedException {int sum 0;for (IntWritable val : values) {sum val.get();}result.set(sum);context.write(key, result);}}public static void main(String[] args) throws Exception {Configuration conf new Configuration();Job job Job.getInstance(conf, WordCount);job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(args[0])); // 输入路径FileOutputFormat.setOutputPath(job, new Path(args[1])); // 输出路径System.exit(job.waitForCompletion(true) ? 0 : 1);}
}下面是打包过程 在我们创建的java项目根目录下创建一个名为src的文件夹。 将所有的Java源代码文件.java移动到src文件夹中。 在项目根目录中创建一个名为Manifest.txt的文件用于指定JAR文件的入口点。 在Manifest.txt文件中添加以下内容 Main-Class: Main-Class将Main-Class替换为包含main方法的主类的完整类名例如我的是SalesDriver 回到项目根目录下使用以下命令编译Java源代码并创建一个临时目录来保存编译后的类文件 mkdir classes
javac -d classes src/*.java如果你在使用编译命令时出现程序包×××存在的问题这个时候我们需要将Hadoop相关的jar文件添加到编译路径中才可以解决 javac -classpath /usr/local/servers/hadoop/share/hadoop/common/hadoop-common-3.1.3.jar:/usr/local/servers/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.1.3.jar -d classes src/*.java注意上面的命令是一个而不是多个。 创建一个空的JAR文件命名为WordCount.jar jar -cvf WordCount.jar -C classes/ .将编译后的类文件和Manifest.txt添加到JAR文件中 jar -uf WordCount.jar -C classes/ .jar -uf WordCount.jar Mainfest.txt 到现在我们的整个java项目就打包成功了。
6 在Hadoop集群上提交jar文件来运行MapReduce作业
我们将打包好的WordCount.jar使用如下命令提交到集群上面
hadoop jar WordCount.jar WordCount /user/wordcount.txt /wordcount顺利执行之后终端会打印如下信息 然后我们查看我们的输出目录
hdfs dfs -ls /wordcount红框所示就是我们需要的结果我们将其下载下来进行查看
hdfs dfs -get /wordcount1/part-r-00000 /root/WordCount
vim part-r-00000可以看见运行出我们想要的结果了至此本次实验结束。