美文网首页
离线计算组件篇-MapReduce-Join连接

离线计算组件篇-MapReduce-Join连接

作者: CoderInsight | 来源:发表于2022-12-02 21:44 被阅读0次

11.mapreduce的join

1. reduce join

(1)、需求

订单数据表t_order:

id date pid amount
1001 20150710 P0001 2
1002 20150710 P0002 3
1002 20150710 P0003 3

商品信息表t_product

id pname category_id price
P0001 小米5 1000 2000
P0002 锤子T1 1000 3000

假如数据量巨大,两表的数据是以文件的形式存储在HDFS中,需要用mapreduce程序来实现一下SQL查询运算:

select  a.id,a.date,b.name,b.category_id,b.price from t_order a join t_product  b on a.pid = b.id  

(2)、实现机制

  • 通过将关联的条件作为map输出的key,将两表满足join条件的数据并携带数据所来源的文件信息,发往同一个reduce task,在reduce中进行数据的串联
  • 定义Mapper:
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //现在我们读取了两个文件,如何确定当前处理的这一行数据是来自哪一个文件里面的
        //方式一:通过获取文件的切片,获得文件明
        /*
        FileSplit inputSplit = (FileSplit) context.getInputSplit();//获取我们输入的文件的切片
        //获取文件名称
        String name = inputSplit.getPath().getName();
        if (name.equals("orders.txt")) {
            //订单表数据
        } else {
            //商品表数据
        }
        */
        
        String[] split = value.toString().split(",");

        //方式二:因为t_product表,都是以p开头,所以可以作为判断的依据
        if (value.toString().startsWith("p")) {
            //p0002,锤子T1,1000,3000
            //以商品id作为key2,相同商品的数据都会到一起去
            context.write(new Text(split[0]), value);
        } else {
            //order
            // 1001,20150710,p0001,2
            context.write(new Text(split[2]), value);
        }
    }
}
  • 定义Reducer:
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ReduceJoinReducer extends Reducer<Text,Text,Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        String order = "";
        String product = "";
        for (Text value : values) {
            if (value.toString().startsWith("p")) {
                product = value.toString();
            }else {
                order = value.toString();
            }
        }
        // 将结果保存
        context.write(new Text(order + "\t" + product),NullWritable.get());
    }
}
  • 定义main方法
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReduceJoinMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        //获取job对象
        Job job = Job.getInstance(super.getConf(), ReduceJoinMain.class.getSimpleName());
        job.setJarByClass(ReduceJoinMain.class);

        //第一步:读取文件
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path(args[0]));

        //第二步:设置自定义mapper逻辑
        job.setMapperClass(ReduceJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        //分区,排序,规约,分组 省略

        //第七步:设置reduce逻辑
        job.setReducerClass(ReduceJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        //第八步:设置输出数据路径
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new ReduceJoinMain(), args);
        System.exit(run);
    }
}

2. map join

(1)、原理阐述

  • 适用于关联表中有小表的情形;

  • 可以将小表分发到所有的map节点,这样,map节点就可以在本地对自己所读到的大表数据进行join并输出最终结果,可以大大提高join操作的并发度,加快处理速度

  • 将小文件上传到hdfs中,然后在Main函数中将文件添加到缓存中,然后使用java io的方式读取文件;

    在Map()函数中将要实现join的字段放在key中,从而实现两个文件之间的join。

(2)、实现示例

  • 先在mapper类中预先定义好小表,进行join
  • 引入实际场景中的解决方案:一次加载数据库或者用
  • 定义mapper类:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    //用于保存商品表的数据;productMap中的key是商品id,value是与key对应的表记录
    private Map<String, String> productMap;

    /**
     * 初始化方法,只在程序启动调用一次
     *
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {

        productMap = new HashMap<String, String>();
        // 通过context上下文对象去 获取配置文件对象
        Configuration configuration = context.getConfiguration();

        // 通过Job对象去 获取到所有的缓存文件
        // 方式一
        URI[] cacheFiles = Job.getInstance(context.getConfiguration()).getCacheFiles();
        //方式二:deprecated
        //URI[] cacheFiles = DistributedCache.getCacheFiles(configuration);

        //现在只有一个缓存文件放进了分布式缓存中
        URI cacheFile = cacheFiles[0];

        //获取FileSystem
        FileSystem fileSystem = FileSystem.get(cacheFile, configuration);
        //读取文件,获取到输入流。这里面装的都是商品表的数据
        FSDataInputStream fsDataInputStream = fileSystem.open(new Path(cacheFile));

        /**
         * 商品表数据如下:
         * p0001,xiaomi,1000,2
         * p0002,appale,1000,3
         * p0003,samsung,1000,4
         */
        //获取到BufferedReader之后,可以一行一行的读取数据
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(fsDataInputStream));
        String line = null;
        //每次循环,获得表的一行数据
        while ((line = bufferedReader.readLine()) != null) {
            String[] split = line.split(",");
            // productMap中的key是商品id,value是与key对应的表记录
            productMap.put(split[0], line);
        }
    }

    /**
     * @param key
     * @param value   订单表的记录,如 1001,20150710,p0001,2
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split(",");
        //获取订单表的商品id
        String pid = split[2];

        //获取商品表的数据
        String productLine = productMap.get(pid);

        System.out.println("Value中的数据:" + value.toString());
        System.out.println("商品表中的数据:" + productLine);
        // 此时把所有的数据直接拼接在一起了,然后便于观察最后的实际拼接的结果;在实际条件下其实也可以使用这种方式,然后在测试的没有问题之后的,可以再把这个删除
        
        context.write(new Text(value.toString() + "\t" + productLine), NullWritable.get());
    }
}

  • 定义main方法:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.net.URI;

/**
 * 需求:同reduce join
 * 实现:select a.id, a.date, b.name, b.category_id, b.price
 * from t_order a
 * join t_product b
 * on a.pid = b.id
 */
public class MapJoinMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        //分布式缓存的hdfs路径
        URI uri = new URI("hdfs://node01:8020/cache/product.txt");

        // 本地路径:需要用一下形式file:///C:/1、HK/.../pdts.txt   ;需要指定到具体的文件;如果是使用file:///c:\\1、HK\\3、ME...不符合语法会报错
        // URI uri = new URI("file:///C:/1、HK/3、ME/2、高级0x/1、Hadoop集群升级课件/9、MapReduce/MR第一次/12、join操作/map端join/cache/pdts.txt");
        Configuration configuration = super.getConf();
        // 添加缓存文件 方式二:deprecated
        // DistributedCache.addCacheFile(uri, configuration);

        //获取job对象
        Job job = Job.getInstance(configuration, MapJoinMain.class.getSimpleName());
        //添加缓存文件:方式一
        job.addCacheFile(uri);
        job.setJarByClass(MapJoinMain.class);

        //读取文件,解析成为key,value对
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path(args[0]));

        job.setMapperClass(MapJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        //没有reducer逻辑,不用设置了
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setNumReduceTasks(2);

        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new MapJoinMain(), args);
        System.exit(run);
    }
}

相关文章

网友评论

      本文标题:离线计算组件篇-MapReduce-Join连接

      本文链接:https://www.haomeiwen.com/subject/mloffdtx.html