美文网首页
scrapy异步使用Django 模型存储

scrapy异步使用Django 模型存储

作者: mutang | 来源:发表于2021-11-22 10:35 被阅读0次
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html


# useful for handling different item types with a single interface
import asyncio

from product.items import CategoryItem, SubCategoryItem, SKUItem
from control.models import Category, SubCategory, SKU
from concurrent.futures import ThreadPoolExecutor


class ProductPipeline:
    # 使用异步存储的时候,使用sqlite3会报错,因为sqlite3是单线程的,我们是一个线程池对象,并发存储会被sqlite3拒绝(database was locked)
    # 创建事件循环对象
    loop = asyncio.get_event_loop()
    # 创建线程池
    executor = ThreadPoolExecutor()
    # 任务队列
    tasks = []
    counter = {'cate': 0, 'sub_cate': 0, 'sku': 0}

    async def process_item(self, item, spider):
        # 存在则更新 get_update
        print(item)
        if isinstance(item, CategoryItem):
            return self.process_category_item(item, spider)
        elif isinstance(item, SubCategoryItem):
            return self.process_sub_category_item(item, spider)
        else:
            return self.process_sku_item(item, spider)

    def process_category_item(self, item, spider):
        '''将保存数据的处理方法加入到任务队列'''
        self.counter['cate'] += 1
        task = self.loop.run_in_executor(self.executor, self.executor_func(Category, item), )
        self.tasks.append(task)
        return item

    def process_sub_category_item(self, item, spider):
        '''将保存数据的处理方法加入到任务队列'''
        self.counter['sub_cate'] += 1
        task = self.loop.run_in_executor(self.executor, self.executor_func(SubCategory, item), )
        self.tasks.append(task)
        return item

    def process_sku_item(self, item, spider):
        '''将保存数据的处理方法加入到任务队列'''
        self.counter['sku'] += 1
        task = self.loop.run_in_executor(self.executor, self.executor_func(SKU, item), )
        self.tasks.append(task)
        return item

    @staticmethod
    def executor_func(model, item):
        '''主要作用是将有参数的函数转换为无参数的函数返回,方便run_in_executor方法调用,这个方法它只接受位置传参,不接受关键字传参'''

        def func():
            if isinstance(item, CategoryItem):
                return model.objects.get_or_create(defaults=item['familyID'], **item)  # 一般是create
            elif isinstance(item, SubCategoryItem):
                d = dict(item)
                try:
                    category = Category.objects.get(familyID=d.pop('category'))
                except Category.DoesNotExist:
                    print('未添加进去 -----', d)
                else:
                    return model.objects.get_or_create(defaults=d['sub_cate_id'], category=category, **d)
            else:
                d = dict(item)
                try:
                    sub_cate = SubCategory.objects.get(sub_cate=d.pop('sub_cate'))
                except SubCategory.DoesNotExist:
                    print('未添加进去 -----', d)
                else:
                    return model.objects.get_or_create(defaults=d['sku'], sub_cate=sub_cate, **d)

        return func

    def close_spider(self, spider):
        '''当爬虫关闭的时候调用这个方法保存数据'''
        print(self.counter)
        self.loop.run_until_complete(asyncio.wait(self.tasks))

以上代码有问题,下级模型往往找不到上级,暂时贴出来

正确的,虽然不够优雅,但能实现,能保存。不需要scrapy中item,在spider.py中yield 字典

# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html


# useful for handling different item types with a single interface
from asgiref.sync import sync_to_async #同步变异步

from control.models import Category, SubCategory, SKU


class ProductPipeline:
    counter = {
        'cate': 0,
        'sub_cate': 0,
        'sku': 0
    }
    @sync_to_async
    def process_item(self, item, spider):
        print(item)
        try:
            cate = Category.objects.get(familyID=item['familyID'])
        except Category.DoesNotExist:
            cate = Category()
            cate.familyID = item['familyID']
            cate.familyName = item['familyName']
            cate.metricsFamilyName = item['metricsFamilyName']
            cate.url = item['url']
            cate.level = item['level']
            cate.isLeaf = item['isLeaf']
            cate.count = item['count']
            cate.save()
            self.counter['cate'] += 1
        try:
            sub_category = SubCategory.objects.get(sub_cate_id=item['sub_cate_id'])
        except SubCategory.DoesNotExist:
            sub_category = SubCategory()
            sub_category.category = cate
            sub_category.sub_cate_id = item['sub_cate_id']
            sub_category.sub_cate_name = item['sub_cate_name']
            sub_category.save()
            self.counter['sub_cate'] += 1
        # 保存
        sku = SKU(sub_cate=sub_category, sku=item['sku'])
        sku.save()
        self.counter['sku'] += 1
        print(self.counter)
        return item

相关文章

  • scrapy异步使用Django 模型存储

    以上代码有问题,下级模型往往找不到上级,暂时贴出来 正确的,虽然不够优雅,但能实现,能保存。不需要scrapy中i...

  • 使用python3.7+Vue.js2.0+Django2.0.

    使用python3.7+Vue.js2.0+Django2.0.4异步前端通过api上传文件到七牛云云端存储原文转...

  • Celery异步任务与定时任务

    Django使用异步任务与定时任务 Django配置 创建项目 异步耗时任务 配置文件worker/config....

  • 人工智能以及爬虫

    Scrapy安装及使用 - CSDN博客 Django安装指南 - CSDN博客 TensorFlow中文社区-首...

  • Django 常见问题回答

    Django 密码存储Django 提供灵活的密码存储系统,默认使用 PBKDF2。对象的 password 属性...

  • Web开发(九)Django模型-ORM

    一、Django模型 Django模型是一个定义数据源的数据,它包含要存储数据的一些属性和行为。通常,每一个模型对...

  • Python Web应用框架:Django 1.11模型

    Django模型 模型在Django中,是一个被存储在资料库中的物件,可以想像是一种表格(table)的感觉。以下...

  • scrapy--异步存mysql

    scrapy是一个异步的爬虫框架、异步解决的很大的问题就是io方面的操作,当我们爬虫请求到数据后,进行存储(io)...

  • 安装scrapy报错,提示error:unable to fin

    win10 64安装scrapy时提示错误信息: Scrapy使用了Twisted作为框架实现异步I/O,先安...

  • 异步网络模型

    转发:异步网络模型 异步网络模型 异步网络模型在服务开发中应用非常广泛,相关资料和开源库也非常多。项目中,使用现成...

网友评论

      本文标题:scrapy异步使用Django 模型存储

      本文链接:https://www.haomeiwen.com/subject/baavtrtx.html