
Python asyncio Concurrent Programming (7)

Author: TheRightPath1 | Published 2020-01-22 16:09

1. Example: implementing a crawler with the aiohttp client

import asyncio
import re
from scrapy import Selector
import aiohttp
import aiomysql


# The starting URL
start_url = 'https://news.sina.com.cn/'
# URLs waiting to be crawled
waitting_urls = set()
# URLs that have already been crawled
already_urls = set()
# Limit concurrency to 3 simultaneous requests
sem = asyncio.Semaphore(3)


# Request a URL and return its HTML source
async def fetch(url, session):
    async with sem:
        try:
            # session.get(url) is a network-IO-bound request, so it is wrapped in async with
            async with session.get(url) as response:
                if response.status == 200:
                    # await is required to read the response body
                    return await response.text()
        except Exception as e:
            print(e)


async def analysis_title(url, session, pool):
    try:
        html = await fetch(url, session)
        selector = Selector(text=html)
    except Exception as e:
        print(e)
        print(url)
        return ''
    title = selector.xpath('//h1[@class="main-title"]/text()').get()
    if title:
        # Acquire a connection from the aiomysql connection pool
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                try:
                    # Use a parameterized query so quotes in the title cannot break the SQL
                    insert_sql = 'INSERT INTO `title` VALUES (%s)'
                    await cur.execute(insert_sql, (title,))
                    already_urls.add(url)
                except Exception as e:
                    print(e)
                    print(url)
                    return ''


# Extract every URL from a non-article page and add them to waitting_urls
async def extract_url(url, session):
    html = await fetch(url, session)
    try:
        selector = Selector(text=html)
    except Exception as e:
        print(e)
        print(url)
        print(html)
        return ''
    urls = selector.xpath('//@href').extract()
    # Use a different loop variable so the url parameter is not shadowed
    for new_url in urls:
        waitting_urls.add(new_url)
    already_urls.add(url)


# Pop URLs from waitting_urls and dispatch the ones that match the article pattern to the parser
async def consumer(pool):
        # The session must eventually be closed, so it is declared with async with
        # Creating a session per request would be wasteful, so create it once and pass it to the other coroutines
    async with aiohttp.ClientSession() as session:
        while True:
            if len(waitting_urls) == 0:
                await asyncio.sleep(2)
                continue
            url = waitting_urls.pop()
            if url and url not in already_urls and re.findall(r'/\d{4}-\d{2}-\d{2}/doc.*?\d+\.shtml', url):
                asyncio.ensure_future(analysis_title(url, session, pool))
            else:
                asyncio.ensure_future(extract_url(url, session))


async def main(loop):
    # Wait for the MySQL connection pool to be created; loop is the event loop the coroutines run on, and autocommit commits each statement automatically
    pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
                                      user='root', password='zkr@123...',
                                      db='test', loop=loop, charset='utf8',
                                      autocommit=True)
    async with aiohttp.ClientSession() as session:
        await extract_url(start_url, session)
    # The pool is created once and passed into consumer, so a new MySQL pool is not built for every insert
    asyncio.ensure_future(consumer(pool))


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(main(loop))
    loop.run_forever()
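
The INSERT in analysis_title assumes a `title` table already exists in the `test` database. The original code does not show the schema, so the following one-off setup script is only a sketch; the single VARCHAR column is an assumption.

import asyncio
import aiomysql


# Hypothetical setup script: create the `title` table the crawler writes to.
# The column name and size are assumptions, not taken from the original code.
async def create_title_table(loop):
    conn = await aiomysql.connect(host='127.0.0.1', port=3306,
                                  user='root', password='zkr@123...',
                                  db='test', loop=loop, charset='utf8',
                                  autocommit=True)
    async with conn.cursor() as cur:
        await cur.execute(
            'CREATE TABLE IF NOT EXISTS `title` (`title` VARCHAR(255))'
        )
    conn.close()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(create_title_table(loop))

Run it once before starting the crawler, or create the table manually with the same statement.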
