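The Local Aggregation Component (Combiner) in MapReduce
1. What is a Combiner
A Combiner is a component of a MapReduce program that sits alongside the Mapper and the Reducer. It runs after each map task and locally aggregates that task's output, which lightens the computational load on the reduce tasks and reduces the amount of data sent over the network.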
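To make the saving concrete, here is a minimal plain-Java sketch (no Hadoop involved) of what local aggregation does to one map task's output. The class name CombinerSketch, its methods, and the sample sentence are hypothetical and exist only for illustration; the real framework does this on serialized, sorted map output.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CombinerSketch {
    // Emulates one map task: emit a <word, 1> pair for every word in the split.
    static List<Map.Entry<String, Long>> map(String split) {
        List<Map.Entry<String, Long>> out = new ArrayList<>();
        for (String word : split.split(" ")) {
            out.add(new AbstractMap.SimpleEntry<>(word, 1L));
        }
        return out;
    }

    // Emulates the combiner: sum the counts per word within ONE map task's output.
    static Map<String, Long> combine(List<Map.Entry<String, Long>> mapOutput) {
        Map<String, Long> combined = new LinkedHashMap<>();
        for (Map.Entry<String, Long> pair : mapOutput) {
            combined.merge(pair.getKey(), pair.getValue(), Long::sum);
        }
        return combined;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> mapOutput = map("hello world hello hadoop hello");
        Map<String, Long> combined = combine(mapOutput);
        // Without a combiner every pair is shuffled to the reducers.
        System.out.println("records without combiner: " + mapOutput.size()); // 5
        // With a combiner only one pair per distinct word leaves the map task.
        System.out.println("records with combiner: " + combined.size());     // 3
        System.out.println(combined); // {hello=3, world=1, hadoop=1}
    }
}
```

In this sketch the reducer would receive 3 records instead of 5 from this map task, and it still computes the same final totals because it only has to add up the partial counts.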
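2. How to use a Combiner
A Combiner is written just like a Reducer: define a class that extends Reducer, put the combining logic in its reduce method, and then register it on the job with job.setCombinerClass(MyCombiner.class). Because the Combiner sits between the map output and the reduce input, both its input and its output key/value types must match the Mapper's output types.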
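One point worth keeping in mind when writing the combining logic: Hadoop does not guarantee how many times the Combiner runs for a given key (it may be zero, one, or several times), so the result must stay correct whether or not partial aggregation happens. The plain-Java sketch below, with the hypothetical class name CombinerSafetySketch and made-up sample values, illustrates why a sum is safe to combine while a naive average is not.

```java
class CombinerSafetySketch {
    // Summation: combining partial sums gives the same answer as summing everything.
    static long sum(long... values) {
        long total = 0;
        for (long v : values) {
            total += v;
        }
        return total;
    }

    // Arithmetic mean: averaging partial means generally does NOT give the overall mean.
    static double mean(double... values) {
        double total = 0;
        for (double v : values) {
            total += v;
        }
        return total / values.length;
    }

    public static void main(String[] args) {
        // Two map tasks saw the same word 3 times and 1 time respectively.
        long combinedSum = sum(sum(1, 1, 1), sum(1));
        long directSum = sum(1, 1, 1, 1);
        System.out.println(combinedSum == directSum); // true: sum is safe to combine

        // Two map tasks produced the values {1, 2, 3} and {10}.
        double meanOfMeans = mean(mean(1, 2, 3), mean(10));
        double overallMean = mean(1, 2, 3, 10);
        System.out.println(meanOfMeans); // 6.0
        System.out.println(overallMean); // 4.0, so a naive mean combiner changes the result
    }
}
```

Word counting is a summation, which is why the Combiner in this example can use exactly the same logic as the Reducer.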
3. This example builds on the basic WordCount program
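Mapper: WordMapper.java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 * KEYIN: byte offset of the line, VALUEIN: the line of text
 * KEYOUT: a word, VALUEOUT: the count 1
 */
public class WordMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Split the line into words
        String[] words = value.toString().split(" ");
        // 2. Emit one key/value pair such as <hello, 1> per word
        for (String word : words) {
            // 3. Write the pair to the context
            context.write(new Text(word), new LongWritable(1));
        }
    }
}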
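Combiner: MyCombiner.java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 * KEYIN: the word emitted by the Mapper
 * VALUEIN: the count emitted by the Mapper (1 per occurrence)
 * KEYOUT: the word
 * VALUEOUT: the partial count for this map task
 */
public class MyCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // 1. Accumulator for the partial count
        long count = 0;
        // 2. Iterate over the values and add them up
        for (LongWritable value : values) {
            count += value.get();
        }
        // 3. Write the partial count to the context
        context.write(key, new LongWritable(count));
    }
}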
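Reducer: WordReducer.java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 * KEYIN: a word, VALUEIN: a (possibly pre-combined) count
 * KEYOUT: the word, VALUEOUT: its total count
 */
public class WordReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // 1. Accumulator for the total count
        long count = 0;
        // 2. Iterate over the values and add them up
        for (LongWritable value : values) {
            count += value.get();
        }
        // 3. Write the total count to the context
        context.write(key, new LongWritable(count));
    }
}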
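Driver: JobMain.java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Combiner (local aggregation) example
 */
public class JobMain {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        // I. Initialize the job
        Job job = Job.getInstance(configuration, "mycombiner");
        // II. Configure the job in 8 small steps
        // 1. Set the input
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("D://input/test2.txt"));
        // 2. Set the Mapper class and its output types (k2, v2)
        job.setMapperClass(WordMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Steps 3 (partition), 4 (sort) and 6 (group) keep their defaults
        // 5. Set the Combiner for local aggregation: the focus of this example
        job.setCombinerClass(MyCombiner.class);
        // 7. Set the Reducer class and its output types (k3, v3)
        job.setReducerClass(WordReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // 8. Set the output
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("D://wordoutcombiner"));
        // III. Wait for the job to finish
        boolean b = job.waitForCompletion(true);
        System.out.println(b);
        System.exit(b ? 0 : 1);
    }
}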
test2.txt

Result
