美文网首页
MapReduce实现矩阵乘法

MapReduce实现矩阵乘法

作者: 操作系统 | 来源:发表于2018-04-26 08:02 被阅读0次

说明

目标是计算矩阵乘积 matrix1 * matrix2,即用matrix1每一行的各列元素与matrix2每一列的各行元素对应相乘并求和。实现思路是先将matrix2矩阵进行转置,这样matrix2的每一列就变成转置矩阵的一行,乘法就转化为matrix1的每一行与转置后的matrix2的每一行按列号对应相乘再求和。

Step1:实现矩阵matrix2.txt转置

在hdfs根目录下创建(mkdir)matrix/step1_input文件夹,并上传matrix2.txt文件。文件每行的格式为“行号<TAB>列号_值,列号_值,...”,行号与后面的内容之间必须用制表符(\t)分隔(代码中按"\t"切分)。文件内容如下:

matrix2.txt

1   1_0,2_3,3_-1,4_2,5_-3
2   1_1,2_3,3_5,4_-2,5_-1
3   1_0,2_1,3_4,4_-1,5_2
4   1_-2,2_2,3_-1,4_1,5_2

创建step1包,在此包内,首先创建Mapper1类
Mapper1.java

package step1;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


public class Mapper1 extends Mapper<LongWritable, Text, Text, Text>{
    private Text outKey = new Text();
    private Text outValue =new Text();
    
    /**
     * key:1
     * value:1  1_0,2_3,3_-1,4_2,5_-3
     */
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        String[] rowAndLine = value.toString().split("\t");
        
        //matrix row number
        String row = rowAndLine[0];
        String[] lines = rowAndLine[1].split(",");
        
        //[1_0,2_3,3_-1,4_2,5_-3]
        for(int i=0;i<lines.length;i++) {
            String column = lines[i].split("_")[0];
            String valueStr = lines[i].split("_")[1];
            //key:column value:rownumber_value
            outKey.set(column);
            outValue.set(row+"_"+valueStr);
            context.write(outKey, outValue);
        }
    }

}

Reducer1.java

package step1;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Reducer1 extends Reducer<Text, Text, Text, Text>{
    private Text outKey = new Text();
    private Text outValue = new Text();
    
    protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        for(Text text:values) {
            sb.append(text+",");
        }
        String line = null;
        if(sb.toString().endsWith(",")) {
            line = sb.substring(0,sb.length()-1);
        }
        
        outKey.set(key);
        outValue.set(line);
        
        context.write(outKey, outValue);
    }
    
    
}

MR1.java

package step1;

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver for step 1: runs {@link Mapper1} and {@link Reducer1} to transpose matrix2.
 */
public class MR1 {
    private static String inPath = "/matrix/step1_input/matrix2.txt";
    
    private static String outPath = "/matrix/step1_output";
    
    private static String hdfs ="hdfs://ha-namenode-b:9000";
    
    /**
     * Configures and runs the step 1 job, blocking until it finishes.
     *
     * @return 1 if the job completed successfully, -1 otherwise (missing input,
     *         job failure, or an exception, which is printed to stderr)
     */
    public int run() {
        try {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", hdfs);
            Job job = Job.getInstance(conf,"step1");
            
            job.setJarByClass(MR1.class);
            job.setMapperClass(Mapper1.class);
            job.setReducerClass(Reducer1.class);
            
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            
            FileSystem fs = FileSystem.get(conf);
            Path inputPath = new Path(inPath);
            // Fail fast, and before touching the output directory, when there is
            // nothing to read; otherwise the job would be submitted without input.
            if (!fs.exists(inputPath)) {
                System.err.println("step1 input not found: " + inputPath);
                return -1;
            }
            FileInputFormat.addInputPath(job, inputPath);
            
            // Remove the output of a previous run; the job is configured to write here.
            Path outputPath = new Path(outPath);
            fs.delete(outputPath,true);
            
            FileOutputFormat.setOutputPath(job, outputPath);
            
            return job.waitForCompletion(true)?1:-1;
        
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        return -1;
    }

    /** Runs step 1 and prints whether it succeeded. */
    public static void main(String[] args) {
        int result = new MR1().run();
        if(result==1) {
            System.out.println("step1 success...");
        }else {
            System.out.println("step1 failed...");
        }
    }
}

Step2:实现matrix1与转置后的matrix2相乘

创建step2的输入目录并上传matrix1.txt(下面两条命令之后列出的是matrix1.txt的内容,格式与matrix2.txt相同):

hadoop fs -mkdir /matrix/step2_input
hadoop fs -put ~/demo/matrix1.txt /matrix/step2_input
1   1_1,2_2,3_-2,4_0
2   1_3,2_3,3_4,4_-3
3   1_-2,2_0,3_2,4_3
4   1_5,2_3,3_-1,4_2
5   1_-4,2_2,3_0,4_2

Mapper2.java

package step2;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Mapper2 extends Mapper<LongWritable, Text, Text, Text>{
    private Text outKey = new Text();
    private Text outValue =new Text();
    
    private List<String> cacheList = new ArrayList<String>();
    
    
    
    protected void setup(Context context)
            throws IOException, InterruptedException {
        super.setup(context);
        FileReader fr = new FileReader("matrix2");
        BufferedReader br = new BufferedReader(fr);
        String line = null;
        while((line=br.readLine())!=null) {
            cacheList.add(line);
        }
        fr.close();
        br.close();
    }



    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        String row_matrix1 = value.toString().split("\t")[0];
        String[] column_value_array_matrix1 = value.toString().split("\t")[1].split(",");
        for(String line:cacheList) {
            String row_matrix2 = line.toString().split("\t")[0];
            String[] column_value_array_matrix2 = line.toString().split("\t")[1].split(",");
            
            int result = 0;
            for(String column_value_matrix1:column_value_array_matrix1) {
                String column_matrix1 = column_value_matrix1.split("_")[0];
                String value_matrix1 = column_value_matrix1.split("_")[1];
                
                for(String column_value_matrix2:column_value_array_matrix2) {
                    if(column_value_matrix2.startsWith(column_matrix1 + "_")) {
                        String value_matrix2 = column_value_matrix2.split("_")[1];
                        result += Integer.valueOf(value_matrix1) *Integer.valueOf(value_matrix2); 
                    }
                }
            }
            outKey.set(row_matrix1);
            outValue.set(row_matrix2+"_"+result);
            context.write(outKey, outValue);
        }
    }
}

Reducer2.java

package step2;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Reducer2 extends Reducer<Text, Text, Text, Text> {
    private Text outKey = new Text();
    private Text outValue = new Text();
    
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        for(Text value:values) {
            sb.append(value+",");
        }
        String result = null;
        if(sb.toString().endsWith(",")) {
            result = sb.substring(0,sb.length()-1);
        }
        
        outKey.set(key);
        outValue.set(result);
        context.write(outKey, outValue);
    }
}

MR2.java

package step2;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;




/**
 * Driver for step 2: runs {@link Mapper2} and {@link Reducer2} to multiply matrix1
 * by the transposed matrix2 produced in step 1.
 */
public class MR2 {
    private static String inPath = "/matrix/step2_input/matrix1.txt";
    
    private static String outPath = "/matrix/output";
    
    private static String cache = "/matrix/step1_output";

    // Mapper2 opens "matrix2" as a plain file, so cache the reducer output file
    // itself rather than its directory.
    // NOTE(review): assumes step 1 ran with a single reducer (MR1 does not set a
    // reducer count), so all rows are in part-r-00000 - confirm.
    private static String cacheFile = cache + "/part-r-00000";
    
    // NOTE(review): MR1 uses hdfs://ha-namenode-b:9000; both steps must point at the
    // same file system for the step 1 output to be found here - confirm.
    private static String hdfs ="hdfs://localhost:9000";
    
        
    /**
     * Configures and runs the step 2 job, blocking until it finishes.
     *
     * @return 1 if the job completed successfully, -1 otherwise (missing input or
     *         cache file, job failure, or an exception, which is printed to stderr)
     * @throws URISyntaxException declared for compatibility with existing callers;
     *         the method catches it internally and returns -1 instead
     */
    public int run() throws URISyntaxException {
        try {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", hdfs);
            Job job = Job.getInstance(conf,"step2");
            
            FileSystem fs = FileSystem.get(conf);

            // Fail fast, and before touching the output directory, when either
            // operand of the multiplication is missing.
            Path cachePath = new Path(cacheFile);
            if (!fs.exists(cachePath)) {
                System.err.println("step2 cache file not found: " + cachePath);
                return -1;
            }
            Path inputPath = new Path(inPath);
            if (!fs.exists(inputPath)) {
                System.err.println("step2 input not found: " + inputPath);
                return -1;
            }

            // The "#matrix2" fragment is the local link name that Mapper2 opens.
            job.addCacheFile(new URI(cacheFile+"#matrix2"));
            
            job.setJarByClass(MR2.class);
            job.setMapperClass(Mapper2.class);
            job.setReducerClass(Reducer2.class);
            
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            
            FileInputFormat.addInputPath(job, inputPath);
            
            // Remove the output of a previous run; the job is configured to write here.
            Path outputPath = new Path(outPath);
            fs.delete(outputPath,true);
            
            FileOutputFormat.setOutputPath(job, outputPath);
            return job.waitForCompletion(true)?1:-1;
        
        } catch (IOException | ClassNotFoundException | URISyntaxException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        return -1;                                                      
    }

    /** Runs step 2 and prints whether it succeeded. */
    public static void main(String[] args) {
        try {
            int result = new MR2().run();
        
            if(result == 1) {
                System.out.println("step2 success...");
            }
            else {
                System.out.println("step2 failed...");
            }
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
    }
}

相关文章

  • MapReduce实现矩阵乘法

    说明 为实现matrix1 * matrix2矩阵相乘(matrix1每一个行的列元素分别与matrix2每一列的...

  • 《数据结构和算法分析C++版》第三版部分习题(1-3章)

    1.2 编程实现大数加法,乘法,和指数操作乘法采用了快速乘法的算法 1.6 稀疏矩阵(含0元素很多的矩阵mat[1...

  • 图形变换原理

    概述: 图形变换大体分为缩放,平移,拉伸,旋转.他们的原理是矩阵的乘法. 矩阵的乘法: 矩阵的乘法规则:两个矩阵相...

  • 矩阵乘法在python中的表示

    从数学表达上来说,矩阵乘法有: 矩阵的乘法(matmul product):这就是线性代数里面的矩阵乘法 內积/点...

  • sparse matrix 的分布式存储和计算

    矩阵乘法 我们先来补充一下矩阵乘法的数学知识: 矩阵乘法的意义: 对一个矩阵进行左乘一个矩阵的运算,相当于对该矩阵...

  • 主从模式(实现矩阵乘法)

    主从模式(实现矩阵乘法) - 知乎 (zhihu.com)[https://zhuanlan.zhihu.com/...

  • 图形矩阵-----Matrix

    一、矩阵的定义 二、矩阵与矩阵的乘法 矩阵的乘法满足以下运算律:结合律,分配律,但是矩阵乘法不满足交换律。更详细的...

  • 卷积网络和卷积计算

    矩阵乘法和卷积乘法区别: 卷积的乘法和矩阵的乘法不一样,卷积的求和相当于加权求和,也可以称为加权叠加,矩阵相乘是将...

  • MIT-18.06-线性代数(第三讲)

    第三讲 —— 矩阵乘法和逆 1. 矩阵乘法 1.1 行列内积 假设矩阵乘矩阵,得到矩阵,。回顾单个元素的求法,取特...

  • 矩阵链乘法

    矩阵A和矩阵B能够相乘,只有当矩阵A和矩阵B相容。 矩阵链乘法的前提就是降低矩阵的乘法规模。之所以可以这样,是因为...

网友评论

      本文标题:MapReduce实现矩阵乘法

      本文链接:https://www.haomeiwen.com/subject/vttplftx.html