美文网首页
一个ForkJoin并行任务的实际应用

一个ForkJoin并行任务的实际应用

作者: windrises | 来源:发表于2017-10-13 19:09 被阅读76次

先交代背景,因为我有好多照片存在不同的电脑和移动硬盘备份,平时又喜欢摄影,有时候相机的照片和手机拍的需要同步到所有设备,保证所有设备上的照片都是一样的,但是有时候会各种原因导致图片不同步,人为查找排除很麻烦何况我有10000+照片(其实里面没有几张自己的。。),首先想到的是根据文件名判重,考虑到会有重名的文件,严谨起见采取对照片摘要MD5,再去判重。其次是对这些MD5的过滤找到设备各自独有的照片进行复制同步即可。(插一句,有想过用Git管理这些二进制文件,没有具体试过,哪天闲了就试试~若有大侠搞过望不吝赐教!)

今天主要进行的是给一个顶级目录,递归生成所有文件的MD5信息,采取的是Fork/Join并行框架+命令行的方式 {#今天主要进行的是给一个顶级目录递归生成所有文件的md5信息采取的是forkjoin并行框架命令行的方式}

下边这段可以略过,不过看看也没坏处~

至于为什么这样做,说说心路历程,刚开始想用Qt+C++的方式,考虑到开发效率(我想速成)再考虑之前Qt写的几个工具,果断放弃;后来想用Python无奈自己Python太弱,也放弃;再后来想用JavaFx,还需要边学边搞,win下编程C#MFC压根没搞过,这啥啥不会真捉急啊,总不能逼我在Linux上拿C||Shell写吧,好吧我承认我事儿倍多:);那就用控制台吧,写个Java这样还可以把几个步骤分开来写单独优化,操作起来几个命令行敲一下黑屏白字滚动起来。。。想想都有点酸爽。扯了这么多,最终就是实现一个Java工具类,给他一个顶级目录,他会自动生成一个清单文件,里面是MD5和对应的文件路径信息,当然是基于并行执行任务框架做的,接下来开始吧

并行执行核心思想是大的任务递归分解为粒度较小的任务,然后每个小的任务执行完返回执行结果,汇总成一个最终的结果 {#并行执行核心思想是大的任务递归分解为粒度较小的任务然后每个小的任务执行完返回执行结果汇总成一个最终的结果}

在本文中,遍历一个给定的顶级文件夹并输出所有内部文件的摘要,这就是最终的任务,这个任务要并行执行为了提高效率必须划分成小任务并行执行它们并收集执行结果,先看看整体的代码:

public class Filter {
    static ForkJoinPool mPool = null;
    static List<File> mList = new ArrayList<>();

    public static void main(String[] args) {
        for (int i = 0; i < args.length; i++) {
            switch (args[i]) {
            /* 指定并行执行所需线程数量 eg (-j 8) 指定线程池有8个线程*/
                case "-j":
                    mPool = new ForkJoinPool(Integer.parseInt(args[++i]));
                    break;

            /* 指定顶级目录列表 */
                default:
                    mList.add(new File(args[i]));
                    break;
            }
        }

        /* 具体分配执行 */
        new Executor().invokeTask();
    }

    /*
    * 具体的顶级任务执行器
    */
    static class Executor {
        long start = 0;
        long end1 = 0;
        long end2 = 0;

        private List<GenMd5Task> mTasks = new ArrayList<>();
        private List<Map<String, String>> mapList = new ArrayList<>();

        public Executor() {
            if (mPool == null) {
                mPool = new ForkJoinPool();
            }
        }

        public List<Map<String, String>> invokeTask() {
            end1 = end2 = start = System.currentTimeMillis();

            for (File file : mList) {
                /* 划分的子任务*/ 
                GenMd5Task task = new GenMd5Task(file);
                mTasks.add(task);
                /* 调用任务并等待返回执行结果 */
                Map<String, String> map = mPool.invoke(task);
                mapList.add(map);

                end1 = System.currentTimeMillis();

                /* 把最终计算结果记录到文件中 */ 
                FileUtil.writeMapToFile(file.getName() + System.currentTimeMillis() + "-md5.txt",
                        map);

                end2 = System.currentTimeMillis();
            }

            /* 统计耗时 */
            System.out.println("computer time: " + (end1 - start) + "ms");
            System.out.println("write file time: " + (end2 - end1) + "ms");

            mPool.shutdown();

            return mapList;
        }
    }
}

上边代码是工具类主体,需要执行参数,比如java Filter -j 8 C:\Users\yy\xxx

主要的工作都在Executor的invokeTask中完成,里面有一个GenMd5Task的类,继承自RecursiveTask,表示一个又返回值的task,还有一个没有返回值的类RecursiveAction

public class GenMd5Task extends RecursiveTask<Map<String, String>> {
    private HashMap<String, String> mMap;
    private File mFile;
    private List<GenMd5Task> mTasks;

    public GenMd5Task(File file) {
        mFile = file;
    }

    @Override
    protected Map<String, String> compute() {
        mMap = new HashMap();
        File[] files = mFile.listFiles();
        mTasks = new ArrayList<>();

        for (File f : files) {
            System.out.println(f.getAbsoluteFile());
            if (f.isFile()) {
                mMap.put(DigestUtil.getMd5(FileUtil.getBytes(f)), f.getAbsolutePath());
            } else {
                /* 上边是文件直接计算MD5放进当前结果Map中 */
                /* 下边是当前目录一级子目录,再次创建一个子任务,添加进任务列表,一会并发执行这些任务 */ 
                GenMd5Task task = new GenMd5Task(f);
                mTasks.add(task);
            }
        }

        /* 执行所有任务 并汇总结果*/ 
        invokeAll(mTasks);
        for (GenMd5Task task : mTasks) {
            mMap.putAll(task.join());
        }

        return mMap;
    }
}

上边看到是直接按照所有子目录对应一个子任务的方式去分配的,这样可能并不是性能最优,留待以后解决,同时还需要注意一点的是,这儿是在任务计算过程中动态添加任务的,每一个上级任务都在等待其所有的子任务执行完毕返回结果,像一个倒立的树状结构,每一个叶子节点都是一个最小粒度的任务,在这里是当前目录下只包含文件没有其它目录的时候

其中用到的两个工具类附上代码: {#其中用到的两个工具类附上代码}

public class FileUtil {
    public static byte[] getBytes(File f) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream((int) f.length());
        BufferedInputStream in = null;

        try {
            in = new BufferedInputStream(new FileInputStream(f));
            int buf_size = 4096;
            byte[] buffer = new byte[buf_size];
            int len;
            while (-1 != (len = in.read(buffer, 0, buf_size))) {
                bos.write(buffer, 0, len);
            }
            return bos.toByteArray();
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        } finally {
            try {
                in.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                bos.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /* 把一个Map写入文件中 */
    public static <K, V> void writeMapToFile(String fileName, Map<K, V> map) {
        File f = new File(fileName);
        try {
            FileWriter fw = new FileWriter(f);
            for (Map.Entry<K, V> entry : map.entrySet())
                fw.write(entry.getKey().toString() + ":" + entry.getValue().toString() + "\n");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
public class DigestUtil {
    /**
     * 计算字符串MD5值
     *
     * @param str 需要计算的字符串
     * @return String
     */
    public static String getMd5(String str) {
        try {
            return getMd5(str.getBytes("UTF-8"));
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static String getMd5(byte[] bytes) {
        try {
            byte[] hash = MessageDigest.getInstance("MD5").digest(bytes);
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                if ((b & 0xFF) < 0x10)
                    hex.append("0");
                hex.append(Integer.toHexString(b & 0xFF));
            }
            return hex.toString();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}

这只是完成了第一步生成摘要,有些地方仍然需要优化,比如怎么最大限度喂饱每一个线程的问题,比如当文件读取遇到IO瓶颈,CPU利用率一直比较低的问题,甚至过程打断的处理等等,并且还需要完成查重同步的部分~

有不妥的地方,欢迎指正,一起进步~

迁移自CSDN
2016年05月31日 00:15:15
http://blog.csdn.net/u013262051/article/details/51541627

相关文章

  • 一个ForkJoin并行任务的实际应用

    先交代背景,因为我有好多照片存在不同的电脑和移动硬盘备份,平时又喜欢摄影,有时候相机的照片和手机拍的需要同步到所有...

  • J.U.C -ForkJoin 框架

    ForkJoin 框架 是 jdk7 提供的一个用于执行并行任务的框架 简介 ForkJoin 是一个执行并行任务...

  • 并行流与顺序流

    ForkJoin框架 普通for java8并行流,底层就是forkjoin框架

  • ForkjoinPool -1

    ForkJoin是用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结...

  • ForkJoin 学习使用笔记

    ForkJoin 学习使用笔记 Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大...

  • 畅卓下载优化方案

    一个定时线程池,两个forkjoin池 流程:定时任务,查库forkjoin池,存硬盘forkjoin池 好处:查...

  • rxjs中常用的操作符

    Rx.Observable.forkJoin(...args [resultSelector]) 并行运行所有可观...

  • 并行执行任务的ForkJoin框架简介

    Fork/Join框架简介 从JDK1.7开始,Java提供Fork/Join框架用于并行执行任务,它的思想就是讲...

  • java并发编程之Fork/Join 框架

    ForkJoin是Java7提供的原生多线程并行处理框架,其基本思想是将大人物分割成小任务,最后将小任务聚合起来得...

  • 多线程

    多线程的具体应用场景有哪些?实际中需要注意些什么? 后台任务,例如:发短信、发邮件、发MQ消息、记录日志。 并行计...

网友评论

      本文标题:一个ForkJoin并行任务的实际应用

      本文链接:https://www.haomeiwen.com/subject/yittuxtx.html