美文网首页
多线程PDF下载器

多线程PDF下载器

作者: 奈斯凸米特 | 来源:发表于2019-10-08 17:20 被阅读0次
该PDF下载器支持从本地txt文件中读取和数据库读取两种方式
数据库ip我乱写的,这里集成了三种情况的数据库(本地数据库,需要验证的数据库,不需要验证的数据库)
# -*- coding: utf-8 -*-
"""
此程序用来多线程下载PDF,提供本地txt文件读取下载和数据库读取下载两种方式
"""
import hashlib
import os
import sys
from multiprocessing import Queue
from threading import Thread, Lock

import pymongo
import requests

g_num = 0  # 创建全局变量
lock = Lock()  # 创建全局互斥锁


class DownloadPDF(object):

    def __init__(self):
        # 创建消息队列
        self.q = Queue()

    def run(self):
        self.mkdir()
        self.get_pdf_msg()
        pdf_size = self.q.qsize()  # 总PDF数
        # 开启多线程
        all_th = []
        for i in range(10):
            th = Thread(target=self.download, args=(pdf_size,))
            th.start()
            all_th.append(th)

        for th in all_th:
            th.join()
        print('\n' + '下载完成!')

    def download(self, pdf_size):
        while True:
            try:
                # 获取PDF信息
                each_pdf = self.q.get(timeout=2)  # 设置超时时间2s。这里如果不设置超时,当队列为空时,q.get()会进入阻塞状态
            except:
                # 这里队列已经为空,取不到数据就跳出循环
                break
                # 计数
            global g_num
            lock.acquire()  # 上锁
            g_num += 1
            # 打印下载进度
            sys.stdout.write('\r' + '正在下载:%s / %s' % (g_num, pdf_size))
            path = ''
            pdf_url = ''
            md5_later = ''
            pdf_name = ''  # 如果没有PDF名,置为空
            if is_txt:
                # 选择了txt文件方式,只有一个链接
                pdf_url = each_pdf.strip()
                href_name = pdf_url.split('/')[-1]  # 取链接 / 后的值作为PDF名称
                if '.pdf' in href_name:
                    href_name = href_name.split('.')[0]
                md5_later = self.pdf_name_md5(href_name)  # pdf转码之后的名称
                path = 'PDF/' + md5_later + '.pdf'

            elif not is_txt:
                # 从数据库中读取PDF信息,数据库中包含两个字段(pdf_name, pdf_url)
                pdf_name = each_pdf['pdf_name']
                pdf_url = each_pdf['pdf_url']
                md5_later = self.pdf_name_md5(pdf_name)
                path = 'PDF/' + md5_later + '.pdf'
            # 下载之前先去重
            try:
                # 尝试获取该路径下pdf的size,如果还未下载则置为0
                size = os.path.getsize(path)
            except:
                size = 0
            # 判断已经下载并且size > 0则为成功下载
            if os.path.exists(path) and size != 0:
                # 路径存在说明下载过
                lock.release()  # 释放锁
                continue
            # 下载PDF
            self.download_pdf(pdf_name, each_pdf, path, pdf_url, md5_later)
            lock.release()  # 释放锁

    @staticmethod
    def download_pdf(pdf_name, each_pdf, path, pdf_url, md5_later):
        """
        下载PDF的主要实现程序
        :param pdf_name:
        :param each_pdf:
        :param path:
        :param pdf_url:
        :param md5_later:
        :return:
        """
        try:
            # 开始下载
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)'
                              ' Chrome/76.0.3809.100 Safari/537.36'}
            r = requests.get(url=pdf_url, headers=headers, timeout=(120, 600))  # 设置connect超时2分钟,read超时10分钟
            f = open(path, 'wb')
            f.write(r.content)
            f.close()
            r.close()
            s = requests.session()
            s.keep_alive = False
            if is_insert == 'y':
                # 下载成功,插入数据库
                if not pdf_name:
                    msg = {'url': pdf_url, 'md5_name': md5_later, 'relative_path': path}
                else:
                    msg = {'url': pdf_url, 'origin_name': pdf_name, 'md5_name': md5_later, 'relative_path': path}
                new_collection.insert(msg)
        except:
            # 下载失败
            print('\r' + '下载失败:%s' % str(each_pdf))
            with open('error.txt', 'r') as fr:
                content = fr.readlines()
            if str(each_pdf) not in content:
                with open('error.txt', 'a') as fe:
                    fe.writelines(str(each_pdf) + '\n')

    @staticmethod
    def mkdir():
        """
        创建下载失败文件、去重文件、PDF文件夹
        :return:
        """
        if not os.path.exists('error.txt'):
            with open('error.txt', 'w') as fr:
                fr.write('')
        if not os.path.exists('PDF'):
            os.mkdir('PDF')

    def client_dbs(self):
        """
        连接数据库,获取所有pdf信息
        :return:
        """
        ip = self.choose_ip()

        client = pymongo.MongoClient('mongodb://%s:27017' % ip)
        # 这里因为该数据库开了验证,所以这里进行判断验证(ip我乱写的)
        if ip == '192.168.0.123':
            client['admin'].authenticate('用户名', '密码')  # 数据库验证(用户名密码根据自己数据库修改)
        all_db_names = client.list_database_names()
        # 规范打印数据库名
        self.pr_datas(all_db_names)

        db_name_num = input('\r' + '请选择数据库对应序号:')
        db_name = self.choose_db_or_col(all_db_names, db_name_num, '数据库')

        while True:
            db = client[db_name]
            all_col = db.collection_names()
            if not all_col:
                db_name = input('没有这个数据库,请重新输入:')
            else:
                break
        # 规范打印集合名
        self.pr_datas(all_col)
        col_num = input('\r' + '请选择集合对应的序号:')
        col_name = self.choose_db_or_col(all_col, col_num, '集合')

        collection = db[col_name]
        return collection, db

    @staticmethod
    def choose_ip():
        """
        选择ip
        :return:
        """
        ip = input('请选择数据库:' + '\n' + '1. 本地数据库   2. 123数据库   3. 200数据库' + '\n')
        while True:
            if ip == '1':
                ip = '127.0.0.1'
                break
            elif ip == '2':
                ip = '192.168.0.123'
                break
            elif ip == '3':
                ip = '192.168.0.200'
                break
            else:
                ip = input('请正确输入1 或者 2 或者 3:')
        return ip

    @staticmethod
    def choose_db_or_col(name, name_num, d_or_c):
        """
        选择数据库或者集合
        :return:
        """
        while True:
            try:
                int(name_num)
            except ValueError:
                name_num = input('请输入正确的%s序号:' % d_or_c)
            if 0 <= int(name_num) <= len(name):
                break
            else:
                name_num = input('请输入正确的%s序号:' % d_or_c)
        for j, e_name in enumerate(name, 1):
            if name_num == str(j):
                choose_name = e_name
                return choose_name

    @staticmethod
    def pr_datas(names):
        """
        规范打印数据库/集合名称
        :param names:
        :return:
        """
        for i, each_name in enumerate(names, 1):
            print(str(i) + '. ' + each_name + ' ' * (40 - len(each_name) - len(str(i))), end='')
            if i % 3 == 0:
                print('\n')
        if len(names) < 3:
            print('\n')

    def get_pdf_msg(self):
        """
        将pdf信息存入消息队列
        :return:
        """
        all_urls = self.choose_way()
        for i in all_urls:
            self.q.put(i)

    def choose_way(self):
        """
        选择PDF的url来源,是在txt文件中还是从数据库中读取
        :return:
        """
        print('*' * 100)
        print(
            '注意事项:' + '\n' + '1. 如果选择了txt文件方式下载,txt内容必须为一行一个url格式' + '\n' +
            '2. 如果选择了从数据库中读取PDF信息下载,则数据库中包含两个字段(a. 存放url的字段,b. 该PDF的名字字段(可以没有)),其中存放url的字段不能嵌套,只能有一个url' + '\n' +
            '3. txt文件只有链接,默认取链接最后 "/" 后的值作为PDF名' + '\n' +
            '4. 数据库方式如果没有指定PDF名字,则默认以链接最后一个 / 后的内容作为PDF名')
        print('*' * 100)
        global is_txt, is_insert, new_collection
        the_way = input('请选择PDF的来源(输入1 / 2):' + '\n' + '1. txt文件       2. 从数据库中读取' + '\n')
        while True:
            if the_way == '1' or the_way == '2':
                break
            else:
                the_way = input('请正确输入1 或者 2:')
        urls = []
        if the_way == '1':
            # txt文件
            is_txt = True
            print('说明:txt文件中必须为一行一个url!')
            txt_name = input('请输入与该程序同级目录下的txt文件名称(如:123.txt):')
            while True:
                try:
                    with open(txt_name, 'r') as f:
                        urls = f.readlines()
                    break
                except FileNotFoundError:
                    txt_name = input('没有这个txt文件,请重新输入:')
        elif the_way == '2':
            # 数据库
            collection, db = self.client_dbs()  # 连接数据库
            url_field = input('请输入PDF的url字段名:')
            name_field = input('请输入PDF的name字段名(如果没有则不输入):')
            if not name_field:
                is_txt = True
                for d in collection.find():
                    if d[url_field]:
                        urls.append(d[url_field])
            else:
                is_txt = False
                for d in collection.find():
                    if d[url_field]:
                        msg = {'pdf_url': d[url_field], 'pdf_name': d[name_field]}
                        urls.append(msg)

        # 选择是否要插入数据的数据库
        is_insert = input('是否要将下载的PDF信息写入到新的数据库(输入y / n,不输入或输入其他默认不写入):')
        if is_insert == 'y':
            print('选择插入数据库,字段包含(pdf_url, md5_name, relative_path)')
            ip = self.choose_ip()

            client = pymongo.MongoClient('mongodb://%s:27017' % ip)
            db_name = input('请输入要存入的数据库名(可新建):')
            db = client[db_name]
            if ip == '192.168.0.123':
                db.authenticate(name='用户名', password='密码', source='admin')
            new_col = input('请输入要插入的集合名称:')
            new_collection = db[new_col]
        return urls

    @staticmethod
    def pdf_name_md5(pdf):
        """
        将pdf名字转为md5
        :param pdf:
        :return:
        """
        md = hashlib.md5()
        md.update(pdf.encode('utf-8'))
        pdf_name = md.hexdigest()
        return pdf_name


if __name__ == '__main__':
    download = DownloadPDF()
    download.run()

相关文章

网友评论

      本文标题:多线程PDF下载器

      本文链接:https://www.haomeiwen.com/subject/netipctx.html