美文网首页
Hadoop之MapReduce

Hadoop之MapReduce

作者: 汤汤的汤 | 来源:发表于2020-04-25 11:36 被阅读0次

Hadoop大数据技术体系

框架
参考https://www.jianshu.com/p/17bee8316848

MapReduce

框图

从wordcount开始

参考:wordcount实例

  • Map: for each (k,v) ---> produce new set of (k,v) pairs
  • Reduce: produce one (k,v) for each distinct key
    比如wordcount中:


    map过程
    reduce过程

    代码:

package com.felix;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
/**
 * 
 * 描述:WordCount explains by Felix
 * @author Hadoop Dev Group
 */
public class WordCount
{
    /**
     * MapReduceBase类:实现了Mapper和Reducer接口的基类(其中的方法只是实现接口,而未作任何事情)
     * Mapper接口:
     * WritableComparable接口:实现WritableComparable的类可以相互比较。所有被用作key的类应该实现此接口。
     * Reporter 则可用于报告整个应用的运行进度,本例中未使用。 
     * 
     */
    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, IntWritable>
    {
        /**
         * LongWritable, IntWritable, Text 均是 Hadoop 中实现的用于封装 Java 数据类型的类,这些类实现了WritableComparable接口,
         * 都能够被串行化从而便于在分布式环境中进行数据交换,你可以将它们分别视为long,int,String 的替代品。
         */
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        
        /**
         * Mapper接口中的map方法:
         * void map(K1 key, V1 value, OutputCollector<K2,V2> output, Reporter reporter)
         * 映射一个单个的输入k/v对到一个中间的k/v对
         * 输出对不需要和输入对是相同的类型,输入对可以映射到0个或多个输出对。
         * OutputCollector接口:收集Mapper和Reducer输出的<k,v>对。
         * OutputCollector接口的collect(k, v)方法:增加一个(k,v)对到output
         */
        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException
        {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens())
            {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }
    public static class Reduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable>
    {
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException
        {
            int sum = 0;
            while (values.hasNext())
            {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }
    public static void main(String[] args) throws Exception
    {
        /**
         * JobConf:map/reduce的job配置类,向hadoop框架描述map-reduce执行的工作
         * 构造方法:JobConf()、JobConf(Class exampleClass)、JobConf(Configuration conf)等
         */
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");           //设置一个用户定义的job名称
        conf.setOutputKeyClass(Text.class);    //为job的输出数据设置Key类
        conf.setOutputValueClass(IntWritable.class);   //为job输出设置value类
        conf.setMapperClass(Map.class);         //为job设置Mapper类
        conf.setCombinerClass(Reduce.class);      //为job设置Combiner类
        conf.setReducerClass(Reduce.class);        //为job设置Reduce类
        conf.setInputFormat(TextInputFormat.class);    //为map-reduce任务设置InputFormat实现类
        conf.setOutputFormat(TextOutputFormat.class);  //为map-reduce任务设置OutputFormat实现类
        /**
         * InputFormat描述map-reduce中对job的输入定义
         * setInputPaths():为map-reduce job设置路径数组作为输入列表
         * setInputPath():为map-reduce job设置路径数组作为输出列表
         */
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);         //运行一个job
    }
}

MapReduce框架组成

参考:MapReduce框架组成

MapReduce主要包括JobClient、JobTracker、TaskTracker、HDFS四个部分。


mapreduce框架
  1. JobClient:配置参数Configuration,并打包成jar文件存储在HDFS上,将文件路径提交给JobTracker的master服务,然后由master创建每个task将它们分发到各个TaskTracker服务中去执行。
  2. JobTracker:这是一个master服务,程序启动后,JobTracker负责资源监控和作业调度。JobTracker监控所有的TaskTracker和job的健康状况,一旦发生失败,即将之转移到其他节点上,同时JobTracker会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器,而调度器会在资源出现空闲时,选择合适的任务使用这些资源。在Hadoop 中,任务调度器是一个可插拔的模块,用户可以根据自己的需要设计相应的调度器。
  3. TaskTracker:运行在多个节点上的slaver服务。TaskTracker主动与JobTracker通信接受作业,并负责直接执行每个任务。TaskTracker 会周期性地通过Heartbeat 将本节点上资源的使用情况和任务的运行进度汇报给JobTracker,同时接收JobTracker 发送过来的命令并执行相应的操作(如启动新任务、杀死任务等)。TaskTracker 使用“slot”等量划分本节点上的资源量。“slot”代表计算资源(CPU、内存等)。一个Task 获取到一个slot 后才有机会运行,而Hadoop 调度器的作用就是将各个TaskTracker 上的空闲slot 分配给Task 使用。slot 分为Map slot 和Reduce slot 两种,分别供MapTask 和Reduce Task 使用。TaskTracker 通过slot 数目(可配置参数)限定Task 的并发度。
    Task分为Map Task和Reduce Task两种,均由TaskTracker启动。
    逻辑角度分析作业运行顺序:输入分片(input split)、map阶段、combiner阶段、shuffle阶段、reduce阶段。

逻辑流程

  • input split:在map计算之前,程序会根据输入文件计算split,每个input split针对一个map任务。input split存储的并非是数据本身,而是一个分片长度和一个记录数据的位置的数组。
  • map阶段:即执行map函数。
  • combiner阶段:这是一个可选择的函数,实质上是一种reduce操作。combiner是map的后续操作,主要是在map计算出中间文件前做一个简单的合并重复key值的操作。
  • shuffle阶段:指从map输出开始,包括系统执行排序即传送map输出到reduce作为输入的过程。另外针对map输出的key进行排序又叫sort阶段。map端shuffle,简单来说就是利用combiner对数据进行预排序,利用内存缓冲区来完成。reduce端的shuffle包括复制数据和归并数据,最终产生一个reduce输入文件。shuffle过程有许多可调优的参数来提高MapReduce的性能,其总原则就是给shuffle过程尽量多的内存空间。
  • reduce阶段:即执行reduce函数并存到hdfs文件系统中。

相关文章

  • MapReduce

    MapReduce介绍 在Hadoop中计算模型使用的是MapReduce。Hadoop MapReduce计算编...

  • Hadoop5-Mapreduce shuffle及优化

    Hadoop-Mapreduce shuffle及优化 转载 MapReduce简介 在Hadoop MapRed...

  • 凭借这份pdf,我成功拿下了阿里、腾讯、美团等offer(大数据

    Hadoop 概念 HDFS MapReduce Hadoop MapReduce作业的生命周期 Spark 概念...

  • Hadoop 之 MapReduce

    1 MapReduce 概述 MapReduce 是一个分布式运算程序的编程框架,是用户开发基于 Hadoop 的...

  • Hadoop之MapReduce

    MapReduce 思想:分而治之 Map(分):在分的阶段,我们只需要提供Map阶段的逻辑就好,不需要关心原始数...

  • Hadoop之MapReduce

    MapReduce是Hadoop的一个分布式计算框架,用于编写批处理应用程序。编写好的程序可以提交到Hadoop集...

  • Hadoop之MapReduce

    Hadoop大数据技术体系 : MapReduce 框图 从wordcount开始 参考:wordcount实例 ...

  • Hadoop之MapReduce

    一.MapReduce 1. MapReduce定义 Mapreduce是一个分布式运算程序的编程框架,是用户开发...

  • 大数据学习-spark

    spark比较于Hadoop-MapReduce Hadoop 提供的 MapReduce 框架处理大数据的时候,...

  • 1. hdfs实例

    hadoop包含hdfs和MapReduce,hdfs负责数据文件存储,是hadoop的基础,MapReduce是...

网友评论

      本文标题:Hadoop之MapReduce

      本文链接:https://www.haomeiwen.com/subject/wfxbbhtx.html