celery是一个基于分布式消息传输的异步任务队列,它专注于实时处理,同时也支持任务调度。关于celery的更多介绍及例子,笔者可以参考文章。
本文将介绍如何使用celery来加速爬虫。 本文爬虫的例子来自文章:。这里不再过多介绍,我们的项目结构如下:
其中,app_test.py为主程序,其代码如下:
from celery import Celeryapp = Celery('proj', include=['proj.tasks'])app.config_from_object('proj.celeryconfig')if __name__ == '__main__': app.start()
tasks.py为任务函数,代码如下:
import reimport requestsfrom celery import groupfrom proj.app_test import app@app.task(trail=True)# 并行调用任务def get_content(urls): return group(C.s(url) for url in urls)()@app.task(trail=True)def C(url): return parser.delay(url)@app.task(trail=True)# 获取每个网页的name和descriptiondef parser(url): req = requests.get(url) html = req.text try: name = re.findall(r'(.+?)', html)[0] desc = re.findall(r'(.+?)', html)[0] if name is not None and desc is not None: return name, desc except Exception as err: return '', ''
celeryconfig.py为celery的配置文件,代码如下:
BROKER_URL = 'redis://localhost' # 使用Redis作为消息代理CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # 把任务结果存在了RedisCELERY_TASK_SERIALIZER = 'msgpack' # 任务序列化和反序列化使用msgpack方案CELERY_RESULT_SERIALIZER = 'json' # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSONCELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间CELERY_ACCEPT_CONTENT = ['json', 'msgpack'] # 指定接受的内容类型
最后是我们的爬虫文件,scrapy.py,代码如下:
import timeimport requestsfrom bs4 import BeautifulSoupfrom proj.tasks import get_contentt1 = time.time()url = "http://www.wikidata.org/w/index.php?title=Special:WhatLinksHere/Q5&limit=500&from=0"# 请求头部headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, \ like Gecko) Chrome/67.0.3396.87 Safari/537.36'}# 发送HTTP请求req = requests.get(url, headers=headers)# 解析网页soup = BeautifulSoup(req.text, "lxml")# 找到name和Description所在的记录human_list = soup.find(id='mw-whatlinkshere-list')('li')urls = []# 获取网址for human in human_list: url = human.find('a')['href'] urls.append('https://www.wikidata.org'+url)#print(urls)# 调用get_content函数,并获取爬虫结果result = get_content.delay(urls)res = [v for v in result.collect()]for r in res: if isinstance(r[1], list) and isinstance(r[1][0], str): print(r[1])t2 = time.time() # 结束时间print('耗时:%s' % (t2 - t1))
在后台启动redis,并切换至proj项目所在目录,运行命令:
celery -A proj.app_test worker -l info
输出结果如下(只显示最后几行的输出):
......['Antoine de Saint-Exupery', 'French writer and aviator']['', '']['Sir John Barrow, 1st Baronet', 'English statesman']['Amy Johnson', 'pioneering English aviator']['Mike Oldfield', 'English musician, multi-instrumentalist']['Willoughby Newton', 'politician from Virginia, USA']['Mack Wilberg', 'American conductor']耗时:80.05160284042358
在rdm中查看数据,如下:
在文章中,我们已经知道,如果用一般的方法来实现这个爬虫,耗时大约为725秒,而我们使用celery,一共耗时约80秒,大概相当于一般方法的九分之一。虽然没有scrapy这个爬虫框架和异步框架aiohttp, asyncio来的快,但这也可以作为一种爬虫的思路。
本次分享到此结束,感谢阅读~注意:本人现已开通微信公众号: Python爬虫与算法(微信号为:easy_web_scrape), 欢迎大家关注哦~~