高并发爬取淘宝主图【协程,aiohttp】 - Go语言中文社区

高并发爬取淘宝主图【协程,aiohttp】


现在 python 写爬虫基本都是用scrapy,但是有时候一些小任务没必要用这么大的框架。而且希望代码自由度高一些,可以随心所欲写。这时候就需要用到 aiohttp 这个库,它基于 asyncio 编写,主要解决这类问题。

什么?你说 requests?这确实是一个优秀的http库,可它是阻塞式IO,太慢了。但是,其API设计优秀,文档健全,非常适合新手入门使用。

这是群里一位朋友需要的抓取淘宝主图的爬虫。随手写了一下,就全写在单文件内,没有以项目形式解耦分成很多文件。希望能给正在学习协程的朋友一点参考。

import os

import logging
import aiohttp
import asyncio
import aiofiles
import pandas as pd
from pyquery import PyQuery
from multiprocessing import cpu_count

# 【使用方法】:将要抓取主图的宝贝 ID 填入程序生成的 items.csv 内,一行一个。

# 日志等级及格式
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s 【%(levelname)s】=> %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)


class AsyncSpider(object):
    def __init__(self):
        self.BASE_DIR = os.path.dirname(os.path.abspath(__file__))
        self.BASE_URL = "https://detail.tmall.com/item.htm?id="
        self.checker()
        self.loader()
        self.total = 0
        self.failed = 0

    def checker(self):
        """
        检查必要文件及文件夹,如果不存在就自行创建。
        """
        folders = ["files", "images"]
        files = ["items.csv", "failed.csv"]

        for folder in folders:
            path = os.path.join(self.BASE_DIR, folder)
            if not os.path.exists(path):
                os.makedirs(path)

        for file in files:
            path = os.path.join(self.BASE_DIR, "files", file)
            if not os.path.exists(path):
                f = open(path, "w")
                exit("将要下载的宝贝 id 填写至 items.csv 后再运行程序。")
            f = open(os.path.join(self.BASE_DIR, "files", "failed.csv"), "w")
            f.close()

    def loader(self):
        try:
            path = os.path.join(self.BASE_DIR, "files", "items.csv")
            df = pd.read_csv(path, header=None)
            items = df[0].to_list()
            return items
        except Exception as e:
            if str(e) == "No columns to parse from file":
                logging.error("items.csv 数据为空,无可执行任务,程序退出。")
            else:
                logging.error(e)
            exit()

    async def error_handler(self, url):
        self.failed += 1
        item = url.split("id=")[-1] + "n"
        path = os.path.join(self.BASE_DIR, "files", "failed.csv")
        async with aiofiles.open(path, "ab") as f:
            await f.write(item.encode())
            await f.close()

    @staticmethod
    async def fetch(session, url):
        logging.info(f"Start: {url}")
        async with session.get(url) as response:
            if response.status in [200, 201]:
                # 如果 url 以『.jpg』结尾,则认定为图片地址,返回 byte。否则,认定为网址,返回 str 便于解析。
                if url.endswith(".jpg"):
                    return await response.read()
                else:
                    return await response.text()

    # 解析 html 获取主图 url
    @staticmethod
    async def parser(html):
        pq = PyQuery(html)
        # 获取所有图片的url,如果 url 中存在『_430x430q90』则认定为主图,将主图 url 返回
        for link in pq.items("img"):
            src = link.attr("src")
            if src:
                if "_430x430q90" in src or "400x400.jpg" in src:
                    return "https:" + src
        return

    # 图片存储
    async def save(self, session, url, path):
        # 获取图片内容
        img = await self.fetch(session, url)
        # 将内容异步存储
        async with aiofiles.open(path, "wb") as f:
            await f.write(img)
            await f.close()
            logging.info(f"Saved image {path}")

    async def main(self, url, semaphore):
        self.total += 1
        async with semaphore:
            async with aiohttp.ClientSession() as session:
                html = await self.fetch(session, url)
                image_src = await self.parser(html)
                if image_src:
                    image_name = url.split("id=")[-1] + ".jpg"
                    image_path = os.path.join(self.BASE_DIR, "images", image_name)
                    await self.save(session, image_src, image_path)
                else:
                    await self.error_handler(url)

    def run(self):
        items = self.loader()
        urls = map(lambda x: self.BASE_URL + str(x), items)

        semaphore = asyncio.Semaphore(cpu_count() * 2)
        loop = asyncio.get_event_loop()
        futures = [asyncio.ensure_future(self.main(url, semaphore)) for url in urls]
        tasks = asyncio.gather(*futures)
        loop.run_until_complete(tasks)
        loop.close()
        logging.info(f"运行完毕。任务数共 {self.total} 。失败 {self.failed}")


if __name__ == "__main__":
    async_spider = AsyncSpider()
    async_spider.run()

版权声明:本文来源简书,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://www.jianshu.com/p/29daec3662f7
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-01-12 13:08:18
  • 阅读 ( 1232 )
  • 分类:

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢