The Local Aggregation Component in MapReduce


Author: 羋学僧 | Published: 2020-08-07 17:10


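1. What is a Combiner

A Combiner is a component of a MapReduce program that sits alongside the Mapper and the Reducer. It runs on the output of each map task and aggregates that output locally before it is sent to the reduce tasks. This lowers the computational load on the reduce tasks and reduces the amount of data transferred over the network.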
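To make the saving concrete, here is a minimal sketch in plain Java (no Hadoop types) that imitates one map task and a local aggregation step. The class name CombinerSketch and the helpers mapPhase and combine are hypothetical names chosen for illustration; they are not part of the Hadoop API, and the real framework does this work on spilled, sorted map output rather than on an in-memory list.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CombinerSketch {
    // Imitates the map phase: emits one (word, 1) pair per token.
    public static List<Map.Entry<String, Long>> mapPhase(String split) {
        List<Map.Entry<String, Long>> pairs = new ArrayList<>();
        for (String word : split.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                pairs.add(new AbstractMap.SimpleEntry<>(word, 1L));
            }
        }
        return pairs;
    }

    // Imitates a combiner: sums the counts per word within one map task's output.
    public static Map<String, Long> combine(List<Map.Entry<String, Long>> pairs) {
        Map<String, Long> local = new TreeMap<>();
        for (Map.Entry<String, Long> pair : pairs) {
            local.merge(pair.getKey(), pair.getValue(), Long::sum);
        }
        return local;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> raw = mapPhase("hello world hello hadoop hello");
        Map<String, Long> combined = combine(raw);
        System.out.println("records without combiner: " + raw.size());      // 5
        System.out.println("records with combiner:    " + combined.size()); // 3
        System.out.println(combined); // {hadoop=1, hello=3, world=1}
    }
}
```

In this toy input, five records leave the map task without local aggregation and three leave it with local aggregation. The more often a key repeats within a single map task, the larger the reduction.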

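2. How to use a Combiner

A Combiner is written the same way as a Reducer: define a class that extends Reducer, put the combining logic in its reduce method, and register it on the job with job.setCombinerClass(MyCombiner.class). Its input and output key/value types must both match the map output types, because its output is passed to the Reducer in place of the raw map output.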
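One point worth checking before registering a combiner: Hadoop may run it zero, one, or several times on a map task's output, so the operation has to give the same final answer whether or not partial results are combined first. Summation satisfies this; averaging does not. The following is a minimal sketch in plain Java, where the class name CombinerSafetySketch and the helpers sum and mean are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.List;

public class CombinerSafetySketch {
    // Reducer-style sum over the values of one key.
    public static long sum(List<Long> values) {
        long total = 0;
        for (long v : values) {
            total += v;
        }
        return total;
    }

    // Reducer-style arithmetic mean over the values of one key.
    public static double mean(List<Double> values) {
        double total = 0;
        for (double v : values) {
            total += v;
        }
        return total / values.size();
    }

    public static void main(String[] args) {
        // Counts for one key, produced by two hypothetical map tasks.
        List<Long> taskA = Arrays.asList(1L, 1L, 1L);
        List<Long> taskB = Arrays.asList(1L);
        long combinedSum = sum(Arrays.asList(sum(taskA), sum(taskB)));
        long directSum = sum(Arrays.asList(1L, 1L, 1L, 1L));
        System.out.println("sum:  " + combinedSum + " vs " + directSum); // sum:  4 vs 4

        // Measurements for one key, again split across two map tasks.
        List<Double> partA = Arrays.asList(2.0, 4.0, 6.0);
        List<Double> partB = Arrays.asList(10.0);
        double meanOfMeans = mean(Arrays.asList(mean(partA), mean(partB)));
        double directMean = mean(Arrays.asList(2.0, 4.0, 6.0, 10.0));
        System.out.println("mean: " + meanOfMeans + " vs " + directMean); // mean: 7.0 vs 5.5
    }
}
```

The sums agree, so a summing reducer can safely double as a combiner. The means disagree, so a job that computes averages would need a different intermediate form, for example carrying a (sum, count) pair through the combiner and dividing only in the reducer.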

3. Example: adding a Combiner to WordCount

Mapper: WordMapper.java

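import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 *     KEYIN:    byte offset of the line within the input file
 *     VALUEIN:  the line of text
 *     KEYOUT:   a word
 *     VALUEOUT: the count 1
 */
public class WordMapper extends Mapper<LongWritable,Text,Text,LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Split the line into words on runs of whitespace
        String[] words = value.toString().trim().split("\\s+");

        // 2. Emit one <word, 1> pair per word, e.g. <hello, 1>
        for (String word : words) {
            if (word.isEmpty()) {
                continue; // a blank line yields a single empty token
            }
            // 3. Write the pair to the context
            context.write(new Text(word), new LongWritable(1));
        }
    }
}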
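How a line is split determines which keys the mapper emits. The sketch below, plain Java with no Hadoop types, compares splitting on a single space with splitting on runs of whitespace. The class name TokenizeSketch and both helper names are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TokenizeSketch {
    // Splits on every single space; consecutive spaces produce empty tokens.
    public static List<String> splitOnSpace(String line) {
        return Arrays.asList(line.split(" "));
    }

    // Splits on runs of whitespace and drops any empty token.
    public static List<String> splitOnWhitespace(String line) {
        List<String> words = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                words.add(word);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        String line = "hello  world hadoop"; // two spaces after "hello"
        System.out.println(splitOnSpace(line).size());      // 4
        System.out.println(splitOnWhitespace(line).size()); // 3
        System.out.println(splitOnWhitespace(line));        // [hello, world, hadoop]
    }
}
```

With a single-space split, the doubled space yields an empty string that would be counted as a word of its own. Splitting on whitespace runs and skipping empty tokens avoids that extra key.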

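Combiner: MyCombiner.java

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
 *     KEYIN:    the word emitted by the map task
 *     VALUEIN:  the count emitted by the map task
 *     KEYOUT:   the word
 *     VALUEOUT: the locally summed count
 */
public class MyCombiner extends Reducer<Text,LongWritable,Text,LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // 1. Accumulator for this word's count
        long count = 0;

        // 2. Sum the counts emitted by this map task for the word
        for (LongWritable value : values) {
            count += value.get();
        }

        // 3. Write the partial total to the context
        context.write(key, new LongWritable(count));
    }
}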

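Reducer: WordReducer.java

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
 */
public class WordReducer extends Reducer<Text,LongWritable,Text,LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // 1. Accumulator for this word's count
        long count = 0;

        // 2. Sum the partial counts received from all map tasks
        for (LongWritable value : values) {
            count += value.get();
        }

        // 3. Write the final total to the context
        context.write(key, new LongWritable(count));
    }
}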

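Driver: JobMain.java

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Combiner (local aggregation) example
 */
public class JobMain {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        // I. Create the job
        Job job = Job.getInstance(configuration, "mycombiner");

        // II. Configure the job in 8 steps
        // 1. Input format and input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("D://input/test2.txt"));

        // 2. Mapper class and its output types (k2, v2)
        job.setMapperClass(WordMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Steps 3 (partition), 4 (sort) and 6 (group) use the defaults
        // 5. Local aggregation: the point of this example
        job.setCombinerClass(MyCombiner.class);

        // 7. Reducer class and its output types (k3, v3)
        job.setReducerClass(WordReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 8. Output format and output path (the directory must not already exist)
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("D://wordoutcombiner"));

        // III. Submit the job and wait for it to finish
        boolean b = job.waitForCompletion(true);
        System.out.println(b);
        System.exit(b ? 0 : 1);
    }
}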

test2.txt


Result
