Getting Started with MapReduce Programming and Source Code Analysis

A First Look at MapReduce

Let's start with a diagram.

Source Code Analysis of the MapReduce Word Count Program

  • Preliminaries
    • Hadoop has its own data types: Java's int corresponds to IntWritable, and String corresponds to Text.
    • To make the data serializable for the framework, the program must convert int and String values into these two Writable types (see the sketch below).
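
For example, a plain int or String is wrapped into an IntWritable or Text before it is written out, and unwrapped again with get() or toString() when it is read back. A small standalone sketch, purely for illustration:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WritableDemo {
    public static void main(String[] args) {
        // wrap plain Java values into Hadoop's serializable Writable types
        IntWritable count = new IntWritable(42); // int    -> IntWritable
        Text word = new Text("hello");           // String -> Text

        // unwrap them back into plain Java types
        int plainCount = count.get();            // IntWritable -> int
        String plainWord = word.toString();      // Text -> String
        System.out.println(plainWord + " = " + plainCount);

        // Writable objects are mutable and are usually reused via set()
        count.set(7);
        word.set("world");
    }
}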
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> { // declares the Mapper's input/output key-value types; the input key is not necessarily a string, so it is declared as Object

        private final static IntWritable one = new IntWritable(1); // the value 1 is emitted for every word, so declare it once as a constant
        private Text word = new Text(); // Hadoop's counterpart of Java's String

        // map() is called once per input record; with the default TextInputFormat that means one line of the file at a time
        public void map(Object key, Text value, Context context
                        ) throws IOException, InterruptedException { // the first two parameters match the input types declared above; context collects the output key-value pairs
            StringTokenizer itr = new StringTokenizer(value.toString()); // tokenizer that splits the line into words
            // String[] strs = value.toString().split(" "); // equivalent: split the line on spaces
            while (itr.hasMoreTokens()) { // take each token, pair it with 1, and write it to context (where context keeps it is the framework's concern)
                word.set(itr.nextToken());
                context.write(word, one);
                // Note that every word is emitted with value = 1 here. Emitting a larger count is also valid
                // (e.g. for "hello hello" you could emit <hello, 2> once); using 1 simply skips the extra step
                // of pre-aggregating identical words inside the mapper.
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> { // as above, except that both input and output are Text/IntWritable pairs
        private IntWritable result = new IntWritable(); // for each key, result holds the sum of its values, e.g. <hello, 3>

        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
                           ) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get(); // get() converts an IntWritable to an int; set() does the opposite, see below
            }
            result.set(sum); // sum is a plain int, which Hadoop cannot serialize, so wrap it in an IntWritable
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        // configure the job
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class); // the driver class of the job
        job.setMapperClass(TokenizerMapper.class); // the Mapper class
        job.setCombinerClass(IntSumReducer.class); // the Combiner class
        job.setReducerClass(IntSumReducer.class); // the Reducer class
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < otherArgs.length - 1; ++i) { // multiple input paths may be given; the last argument is the output path
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job,
                new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1); // wait for the job to finish; exit code 0 on success, 1 on failure
    }
}
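
As the comment inside map() points out, emitting 1 for every occurrence is not mandatory: a mapper may pre-aggregate counts within each call before writing them out, a pattern usually called in-mapper combining. The following is a rough sketch of such a mapper, my own illustration rather than part of the original example; it could be nested inside the WordCount class above so the existing imports apply:

public static class InMapperCombiningMapper
        extends Mapper<Object, Text, Text, IntWritable> {

    private final Text word = new Text();
    private final IntWritable count = new IntWritable();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // count occurrences within this line first, then emit each distinct word once
        java.util.Map<String, Integer> counts = new java.util.HashMap<>();
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            counts.merge(itr.nextToken(), 1, Integer::sum);
        }
        for (java.util.Map.Entry<String, Integer> e : counts.entrySet()) {
            word.set(e.getKey());
            count.set(e.getValue());
            context.write(word, count); // e.g. "hello hello" -> <hello, 2>
        }
    }
}

Swapping it in would only require job.setMapperClass(InMapperCombiningMapper.class); the reducer stays the same, since it already sums whatever counts it receives.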

Getting Started with MapReduce Programming on Windows

Configuring the Development Environment on the Host Machine

Computing Students' Average Grades with MapReduce

As shown in the figure, create a few files under /expr/studentgrades/grades/ in HDFS and write a few lines of data into each, similar to the following (student ID and grade):

1001 90
1002 100
...
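
If you would rather create these files from Java code than with the HDFS shell, a small sketch using the FileSystem API could look like the following; the NameNode address matches the one used in the job code below, and the file name grades1.txt is only an example:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CreateGradeFiles {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.17.10:9000"); // same NameNode as in the job below

        try (FileSystem fs = FileSystem.get(conf);
             FSDataOutputStream out = fs.create(new Path("/expr/studentgrades/grades/grades1.txt"))) {
            // one record per line: student ID, a space, then the grade
            out.writeBytes("1001 90\n");
            out.writeBytes("1002 100\n");
        }
    }
}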

The code, with comments, is as follows:

package gradesAverage;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GradesAverage {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private Text student = new Text();
        private IntWritable grade = new IntWritable();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            System.out.println("key is: " + key + ", value is: " + value.toString());
            // Each input record is a single line containing one student ID and one grade,
            // so splitting on the space is enough; no further tokenizing is needed.
            String[] fields = value.toString().split(" ");
            student.set(fields[0]);
            grade.set(Integer.parseInt(fields[1]));
            context.write(student, grade);
            // Alternative approach: split the value into lines with a StringTokenizer and then split
            // each line on whitespace; unnecessary here, because map() already receives one line at a time.
        }
    }

    public static class gradesAverageReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable gradesSum = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;    // number of grade records for this student
            int grades = 0; // total of this student's grades
            for (IntWritable val : values) {
                sum += 1;
                grades += val.get();
            }
            System.out.println("student is: " + key.toString() + ", grades is: " + grades + ", sum is: " + sum);
            gradesSum.set(grades / sum); // integer division: the fractional part of the average is dropped
            context.write(key, gradesSum);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. HDFS connection settings
        String namenode_ip = "192.168.17.10";
        String hdfs = "hdfs://" + namenode_ip + ":9000";
        Configuration conf = new Configuration(); // Hadoop configuration object
        conf.set("fs.defaultFS", hdfs);
        conf.set("mapreduce.app-submission.cross-platform", "true"); // allow cross-platform submission (e.g. from a Windows client to a Linux cluster)
        // Additional settings for submitting to a YARN cluster, if needed:
        // conf.set("hadoop.job.user", "hadoop");
        // conf.set("mapreduce.framework.name", "yarn");
        // conf.set("mapreduce.jobtracker.address", namenode_ip + ":9001");
        // conf.set("yarn.resourcemanager.hostname", namenode_ip);
        // conf.set("yarn.resourcemanager.resource-tracker.address", namenode_ip + ":8031");
        // conf.set("yarn.resourcemanager.address", namenode_ip + ":8032");
        // conf.set("yarn.resourcemanager.admin.address", namenode_ip + ":8033");
        // conf.set("yarn.resourcemanager.scheduler.address", namenode_ip + ":8034");
        // conf.set("mapreduce.jobhistory.address", namenode_ip + ":10020");

        // 2. MapReduce job settings
        String jobName = "GradesAverage"; // job name
        Job job = Job.getInstance(conf, jobName);
        job.setJarByClass(GradesAverage.class);  // driver class
        job.setJar("export\\GradesAverage.jar"); // local jar file to submit
        job.setMapperClass(TokenizerMapper.class);
        // Note: do not reuse the reducer as a combiner here. Unlike the word-count sum, an average of
        // partial averages is not the overall average, so such a combiner would give wrong results.
        job.setReducerClass(gradesAverageReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 3. Input and output paths
        String dataDir = "/expr/studentgrades/grades";   // input data directory
        String outputDir = "/expr/studentgrades/output"; // output directory
        Path inPath = new Path(hdfs + dataDir);
        Path outPath = new Path(hdfs + outputDir);
        FileInputFormat.addInputPath(job, inPath);
        FileOutputFormat.setOutputPath(job, outPath);
        // Delete the output directory if it already exists
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }

        // 4. Run the job
        System.out.println("Job: " + jobName + " is running...");
        if (job.waitForCompletion(true)) {
            System.out.println("Job succeeded!");
            System.exit(0);
        } else {
            System.out.println("Job failed!");
            System.exit(1);
        }
    }
}
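
One caveat in the reducer above: gradesSum.set(grades / sum) is integer division, so the fractional part of the average is dropped (for example, 90 and 95 average to 92 rather than 92.5). If the decimals matter, the output value type can be switched to DoubleWritable. A sketch of that variant, assuming an extra import of org.apache.hadoop.io.DoubleWritable and job.setOutputValueClass(DoubleWritable.class) in main():

public static class GradesAverageDoubleReducer
        extends Reducer<Text, IntWritable, Text, DoubleWritable> {

    private final DoubleWritable average = new DoubleWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int count = 0; // number of grade records for this student
        int total = 0; // total of this student's grades
        for (IntWritable val : values) {
            count += 1;
            total += val.get();
        }
        average.set((double) total / count); // keep the fractional part
        context.write(key, average);
    }
}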

The output of the run is as follows:

Open the result file containing the average grades; the directory structure is shown in the figure.
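
The contents of the result file can also be printed directly with hdfs dfs -cat, for example hdfs dfs -cat /expr/studentgrades/output/part-r-00000 (with the default single reducer, the output file is named part-r-00000).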

If you found this helpful, feel free to toss me a chicken drumstick, O(∩_∩)O haha~