上一篇文章在python异步协程中处理流程分析(一)介绍了在python3中使用asyncio创建单个协程和多个协程,这篇文章介绍在异步的应用里如何调用同步的函数。

依然是之前准备的三个函数,一个阻塞的,两个异步的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 定义阻塞的函数
def ping(url):
print("阻塞函数开始运行")
time.sleep(2)
os.system("ping %s"%url)
print("阻塞函数运行结束")

# 定义两个异步函数
async def asyncfunc1():
print("Suspending func1")
await asyncio.sleep(1)
print("func func1 ", threading.current_thread())
print('Resuming func1')
return "func1"

async def asyncfunc2():
print("Suspending func2")
await asyncio.sleep(1)
print("func func2 ", threading.current_thread())
print('Resuming func2')
return "func2"

使用传统的多线程的方式跑同步代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#coding:gbk
import asyncio
import time,sys
import threading
import concurrent
import functools
import subprocess
import os

# 定义阻塞的函数
def ping(url):
print("阻塞函数开始运行,当前的线程ID为:",threading.current_thread())
time.sleep(2)
os.system("ping %s"%url)
print("阻塞函数运行结束")


async def main():
task1 = loop.create_task(asyncfunc1())
task1.add_done_callback(callbackfunc)
task2 = loop.create_task(asyncfunc2())
task2.add_done_callback(callbackfunc)
result = await asyncio.gather(task1,task2)
print(result)

async def mian2():
result = await asyncio.gather(asyncfunc1(),asyncfunc2())
print(result)

if __name__=="__main__":
print("In main thread ",threading.current_thread())
# 创建三个子线程
t1 = threading.Thread(target=ping,args=("www.baidu.com",))
t2 = threading.Thread(target=ping,args=("www.yangyanxing.com",))
t3 = threading.Thread(target=ping,args=("www.qq.com",))
t1.start()
t2.start()
t3.start()

输出结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
In main thread  <_MainThread(MainThread, started 9208)>
阻塞函数开始运行,当前的线程ID为: <Thread(Thread-1, started 8720)>
阻塞函数开始运行,当前的线程ID为: <Thread(Thread-2, started 9368)>
阻塞函数开始运行,当前的线程ID为: <Thread(Thread-3, started 8320)>

正在 Ping https.qq.com [123.151.137.18] 具有 32 字节的数据:
来自 123.151.137.18 的回复: 字节=32 时间=4ms TTL=53

正在 Ping www.a.shifen.com [220.181.38.150] 具有 32 字节的数据:
来自 220.181.38.150 的回复: 字节=32 时间=1ms TTL=54

正在 Ping yangyanxing.coding.me [119.28.76.36] 具有 32 字节的数据:
....
....
阻塞函数运行结束

可以看到,主线程和子线程跑在了不同的线程中。

在事件循环中动态的添加同步函数

解决方案是,先启一个子线程,这个线程用来跑事件循环loop,然后动态的将同步函数添加到事件循环中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#coding:gbk
import asyncio
import time,sys
import threading
import concurrent
import functools
import subprocess
import os

# 定义阻塞的函数
print("阻塞函数开始运行,当前的线程ID为:",threading.current_thread())
time.sleep(2)
print("模拟ping 输出 ",url)
print("阻塞函数运行结束,当前的线程ID为:",threading.current_thread())

#定义一个跑事件循环的线程函数
def start_thread_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()

if __name__=="__main__":
print("In main thread ",threading.current_thread())
loop = asyncio.get_event_loop()
# 在子线程中运行事件循环,让它run_forever
t = threading.Thread(target= start_thread_loop, args=(loop,))
t.start()

# 在主线程中动态添加同步函数
loop.call_soon_threadsafe(ping,"www.baidu.com")
loop.call_soon_threadsafe(ping,"www.qq.com")
loop.call_soon_threadsafe(ping,"www.yangyanxing.com")
print('主线程不会阻塞')

由于使用ping 命令得到很多输出,所以我对函数稍稍做了修改,只是模拟打印了一行文字,但是函数中的time.sleep(2) 这个是一个阻塞式的函数
得到的输出为

1
2
3
4
5
6
7
8
9
10
11
In main thread  <_MainThread(MainThread, started 7924)>
阻塞函数开始运行,当前的线程ID为: <Thread(Thread-1, started 10716)>
主线程不会阻塞
模拟ping 输出 www.baidu.com
阻塞函数运行结束,当前的线程ID为: <Thread(Thread-1, started 10716)>
阻塞函数开始运行,当前的线程ID为: <Thread(Thread-1, started 10716)>
模拟ping 输出 www.qq.com
阻塞函数运行结束,当前的线程ID为: <Thread(Thread-1, started 10716)>
阻塞函数开始运行,当前的线程ID为: <Thread(Thread-1, started 10716)>
模拟ping 输出 www.yangyanxing.com
阻塞函数运行结束,当前的线程ID为: <Thread(Thread-1, started 10716)>

从输出结果可以看出,loop.call_soon_threadsafe()和主线程是跑在同一个线程中的,虽然loop.call_soon_threadsafe()没有阻塞主线程的运行,但是由于需要跑的函数ping是阻塞式函数,所以调用了三次,这三次结果是顺序执行的,并没有实现并发。
如果想要实现并发,需要通过run_in_executor 把同步函数在一个执行器里去执行。该方法需要传入三个参数,run_in_executor(self, executor, func, *args) 第一个是执行器,默认可以传入None,如果传入的是None,将使用默认的执行器,一般执行器可以使用线程或者进程执行器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#coding:gbk
import asyncio
import time,sys
import threading
import concurrent
import functools
import subprocess
import os

# 定义阻塞的函数
def ping(url):
print("阻塞函数开始运行,当前的线程ID为:",threading.current_thread())
time.sleep(2)
print("模拟ping 输出 ",url)
print("阻塞函数运行结束,当前的线程ID为:",threading.current_thread())


#定义一个跑事件循环的线程函数
def start_thread_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()

if __name__=="__main__":
print("In main thread ",threading.current_thread())
loop = asyncio.get_event_loop()
# 在子线程中运行事件循环,让它run_forever
t = threading.Thread(target= start_thread_loop, args=(loop,))
t.start()

# 在主线程中动态添加同步函数
loop.run_in_executor(None,ping,"www.baidu.com")
loop.run_in_executor(None,ping,"www.qq.com")
loop.run_in_executor(None,ping,"www.yangyanxing.com")
print('主线程不会阻塞')

得到的输出结果

1
2
3
4
5
6
7
8
9
10
11
In main thread  <_MainThread(MainThread, started 8588)>
阻塞函数开始运行,当前的线程ID为: <Thread(Thread-2, started daemon 9068)>
阻塞函数开始运行,当前的线程ID为: <Thread(Thread-3, started daemon 7200)>
阻塞函数开始运行,当前的线程ID为: <Thread(Thread-4, started daemon 10924)>
主线程不会阻塞
模拟ping 输出 www.yangyanxing.com
模拟ping 输出 www.baidu.com
阻塞函数运行结束,当前的线程ID为: <Thread(Thread-4, started daemon 10924)>
阻塞函数运行结束,当前的线程ID为: <Thread(Thread-2, started daemon 9068)>
模拟ping 输出 www.qq.com
阻塞函数运行结束,当前的线程ID为: <Thread(Thread-3, started daemon 7200)>

可以看到同步函数实现了并发,但是它们跑在了不同的线程中,这个就和之前传统的使用多线程是一样的了。

上文说到,run_in_executor的第一个参数是执行器,这里执行器是使用concurrent.futures 下的两个类,一个是thread一个是process,也就是执行器可以分为线程执行器和进程执行器。它们在初始化的时候都有一个max_workers参数,如果不传则根据系统自身决定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#coding:gbk
import asyncio
import time,sys
import threading
import concurrent
import functools
import subprocess
import os

# 定义阻塞的函数
def ping(url):
print("阻塞函数开始运行,当前的线程ID为:",threading.current_thread(),"进程ID为:",os.getpid())
time.sleep(2)
print("模拟ping 输出 ",url)
print("阻塞函数运行结束,当前的线程ID为:",threading.current_thread())


#定义一个跑事件循环的线程函数
def start_thread_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()

if __name__=="__main__":
print("In main thread ",threading.current_thread())
loop = asyncio.get_event_loop()
# 在子线程中运行事件循环,让它run_forever
t = threading.Thread(target= start_thread_loop, args=(loop,))
t.start()
threadingexecutor = concurrent.futures.ThreadPoolExecutor(2)
processExetutor = concurrent.futures.ProcessPoolExecutor()

# 在主线程中动态添加同步函数
loop.run_in_executor(processExetutor,ping,"www.baidu.com")
loop.run_in_executor(processExetutor,ping,"www.qq.com")
loop.run_in_executor(processExetutor,ping,"www.yangyanxing.com")
print('主线程不会阻塞')

这里初始化了两个执行器,一个是线程的,一个是进程的,
它们执行的效果一样,只是一个跑在了多线程,一个跑在了多进程
使用concurrent.futures.ThreadPoolExecutor()执行器的结果是

1
2
3
4
5
6
7
8
9
10
11
In main thread  <_MainThread(MainThread, started 7688)>
阻塞函数开始运行,当前的线程ID为: <Thread(Thread-2, started daemon 10924)> 进程ID为: 8188
阻塞函数开始运行,当前的线程ID为: <Thread(Thread-3, started daemon 9068)> 进程ID为: 8188
主线程不会阻塞
模拟ping 输出 www.baidu.com
模拟ping 输出 www.qq.com
阻塞函数运行结束,当前的线程ID为: <Thread(Thread-2, started daemon 10924)>
阻塞函数运行结束,当前的线程ID为: <Thread(Thread-3, started daemon 9068)>
阻塞函数开始运行,当前的线程ID为: <Thread(Thread-2, started daemon 10924)> 进程ID为: 8188
模拟ping 输出 www.yangyanxing.com
阻塞函数运行结束,当前的线程ID为: <Thread(Thread-2, started daemon 10924)>

这们的进程ID都是8188,是跑在了同一个进程下。另外注意一下,我这里在初始化的时候传一个max_workers为2,注意看结果的输出,它是先执行了前两个,当有一个执行完了以后再开始执行第三个,而不是三个同时运行的。

使用concurrent.futures.ProcessPoolExecutor()执行器的执行结果

1
2
3
4
5
6
7
8
9
10
11
In main thread  <_MainThread(MainThread, started 10220)>
主线程不会阻塞
阻塞函数开始运行,当前的线程ID为: <_MainThread(MainThread, started 3928)> 进程ID为: 6652
阻塞函数开始运行,当前的线程ID为: <_MainThread(MainThread, started 10992)> 进程ID为: 9436
阻塞函数开始运行,当前的线程ID为: <_MainThread(MainThread, started 9740)> 进程ID为: 9000
模拟ping 输出 www.qq.com
阻塞函数运行结束,当前的线程ID为: <_MainThread(MainThread, started 3928)>
模拟ping 输出 www.baidu.com
阻塞函数运行结束,当前的线程ID为: <_MainThread(MainThread, started 10992)>
模拟ping 输出 www.yangyanxing.com
阻塞函数运行结束,当前的线程ID为: <_MainThread(MainThread, started 9740)>

可以看出来它们的进程ID是不同的。

这样看使用run_in_executor和使用多进程和多线程其实意义是一样的。别着急,在讲完异步函数以后就可以看到区别了。

在事件循环中动态的添加异步函数

通过asyncio.run_coroutine_threadsafe 方法来动态的将一个协程绑定到事件循环上,并且不会阻塞主线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#coding:gbk
import asyncio
import time,sys
import threading
import concurrent
import functools
import subprocess
import os

# 定义两个异步函数
async def asyncfunc1():
print("Suspending func1")
await asyncio.sleep(1)
print("func func1 ", threading.current_thread())
print('Resuming func1')
return "func1"

async def asyncfunc2():
print("Suspending func2")
await asyncio.sleep(1)
print("func func2 ", threading.current_thread())
print('Resuming func2')
return "func2"


#定义一个跑事件循环的线程函数
def start_thread_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()

if __name__=="__main__":
print("In main thread ",threading.current_thread())
loop = asyncio.get_event_loop()
# 在子线程中运行事件循环,让它run_forever
t = threading.Thread(target= start_thread_loop, args=(loop,))
t.start()
asyncio.run_coroutine_threadsafe(asyncfunc1(),loop)
asyncio.run_coroutine_threadsafe(asyncfunc1(),loop)
asyncio.run_coroutine_threadsafe(asyncfunc2(),loop)
asyncio.run_coroutine_threadsafe(asyncfunc2(),loop)

print('主线程不会阻塞')

通过asyncio.run_coroutine_threadsafe在loop上绑定了四个协程函数,得到的输出结果为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
In main thread  <_MainThread(MainThread, started 4772)>
Suspending func1
主线程不会阻塞
Suspending func1
Suspending func2
Suspending func2
func func1 <Thread(Thread-1, started 3948)>
Resuming func1
func func2 <Thread(Thread-1, started 3948)>
Resuming func2
func func1 <Thread(Thread-1, started 3948)>
Resuming func1
func func2 <Thread(Thread-1, started 3948)>
Resuming func2

主线程不会被阻塞,起的四个协程函数几乎同时返回的结果,但是注意,协程所在的线程和主线程不是同一个线程,因为此时事件循环loop是放到了另外的子线程中跑的,所以此时这四个协程放到事件循环的线程中运行的。
注意这里只有run_coroutine_threadsafe方法,没有run_coroutine_thread 方法。

获取协程的返回结果

获取结果可以使用asyncio.gather()方法,这里面传的是coros_or_futures就是协程或者task对象,asyncio.run_coroutine_threadsafe()run_in_executor()返回的都是Future对象,所以可以将它们共同放到gather里,获取返回值.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#coding:gbk
import asyncio
import time,sys
import threading
import concurrent
import functools
import subprocess
import os

# 定义阻塞的函数
def ping(url):
print("阻塞函数开始运行,当前的线程ID为:",threading.current_thread(),"进程ID为:",os.getpid())
time.sleep(4)
print("模拟ping 输出 ",url)
print("阻塞函数运行结束,当前的线程ID为:",threading.current_thread())
return url

# 定义两个异步函数
async def asyncfunc1():
print("Suspending func1")
await asyncio.sleep(1)
print("func func1 ", threading.current_thread())
print('Resuming func1')
return "func1"

async def asyncfunc2():
print("Suspending func2")
await asyncio.sleep(2)
print("func func2 ", threading.current_thread())
print('Resuming func2')
return "func2"


#定义一个跑事件循环的线程函数
def start_thread_loop(loop):
print("loop线程 id 为",threading.current_thread())
asyncio.set_event_loop(loop)
loop.run_forever()

# 定义一个回调函数
def callbackfunc(task):
print("task 运行结束,它的结果是:",task.result())
# loop.stop()

async def main():
t1 = time.time()
# 使用loop.create_task创建task对象,返回asyncio.tasks.Task对象
task1 = loop.create_task(asyncfunc1())
task2 = loop.create_task(asyncfunc2())
# 使用asyncio.run_coroutine_threadsafe 返回的是concurrent.futures._base.Future对象
# 注意这个对象没有__await__方法,所以不能对其使用await 但是可以给它添加回调add_done_callback
task3 = asyncio.run_coroutine_threadsafe(asyncfunc1(),loop)
task4 = asyncio.run_coroutine_threadsafe(asyncfunc2(),loop)

# 使用loop.run_in_executor创建阻塞的任务,返回asyncio.futures.Future对象
task5 = loop.run_in_executor(None,ping,"www.baidu.com")
task6 = loop.run_in_executor(None,ping,"www.yangyanxing.com")

# 使用asyncio.ensure_future()创建任务对象
task7 = asyncio.ensure_future(asyncfunc1())
task8 = asyncio.ensure_future(asyncfunc2())


task1.add_done_callback(callbackfunc)
task2.add_done_callback(callbackfunc)
task3.add_done_callback(callbackfunc)
task4.add_done_callback(callbackfunc)
task5.add_done_callback(callbackfunc)
task6.add_done_callback(callbackfunc)
task7.add_done_callback(callbackfunc)
task8.add_done_callback(callbackfunc)

result = await asyncio.gather(task1,task2,task5,task6,task7,task8)
print(result)
t2 = time.time()
print("一共用了%s时间"%(t2-t1))

async def mian2():
result = await asyncio.gather(asyncfunc1(),asyncfunc2(),)
print(result)

def shutdown(loop):
loop.stop()

if __name__=="__main__":
print("In main thread ",threading.current_thread())
loop = asyncio.get_event_loop()
loop2 = asyncio.new_event_loop()
# 在子线程中运行事件循环,让它run_forever
t = threading.Thread(target= start_thread_loop, args=(loop,))
t.start()
asyncio.run_coroutine_threadsafe(main(),loop)

print('主线程不会阻塞')

代码执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
In main thread  <_MainThread(MainThread, started 6052)>
loop线程 id 为 <Thread(Thread-1, started 2388)>
主线程不会阻塞
阻塞函数开始运行,当前的线程ID为: <Thread(Thread-2, started daemon 11644)> 进程ID为: 12280
阻塞函数开始运行,当前的线程ID为: <Thread(Thread-3, started daemon 1180)> 进程ID为: 12280
Suspending func1
Suspending func2
Suspending func1
Suspending func2
Suspending func1
Suspending func2
func func1 <Thread(Thread-1, started 2388)>
Resuming func1
func func1 <Thread(Thread-1, started 2388)>
Resuming func1
func func1 <Thread(Thread-1, started 2388)>
Resuming func1
task 运行结束,它的结果是: func1
task 运行结束,它的结果是: func1
task 运行结束,它的结果是: func1
func func2 <Thread(Thread-1, started 2388)>
Resuming func2
func func2 <Thread(Thread-1, started 2388)>
Resuming func2
func func2 <Thread(Thread-1, started 2388)>
Resuming func2
task 运行结束,它的结果是: func2
task 运行结束,它的结果是: func2
task 运行结束,它的结果是: func2
模拟ping 输出 www.baidu.com
阻塞函数运行结束,当前的线程ID为: <Thread(Thread-2, started daemon 11644)>
模拟ping 输出 www.yangyanxing.com
阻塞函数运行结束,当前的线程ID为: <Thread(Thread-3, started daemon 1180)>
task 运行结束,它的结果是: www.baidu.com
task 运行结束,它的结果是: www.yangyanxing.com
['func1', 'func2', 'www.baidu.com', 'www.yangyanxing.com', 'func1', 'func2']
一共用了4.002800464630127时间

总的时间是取决于所有运行的函数中耗时最长的,这里同步函数有个阻塞的sleep(4) ,所以总的时间是4秒多一点点.

关于在异步协程中的处理流程先总结这么多,之后再学习总结一个与异步相关的各种库如aiohttp的使用等等.