Python 的协程
爬虫是 IO 密集型任务,当我们使用 requests 库来爬取某个站点时,从发出到返回响应这个过程中,爬虫一直在等待而没有做任何事情。对于这种情况,我们有没有优化方案呢?这就是异步爬虫。要实现异步机制的爬虫,那自然和协程脱不了关系。
# 1. 案例引入
如果我们访问 https://httpbin.org/delay/5 这个链接,需要等待五秒之后才能得到结果,这是因为服务器强制等待了 5 秒的时间才返回响应。如果我们用 requests 写爬虫来爬取的话,那每次 requests 都要等待 5 秒才能拿到结果了。
我们来测试下,下面我们来用 requests 写一个遍历程序,直接遍历 100 次试试看,实现代码如下:
import requests
import logging
import time
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s: %(message)s')
TOTAL_NUMBER = 100
URL = 'https://httpbin.org/delay/5'
start_time = time.time()
for _ in range(1, TOTAL_NUMBER + 1):
logging.info('scraping %s', URL)
response = requests.get(URL)
end_time = time.time()
logging.info('total time %s seconds', end_time - start_time)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
总的爬取时间最终为 663 秒,大约 11 分钟。所以如果我们用 requests 单线程这么爬取的话,总的耗时是非常多的。此时如果我们开了多线程或多进程来爬取的话,其爬取速度确实会成倍提升,那是否有更好的解决方案呢?
本节就来了解一下使用协程来加速的方法,此种方法对于 IO 密集型任务非常有效。如将其应用到网络爬虫中,爬取效率甚至可以成百倍地提升。
# 2. 基础知识
在了解协程之前,我们首先了解一些基础概念,如阻塞和非阻塞、同步和异步、多进程和协程。
# 2.1 阻塞与非阻塞
阻塞状态指程序未得到所需计算资源时被挂起的状态。程序在等待某个操作完成期间,自身无法继续干别的事情,则称该程序在该操作上是阻塞的。
常见的阻塞形式有:网络 I/O 阻塞、磁盘 I/O 阻塞、用户输入阻塞等。阻塞是无处不在的,包括 CPU 切换上下文时,所有的进程都无法真正干事情,它们也会被阻塞。如果是多核 CPU,则正在执行上下文切换操作的核不可被利用。
程序在等待某操作的过程中,自身不被阻塞,可以继续运行干别的事情,则称该程序在该操作上是非阻塞的。
非阻塞并不是在任何程序级别、任何情况下都存在的。仅当程序封装的级别可以囊括独立的子程序单元时,它才可能存在非阻塞状态。非阻塞的存在是因为阻塞存在,正因为某个操作阻塞导致的耗时与效率低下,我们才要把它变成非阻塞的。
# 2.2 同步与异步
不同程序单元为了完成某个任务,在执行过程中需靠某种通信方式以协调一致,此时这些程序单元是同步执行的。
例如在购物系统中更新商品库存时,需要用“行锁”作为通信信号,让不同的更新请求强制排队顺序执行,那更新库存的操作是同步的。
简言之,同步意味着有序。
为了完成某个任务,有时不同程序单元之间无须通信协调也能完成任务,此时不相关的程序单元之间可以是异步的。
例如,爬取下载网页。调度程序调用下载程序后,即可调度其他任务,而无须与该下载任务保持通信以协调行为。不同网页的下载、保存等操作都是无关的,也无须相互通知协调。这些异步操作的完成时刻并不确定。
简言之,异步意味着无序。
# 2.3 多进程
多进程就是利用 CPU 的多核优势,在同一时间并行执行多个任务,可以大大提高执行效率。
# 2.4 协程
协程,英文叫作 coroutine,又称微线程、纤程,它是一种用户态的轻量级线程。
协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此,协程能保留上一次调用时的状态,即所有局部状态的一个特定组合,每次过程重入时,就相当于进入上一次调用的状态。
协程本质上是个单进程,它相对于多进程来说,无须线程上下文切换的开销,无须原子操作锁定及同步的开销,编程模型也非常简单。
我们可以使用协程来实现异步操作,比如在网络爬虫场景下,我们发出一个请求之后,需要等待一定时间才能得到响应,但其实在这个等待过程中,程序可以干许多其他事情,等到响应得到之后才切换回来继续处理,这样可以充分利用 CPU 和其他资源,这就是协程的优势。
# 3. 协程的用法
History: 从 Python 3.4 开始,Python 中加入了协程的概念,但这个版本的协程还是以生成器对象为基础,Python 3.5 则增加了
async
/await
,使得协程的实现更加方便。Python 中使用协程最常用的库莫过于 asyncio,所以本节会以 asyncio 为基础来介绍协程的用法。
首先,我们需要了解下面几个概念:
- event_loop:事件循环,相当于一个无限循环,我们可以把一些函数注册到这个事件循环上,当满足条件发生的时候,就会调用对应的处理方法。
- coroutine:中文翻译叫协程,在 Python 中常指代协程对象类型,我们可以将协程对象注册到时间循环中,它会被事件循环调用。我们可以使用
async
关键字来定义一个方法,这个方法在调用时不会立即被执行,而是返回一个协程对象。 - task:任务,它是对协程对象的进一步封装,包含了任务的各个状态。
- future:代表将来执行或没有执行的任务的结果,实际上和
task
没有本质区别。
另外有两个关键字专门用于定义协程:async 定义一个协程,await 用来挂起阻塞方法的执行。
# 4. 定义协程
首先,我们来定义一个协程,体验一下它和普通进程在实现上的不同之处,代码如下:
import asyncio
async def execute(x):
print('Number:', x)
coroutine = execute(1)
print('Coroutine:', coroutine)
print('After calling execute')
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
print('After calling loop')
2
3
4
5
6
7
8
9
10
11
12
运行结果如下:
Coroutine: <coroutine object execute at 0x1034cf830>
After calling execute
Number: 1
After calling loop
2
3
4
首先,我们引入了 asyncio 这个包,这样我们才可以使用 async
和 await
,然后使用 async
定义了一个 execute
方法,该方法接收一个数字参数,执行之后会打印这个数字。
随后我们直接调用了这个方法,然而这个方法并没有执行,而是返回了一个 coroutine
协程对象。随后我们使用 get_event_loop
方法创建了一个事件循环 loop
,并调用了 loop
对象的 run_until_complete
方法将协程注册到事件循环 loop
中,然后启动。最后,我们才看到 execute
方法打印了输出结果。
可见,async
定义的方法就会变成一个无法直接执行的 coroutine
对象,必须将其注册到事件循环中才可以执行。
前面我们还提到了 task
,它是对 coroutine 对象的进一步封装,比 coroutine 对象多了运行状态,比如 running
、finished
等,我们可以用这些状态来获取协程对象的执行情况。
在上面的例子中,当我们将 coroutine
对象传递给 run_until_complete
方法的时候,实际上它进行了一个操作,就是将 coroutine
封装成了 task
对象。我们也可以显式地进行声明,如下所示:
import asyncio
async def execute(x):
print('Number:', x)
return x
coroutine = execute(1)
print('Coroutine:', coroutine)
print('After calling execute')
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print('Task:', task)
loop.run_until_complete(task)
print('Task:', task)
print('After calling loop')
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
运行结果如下:
Coroutine: <coroutine object execute at 0x10e0f7830>
After calling execute
Task: <Task pending coro=<execute() running at demo.py:4>>
Number: 1
Task: <Task finished coro=<execute() done, defined at demo.py:4> result=1>
After calling loop
2
3
4
5
6
这里我们定义了 loop
对象之后,接着调用了它的 create_task
方法将 coroutine
对象转化为 task
对象,随后我们打印输出一下,发现它是 pending
状态。接着,我们将 task
对象添加到事件循环中执行,随后打印输出 task
对象,发现它的状态变成了 finished
,同时还可以看到其 result
变成了 1,也就是我们定义的 execute
方法的返回结果。
另外,定义 task
对象还有一种方式,就是直接通过 asyncio 的 ensure_future
方法,返回结果也是 task
对象,这样的话我们就可以不借助 loop
来定义。即使我们还没有声明 loop
,也可以提前定义好 task
对象,写法如下:
import asyncio
async def execute(x):
print('Number:', x)
return x
coroutine = execute(1)
print('Coroutine:', coroutine)
print('After calling execute')
task = asyncio.ensure_future(coroutine)
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('After calling loop')
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
运行结果如下:
Coroutine: <coroutine object execute at 0x10aa33830>
After calling execute
Task: <Task pending coro=<execute() running at demo.py:4>>
Number: 1
Task: <Task finished coro=<execute() done, defined at demo.py:4> result=1>
After calling loop
2
3
4
5
6
可以发现,其运行效果都是一样的。
# 5. 绑定回调
另外,我们也可以为某个 task
绑定一个回调方法。比如,我们来看下面的例子:
import asyncio
import requests
async def request():
url = 'https://www.baidu.com'
status = requests.get(url)
return status
def callback(task):
print('Status:', task.result())
coroutine = request()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
这里我们定义了一个 request
方法,请求了百度,获取其状态码,但是这个方法里面我们没有任何 print
语句。随后我们定义了一个 callback
方法,这个方法接收一个参数,是 task
对象,然后调用 print
方法打印了 task
对象的结果。这样我们就定义好了一个 coroutine
对象和一个回调方法。我们现在希望的效果是,当 coroutine
对象执行完毕之后,就去执行声明的 callback
方法。
那么它们两者怎样关联起来呢?很简单,只需要调用 add_done_callback
方法即可。我们将 callback
方法传递给封装好的 task
对象,这样当 task
执行完毕之后,就可以调用 callback
方法了。同时 task
对象还会作为参数传递给 callback
方法,调用 task
对象的 result
方法就可以获取返回结果了。
运行结果如下:
Task: <Task pending coro=<request() running at demo.py:5> cb=[callback() at demo.py:11]>
Status: <Response [200]>
Task: <Task finished coro=<request() done, defined at demo.py:5> result=<Response [200]>>
2
3
实际上不用回调方法,直接在 task
运行完毕之后,也可以直接调用 result
方法获取结果,如下所示:
import asyncio
import requests
async def request():
url = 'https://www.baidu.com'
status = requests.get(url)
return status
coroutine = request()
task = asyncio.ensure_future(coroutine)
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('Task Result:', task.result())
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
运行结果是一样的:
Task: <Task pending coro=<request() running at demo.py:4>>
Task: <Task finished coro=<request() done, defined at demo.py:4> result=<Response [200]>>
Task Result: <Response [200]>
2
3
# 6. 多任务协程
上面的例子我们只执行了一次请求,如果想执行多次请求,应该怎么办呢?我们可以定义一个 task
列表,然后使用 asyncio 的 wait
方法即可执行。看下面的例子:
import asyncio
import requests
async def request():
url = 'https://www.baidu.com'
status = requests.get(url)
return status
tasks = [asyncio.ensure_future(request()) for _ in range(5)]
print('Tasks:', tasks)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
for task in tasks:
print('Task Result:', task.result())
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
这里我们使用一个 for
循环创建了 5 个 task
,组成了一个列表,然后把这个列表首先传递给了 asyncio 的 wait
方法,再将其注册到时间循环中,就可以发起 5 个任务了。最后,我们再将任务的运行结果输出出来,具体如下:
Tasks: [<Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>]
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
2
3
4
5
6
可以看到,5 个任务被顺次执行了,并得到了运行结果。
# 7. 协程实现
前面说了这么一通,又是 async
,又是 coroutine
,又是 task
,又是 callback
,但似乎并没有看出协程的优势?反而写法上更加奇怪和麻烦了。别急,上面的案例只是为后面的使用作铺垫。接下来,我们正式来看下协程在解决 IO 密集型任务上有怎样的优势。
在上面的代码中,我们用一个网络请求作为示例,这就是一个耗时等待操作,因为我们请求网页之后需要等待页面响应并返回结果。耗时等待操作一般都是 IO 操作,比如文件读取、网络请求等。协程对于处理这种操作是有很大优势的,当遇到需要等待的情况时,程序可以暂时挂起,转而去执行其他操作,从而避免一直等待一个程序而耗费过多的时间,充分利用资源。
为了表现出协程的优势,我们还是以本节开头介绍的网站 https://httpbin.org/delay/5 (opens new window) 为例,因为该网站响应比较慢,所以我们可以通过爬取时间来直观感受到爬取速度的提升。
为了让大家更好地理解协程的正确使用方法,这里我们先来看看大家使用协程时常犯的错误,后面再给出正确的例子来对比一下。
首先,我们还是拿之前的 requests 库来进行网页请求,接下来再重新使用上面的方法请求一遍:
import asyncio
import requests
import time
start = time.time()
async def request():
url = 'https://httpbin.org/delay/5'
print('Waiting for', url)
response = requests.get(url)
print('Get response from', url, 'response', response)
tasks = [asyncio.ensure_future(request()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print('Cost time:', end - start)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
这里我们还是创建了 10 个 task
,然后将 task
列表传给 wait
方法并注册到时间循环中执行。
运行结果如下:
Waiting for https://httpbin.org/delay/5
Get response from https://httpbin.org/delay/5 response <Response [200]>
Waiting for https://httpbin.org/delay/5
...
Get response from https://httpbin.org/delay/5 response <Response [200]>
Waiting for https://httpbin.org/delay/5
Get response from https://httpbin.org/delay/5 response <Response [200]>
Waiting for https://httpbin.org/delay/5
Get response from https://httpbin.org/delay/5 response <Response [200]>
Cost time: 66.64284420013428
2
3
4
5
6
7
8
9
10
可以发现,这和正常的请求并没有什么区别,依然还是顺次执行的,耗时 66 秒,平均一个请求耗时 6.6 秒,说好的异步处理呢?
其实,要实现异步处理,我们得先要有挂起的操作,当一个任务需要等待 IO 结果的时候,可以挂起当前任务,转而去执行其他任务,这样我们才能充分利用好资源。上面的方法都是一本正经地串行走下来,连个挂起都没有,怎么可能实现异步?想太多了。
要实现异步,接下来我们再了解一下 await
的用法,它可以将耗时等待的操作挂起,让出控制权。当协程执行的时候遇到 await
,时间循环就会将本协程挂起,转而去执行别的协程,直到其他协程挂起或执行完毕。
所以,我们可能会将代码中的 request
方法改成如下的样子:
async def request():
url = 'https://httpbin.org/delay/5'
print('Waiting for', url)
response = await requests.get(url)
print('Get response from', url, 'response', response)
2
3
4
5
仅仅是在 requests 前面加了一个关键字 await
,然而此时执行代码,会得到如下报错:
Traceback (most recent call last):
File "demo.py", line 11, in request
response = await requests.get(url)
TypeError: object Response can't be used in 'await' expression
2
3
4
这次它遇到 await
方法确实挂起了,也等待了,但是最后却报了这个错误。这个错误的意思是 requests 返回的 Response
对象不能和 await
一起使用,为什么呢?因为根据官方文档 (opens new window)说明,await
后面的对象必须是如下格式之一:
- 一个原生 coroutine 对象;
- 一个由
types.coroutine
修饰的生成器,这个生成器可以返回coroutine
对象; - 一个包含
__await__
方法的对象返回的一个迭代器。
reqeusts 返回的 Response
对象不符合上面任一条件,因此就会报上面的错误了。
我们必须要使用支持异步操作的请求方式才可以实现真正的异步,所以这里就需要 aiohttp 派上用场了。
# 8. 使用 aiohttp
aiohttp 是一个支持异步请求的库,配合使用它和 asyncio,我们可以非常方便地实现异步请求操作。我们使用 pip 安装即可:
pip install aiohttp
aiohttp 的官方文档链接为 https://aiohttp.readthedocs.io/,它分为两部分,一部分是 Client,一部分是 Server,详细的内容可以参考官方文档。
下面我们将 aiohttp 用上来,将代码改成如下样子:
import asyncio
import aiohttp
import time
start = time.time()
async def get(url):
session = aiohttp.ClientSession()
response = await session.get(url)
await response.text()
await session.close()
return response
async def request():
url = 'https://httpbin.org/delay/5'
print('Waiting for', url)
response = await get(url)
print('Get response from', url, 'response', response)
tasks = [asyncio.ensure_future(request()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print('Cost time:', end - start)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
这里我们将请求库由 requests 改成了 aiohttp,通过 aiohttp 的 ClientSession
类的 get
方法进行请求,结果如下:
Waiting for https://httpbin.org/delay/5
Waiting for https://httpbin.org/delay/5
Waiting for https://httpbin.org/delay/5
Waiting for https://httpbin.org/delay/5
...
Get response from https://httpbin.org/delay/5 response <ClientResponse(https://httpbin.org/delay/5) [200 OK]>
<CIMultiDictProxy('Date': 'Sun, 09 Aug 2020 14:30:22 GMT', 'Content-Type': 'application/json', 'Content-Length': '360', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true')>
...
Get response from https://httpbin.org/delay/5 response <ClientResponse(https://httpbin.org/delay/5) [200 OK]>
<CIMultiDictProxy('Date': 'Sun, 09 Aug 2020 14:30:22 GMT', 'Content-Type': 'application/json', 'Content-Length': '360', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true')>
Cost time: 6.033240079879761
2
3
4
5
6
7
8
9
10
11
12
成功了!我们发现这次请求的耗时由 51 秒直接变成了 6 秒,耗费时间减少了非常多。
在代码里面,我们使用了 await
,后面跟了 get
方法。在执行这 10 个协程的时候,如果遇到了 await
,就会将当前协程挂起,转而去执行其他协程,直到其他协程也挂起或执行完毕,再执行下一个协程。这就是异步操作的便捷之处,当遇到阻塞式操作时,任务被挂起,程序接着去执行其他任务,而不是傻傻地等着,这样可以充分利用 CPU 时间,而不必把时间浪费在等待 IO 上。
综上所述,使用了异步请求之后,我们几乎可以在相同的时间内实现成百上千倍次的网络请求,把这个运用在爬虫中,速度提升可谓是非常可观了。