目录

python中的异步实践与tornado应用

最近项目中由于在python3中使用tornado,之前也有用过,是在python2中,由于对于协程理解不是很透彻,只是套用官方文档中的写法,最近比较细致的看了下协程的用法,也将tornado在python3中异步的实践了一下。

异步基础

要理解协程,先要理解异步,要理解异步,先要理解同步,与同步相关的概念又有阻塞与非阻塞,下面一一做简单介绍。

阻塞

阻塞状态指程序未得到所需计算资源时被挂起的状态。程序在等待某个操作完成期间,自身无法继续干别的事情,则称该程序在该操作上是阻塞的。 常见的阻塞形式有:网络 I/O 阻塞、磁盘 I/O 阻塞、用户输入阻塞等。阻塞是无处不在的,包括 CPU 切换上下文时,所有的进程都无法真正干事情,它们也会被阻塞。如果是多核 CPU 则正在执行上下文切换操作的核不可被利用。

非阻塞

程序在等待某操作过程中,自身不被阻塞,可以继续运行干别的事情,则称该程序在该操作上是非阻塞的。 非阻塞并不是在任何程序级别、任何情况下都可以存在的。 仅当程序封装的级别可以囊括独立的子程序单元时,它才可能存在非阻塞状态。 非阻塞的存在是因为阻塞存在,正因为某个操作阻塞导致的耗时与效率低下,我们才要把它变成非阻塞的。

同步

不同程序单元为了完成某个任务,在执行过程中需靠某种通信方式以协调一致,称这些程序单元是同步执行的。 例如购物系统中更新商品库存,需要用“行锁”作为通信信号,让不同的更新请求强制排队顺序执行,那更新库存的操作是同步的。 简言之,同步意味着有序。

异步

为完成某个任务,不同程序单元之间过程中无需通信协调,也能完成任务的方式,不相关的程序单元之间可以是异步的。 例如,爬虫下载网页。调度程序调用下载程序后,即可调度其他任务,而无需与该下载任务保持通信以协调行为。不同网页的下载、保存等操作都是无关的,也无需相互通知协调。这些异步操作的完成时刻并不确定。 简言之,异步意味着无序。

这个概念让我想起了上学时学过的一篇文章,讲统筹安排的,比如你现在要烧水,做饭,洗衣服三件事,如果同步的进行,先烧水,在水烧开的过程中你什么都不做就等着它烧开,然后水烧开以后你再接着做饭,饭做熟的过程中你也是什么都不干,就干等着,饭做熟后再去将洗衣服放入洗衣机中去洗,之后又是干等着。如果单做一件事的时间是烧水10分别,做饭30分钟,洗衣服20分钟,那么完成这三件事总共需要60分钟。 如果将这三件事异步的去进行,我先将水烧上,然后再将衣服放到洗衣机里,然后去做饭,这三件事同时进行,当水烧开的时候给我一个信号,这里就是水壶会响,我听到响声以后我会中止做饭这件事情去处理烧开的水,比如把它倒到保温瓶中,衣服洗完以后洗衣机也会给我一个信号,那么我就会将衣服拿出来晾晒。这样处理完三件事总共的时间就由三件事情中最长的时间决定,这里就是30分钟,其实异步的处理就是最大程度的发挥cup的处理能力,让其在同一时间内做更多的事情。

上面的过程用代码来实现大概是这个样子

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#coding:gbk

import time
import threading,multiprocessing

def water():
    print("开始烧水了")
    start = time.time()
    time.sleep(2)
    end = time.time()
    return ("水开了", end - start)

def wash():
    print("开始洗衣服了")
    start = time.time()
    time.sleep(4)
    end = time.time()
    return ("衣服洗完了", end - start)

def cook():
    print("开始做饭了")
    start = time.time()
    time.sleep(10)
    end = time.time()
    return ("饭熟了",end - start)

def alarm(info):
    print("%s 一共用了%s 时间"%(info[0],info[1]))

class DoWork(threading.Thread):
    def __init__(self, func, callback,*params, **paramMap):
        threading.Thread.__init__(self)
        self.func = func
        self.params = params
        self.paramMap = paramMap
        self.rst = None
        self.finished = False
        self.isDaemon = True
        self.isDaemon = False
        self.callback = callback

    def run(self):
        self.rst = self.func(*self.params, **self.paramMap)
        self.callback(self.rst)
        self.finished = True

if __name__ == '__main__':
    task1 = DoWork(water,alarm)
    task2 = DoWork(wash,alarm)
    task3 = DoWork(cook,alarm)

    start = time.time()
    task1.start()
    task2.start()
    task3.start()
    task1.join()
    task2.join()
    task3.join()
    end = time.time()
    print("一共用了%s时间"%(end-start))

执行结果如下

1
2
3
4
5
6
7
开始烧水了
开始洗衣服了
开始做饭了
水开了 一共用了2.0003998279571533 时间
衣服洗完了 一共用了4.00029993057251 时间
饭熟了 一共用了10.000999689102173 时间
一共用了10.001499891281128时间

yield 语法

以上是用了多线程的方式来达到异步的效果,但是并没有用到协程,协程在python2就有,现在来看看在python2中通过yield语法。以下方法是在python2.6中执行.

要了解 yield 语法,先要了解一个概念: Generator 『生成器』,关于generator的概念可以参考廖雪峰的教程,写的很好,生成器

如果一个函数定义中包含 yield 关键字,那么这个函数就不再是一个普通函数,而是一个 generator

1
2
3
4
5
6
7
def h():
    print('I am yangyanxing')
    yield 5


if __name__ == '__main__':
    c = h()

运行该脚本以后程序并没有任何输出,因为它有yield表达式,因此,我们通过next()语句让它执行。next()语句将恢复Generator执行,并直到下一个yield表达式处。比如:

1
2
3
4
5
6
7
8
9
def h():
    print('I am yangyanxing')
    yield 5
    print("I am fjy")


if __name__ == '__main__':
    c = h()
    c.next()

调用 c.nect() 以后,函数开始执行,这时先打印 “I am yangyanxing”, 之后遇到 yield 关键字,此时函数又被中断,脚本执行结束,程序只打印了一行 “I am yangyanxing”,如果想要打印出 I am fjy 呢,以时需要再调用一次 c.next(), 当再次调用 c.next() 时,函数从之前的 yield 处开始执行,由于函数在之后没有 yield 了,所以程序会抛一个 StopIteration 异常。

next() 函数相关的还有一个 send() 函数,next 函数传递的是 Nonesend 函数可以传递对应的值。其实next()和send()在一定意义上作用是相似的,区别是send()可以传递yield表达式的值进去,而next()不能传递特定的值,只能传递None进去。因此,我们可以看做 c.next() 和 c.send(None) 作用是一样的。看如下的代码。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
def h():
    print('I am yangyanxing')
    m = yield 5
    print(m)
    print("I am fjy")
    yield 6
    print("They love too much")

if __name__ == '__main__':
    c = h()
    c.next()
    c.send("hahaha")

函数的输出为

1
2
3
I am yangyanxing
hahaha
I am fjy

这代码解析,首先通过 c = h() 定义了一个 generator ,然后调用 c.next() 启动这个生成器,生成器先打印出 I am yangyanxing 然后遇到 m = yield 5 这行代码,后停止,之后再调用 c.send("hahaha") ,这时候 m 的值就是 hahaha, 然后再打印出 m ,之后再打印出 I am fjy,之后又遇到了 yield 关键字,程序又中止了,整个脚本执行结束,需要提醒的是,第一次调用时,请使用next()语句或是send(None),不能使用send发送一个非None的值,否则会出错的,因为没有yield语句来接收这个值。 那么 next()send() 函数的返回值是什么呢? 注意到上面函数中的 yield 之后是一个5了吗?其实这就是调用 netx 或者 send 以后得到的返回值.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
def h():
    print('I am yangyanxing')
    m = yield 5
    print(m)
    print("I am fjy")
    yield 6

if __name__ == '__main__':
    c = h()
    print(c.next())
    print(c.send("hahaha"))

得到的输出为

1
2
3
4
5
I am yangyanxing
5
hahaha
I am fjy
6

异步使用

同步的困扰

首先看以下的代码,以下是在python2中编写

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.httpclient
import time

from tornado.options import define, options
define("port", default=8000, help="run on the given port", type=int)

class IndexHandler(tornado.web.RequestHandler):
    def get(self):
        query = self.get_argument('q')
        time.sleep(5)
        self.write("hello %s"%query)

if __name__ == "__main__":
    tornado.options.parse_command_line()
    app = tornado.web.Application(handlers=[(r"/", IndexHandler)])
    http_server = tornado.httpserver.HTTPServer(app)
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.instance().start()

我分别用浏览器和和用脚本对 http://127.0.0.1:8000/?q=yangyanxing 该 url 进行访问,脚本如下

1
2
3
4
5
6
7
8
9
import requests,time

def geturl(url):
     s = time.time()
     r = requests.get(url)
     e = time.time()
     print(int(e-s))

getrul(r"http://127.0.0.1:8000/?q=yangyanxing")

服务端显示

1
2
[I 190114 00:03:46 web:2162] 304 GET /?q=yangyanxing (127.0.0.1) 5000.97ms
[I 190114 00:03:51 web:2162] 200 GET /?q=yangyanxing (127.0.0.1) 5006.78ms

脚本打印为 7或者8

在同步应用中,由于同时只能提供一个请求。所以,如果一个路由中有一个比较耗时的操作,如代码中的 time.sleep(5) 那么意味着如果同时有两个请求,那么第二个请求只能等待服务器处理完第一个请求之后才能处理第二个请求,也就中处理两个请求,最短要5秒,最长要10秒,这还只是2个,如果有10个那就是要50秒处理完所有的请求,这个效率是无法接受的,服务端可否同时处理10个请求,就像文章一开始提到的同时做三件事情,在处理一个耗时的操作时,不是干等着这件事情处理完,而是去做别的事情,当那件事情结束以后,再通过调用回调函数来通知调用者。

异步的使用

  • 客户端的实现 异步的使用可以分为客户端的调用与服务端的处理,先从简单的来看,客户端的调用,比如你要同时访问 baidu.com 10次,你会怎么做?可以依次的对 baidu 发起10次请求,每次请求结束以后再发起下一次请求,假如每次请求是1秒钟,那么10次请求至少要用10秒钟,排除IO相关耗时,代码可能是这个样子的
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
#coding:utf-8

import time
import requests

def geturl(url):
    start = time.time()
    r = requests.get(url)
    end = time.time()
    print("用了%s时间"%(end-start))
    return r.status_code


if __name__ == '__main__':
    start = time.time()
    for i in range(10):
        geturl(r"https://www.baidu.com")
    end = time.time()
    print("all done,use %s time"%(end-start))

执行结果如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
用了0.2974998950958252时间
用了0.0494999885559082时间
用了0.0410001277923584时间
用了0.029999971389770508时间
用了0.18199992179870605时间
用了0.02700018882751465时间
用了0.022499799728393555时间
用了0.020000219345092773时间
用了0.020999908447265625时间
用了0.019999980926513672时间
all done,use 0.7105000019073486 time 

一共用了0.7秒,百度的反应可能太快了,我们准备一个本地的环境来模拟慢返回。这里我先使用tornado的异步协程处理,之后再详细说明该处的用法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.httpclient
import asyncio

import urllib
import json
import datetime
import time

from tornado.options import define, options
define("port", default=8000, help="run on the given port", type=int)

class IndexHandler(tornado.web.RequestHandler):
    async def get(self):
        query = self.get_argument('q')
        await asyncio.sleep(5)
        self.writeres("hello %s"%query)
        self.finish()

    def writeres(self,returnstr):
        self.write(returnstr)

if __name__ == "__main__":
    tornado.options.parse_command_line()
    app = tornado.web.Application(handlers=[(r"/", IndexHandler)])
    http_server = tornado.httpserver.HTTPServer(app)
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.instance().start()

请求代码改为三次,只是为了说明问题

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
def geturl(url):
    start = time.time()
    r = requests.get(url)
    end = time.time()
    print("用了%s时间"%(end-start))
    return r.status_code


if __name__ == '__main__':
    start = time.time()
    for i in range(3):
        geturl(r"http://127.0.0.1:8000/?q=yangyanxing")
    end = time.time()
    print("all done,use %s time"%(end-start))

结果:

1
2
3
4
用了5.009501695632935时间
用了5.012002229690552时间
用了5.012502193450928时间
all done,use 15.035006284713745 time

可以看到,总是时间是15秒,同步对一个url发请求,在没有做异步处理的时候时间是累积的。

接下来说本篇的重点,协程.

  • 定义协程

在一个普通的函数前面加上 async 关键字,此时该函数会返回一个coroutine对象,函数里也不会立刻执行.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
async def getUrlByCor(url):
    start = time.time()
    r = requests.get(url)
    end = time.time()
    print("用了%s时间" % (end - start))
    return r.status_code

if __name__ == '__main__':
    s = getUrlByCor(r'http://127.0.0.1:8000/?q=yangyanxing')
    print(s)

运行结果:

1
 <coroutine object getUrlByCor at 0x00000000034BBF10>

此处的 s 是一个coroutine对象,那么怎么才能执行函数里面的方法呢? 将这个协程对象放到事件循环 event_loop 中执行

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
async def getUrlByCor(url):
    start = time.time()
    r = requests.get(url)
    end = time.time()
    print("用了%s时间" % (end - start))
    return r.status_code

if __name__ == '__main__':
    s = getUrlByCor(r"http://127.0.0.1:8000/?q=yangyanxing")
    loop = asyncio.get_event_loop()
    loop.run_until_complete(s)

执行结果:

用了5.009500026702881时间

如果同时发三个请求呢? 这里要对协程做一个封装,将其封装成一个 task 对象

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
async def getUrlByCor(url):
    start = time.time()
    r = requests.get(url)
    end = time.time()
    print("用了%s时间" % (end - start))
    return r.text

if __name__ == '__main__':
    s = time.time()
    tasks = [asyncio.ensure_future(getUrlByCor(r"http://127.0.0.1:8000/?q=yangyanxing")) for i in range(3)]
    print(len(tasks),tasks)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    e = time.time()
    print("use %s time"%(e-s))
    for task in tasks:
        print('Task Result:', task.result())

结果如下:

1
2
3
4
5
6
7
8
3 [<Task pending coro=<getUrlByCor() running at E:/Github/asyncTorMysql/asynctest.py:69>>, <Task pending coro=<getUrlByCor() running at E:/Github/asyncTorMysql/asynctest.py:69>>, <Task pending coro=<getUrlByCor() running at E:/Github/asyncTorMysql/asynctest.py:69>>]
用了5.008501291275024时间
用了5.012002229690552时间
用了5.012002229690552时间
use 15.03450632095337 time
Task Result: hello yangyanxing
Task Result: hello yangyanxing
Task Result: hello yangyanxing

总的时间还是15秒,并没有实现异步呢,还是同步的依次执行请求。 其实,要实现异步处理,我们得先要有挂起的操作,当一个任务需要等待 IO 结果的时候,可以挂起当前任务,转而去执行其他任务,这样我们才能充分利用好资源,上面方法都是一本正经的串行走下来,连个挂起都没有,怎么可能实现异步?

上面的函数存在耗时的操作就是r = requests.get(url) 那么将它写成挂起的呢? r = await requests.get(url)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
async def getUrlByCor(url):
    start = time.time()
    r = await requests.get(url)
    end = time.time()
    print("用了%s时间" % (end - start))
    return r.text

if __name__ == '__main__':
    s = time.time()
    tasks = [asyncio.ensure_future(getUrlByCor(r"http://127.0.0.1:8000/?q=yangyanxing")) for i in range(3)]
    print(len(tasks),tasks)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    e = time.time()
    print("use %s time"%(e-s))
    for task in tasks:
        print('Task Result:', task.result())

不出意外的报错了

Task exception was never retrieved future: <Task finished coro=<getUrlByCor() done, defined at E:/Github/asyncTorMysql/asynctest.py:69> exception=TypeError(“object Response can’t be used in ‘await’ expression”,)> Traceback (most recent call last): File “C:\Python35\lib\asyncio\tasks.py”, line 240, in _step result = coro.send(None) File “E:/Github/asyncTorMysql/asynctest.py”, line 71, in getUrlByCor r = await requests.get(url) TypeError: object Response can’t be used in ‘await’ expression

这个错误的意思是 requests 返回的 Response 对象不能和 await 一起使用,await 后面的对象必须是如下格式之一

  1. 原生 coroutine 对象
  2. 一个由 types.coroutine() 修饰的生成器,这个生成器可以返回 coroutine 对象。
  3. 一个包含 __await 方法的对象返回的一个迭代器。

reqeusts 返回的 Response 不符合上面任一条件,因此就会报上面的错误了。

既然 await 后面的对象要是 coroutine 对象 ,那么将其包装在async 后面不就可以了吗?

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
async def get(url):
    return requests.get(url)

async def getUrlByCor(url):
    start = time.time()
    r = await get(url)
    end = time.time()
    print("用了%s时间" % (end - start))
    return r.text

if __name__ == '__main__':
    s = time.time()
    tasks = [asyncio.ensure_future(getUrlByCor(r"http://127.0.0.1:8000/?q=yangyanxing")) for i in range(3)]
    print(len(tasks),tasks)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    e = time.time()
    print("use %s time"%(e-s))
    for task in tasks:
        print('Task Result:', task.result())

这次不报错了,但是依然没有异步的执行

1
2
3
4
5
6
7
用了5.0090014934539795时间
用了5.011002063751221时间
用了5.011502027511597时间
use 15.03450632095337 time
Task Result: hello yangyanxing
Task Result: hello yangyanxing
Task Result: hello yangyanxing

也就是说我们仅仅将涉及 IO 操作的代码封装到 async 修饰的方法里面是不可行的!我们必须要使用支持异步操作的请求方式才可以实现真正的异步,所以这里就需要 aiohttp 派上用场了

aiohttp 是一个支持异步请求的库,利用它和 asyncio 配合我们可以非常方便地实现异步请求操作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
async def get(url):
    session = aiohttp.ClientSession()
    res = await session.get(url)
    result = await res.text()
    await session.close()
    return result

async def getUrlByCor(url):
    start = time.time()
    r = await get(url)
    end = time.time()
    print("用了%s时间" % (end - start))
    return r

if __name__ == '__main__':
    s = time.time()
    tasks = [asyncio.ensure_future(getUrlByCor(r"http://127.0.0.1:8000/?q=yangyanxing")) for i in range(3)]
    print(len(tasks),tasks)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    e = time.time()
    print("use %s time"%(e-s))
    for task in tasks:
        print('Task Result:', task.result())

执行结果:

1
2
3
4
5
6
7
用了5.006500005722046时间
用了5.006499767303467时间
用了5.006500005722046时间
use 5.008500099182129 time
Task Result: hello yangyanxing
Task Result: hello yangyanxing
Task Result: hello yangyanxing

这次终于实现了异步请求。

还记得最开始的洗衣做饭的例子吗?可以使用异步协程来实现,代码大概是这个样子

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#coding:utf-8

import time
import threading,multiprocessing
import requests
import asyncio
import aiohttp

async def water():
    print("开始烧水了")
    start = time.time()
    await asyncio.sleep(2) # 这里使用asyncio.sleep 异步休眠
    end = time.time()
    return ("水开了", end - start)

async def wash():
    print("开始洗衣服了")
    start = time.time()
    await asyncio.sleep(4)
    end = time.time()
    return ("衣服洗完了", end - start)

async def cook():
    print("开始做饭了")
    start = time.time()
    await asyncio.sleep(10)
    end = time.time()
    return ("饭熟了",end - start)

def alarm(task):
    print("%s 一共用了%s 时间"%(task.result()[0],task.result()[1]))

if __name__ == '__main__':
    tasks = []
    # 初始化task
    task_water = asyncio.ensure_future(water())
    task_wash = asyncio.ensure_future(wash())
    task_cook = asyncio.ensure_future(cook())
    tasks.extend([task_water,task_wash,task_cook])
    # 将这三个task 绑定回调函数
    task_water.add_done_callback(alarm)
    task_wash.add_done_callback(alarm)
    task_cook.add_done_callback(alarm)
    # 将task放入事件循环中
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))

执行结果:

开始烧水了 开始洗衣服了 开始做饭了 水开了 一共用了2.0 时间 衣服洗完了 一共用了4.003999948501587 时间 饭熟了 一共用了9.994499921798706 时间

服务端的实现

  • 先看下tornado在python2中的解决方案.

我们再来翻过头来看之前用tornado写的服务端同步代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.httpclient
import time

from tornado.options import define, options
define("port", default=8000, help="run on the given port", type=int)

class IndexHandler(tornado.web.RequestHandler):
    def get(self):
        query = self.get_argument('q')
        time.sleep(5)
        self.write("hello %s"%query)

if __name__ == "__main__":
    tornado.options.parse_command_line()
    app = tornado.web.Application(handlers=[(r"/", IndexHandler)])
    http_server = tornado.httpserver.HTTPServer(app)
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.instance().start()

IndexHandler 中的 get 方法,由于当中存在了一个比较耗时的操作,time.sleep(5) 处理完这个请求需要卡5秒,在卡住的这段时间,tornado无法再完成别的请求,如果此时再发来一个 / 的请求,那么只能等待这前的请求操作结束之后再对处理新发过来的请求,如果同时有1万个请求发过来,可想而知,最后一个请求就等到猴年马月才能处理完呢……

解决方法是使用@tornado.web.asynchronous@tornado.gen.coroutine 装饰器,将耗时的操作放到线程中去执行,这里的耗时操作 time.sleep(5) 是阻塞的,所以将阻塞函数放加上 @run_on_executor 装饰器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.httpclient
import tornado.autoreload
from tornado.concurrent import run_on_executor

import time
from concurrent.futures import ThreadPoolExecutor

from tornado.options import define, options
define("port", default=8000, help="run on the given port", type=int)

class IndexHandler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(100)
    @tornado.web.asynchronous
    @tornado.gen.coroutine
    def get(self):
        query = self.get_argument('q')
        res = yield self.sleepBlock(2,query)
        self.write("hello %s"%res)
        self.finish()

    @run_on_executor
    def sleepBlock(self,sleeptime,query):
        time.sleep(sleeptime)
        return query

if __name__ == "__main__":
    tornado.options.parse_command_line()
    app = tornado.web.Application(handlers=[(r"/", IndexHandler)],settings={"debug":True})
    http_server = tornado.httpserver.HTTPServer(app)
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.instance().start()

注意到在 IndexHandler 中有一行初始化 executor 的代码 executor = ThreadPoolExecutor(100) 这里的参数100是最大的线程数,我这里传的是100,也就意味着同时能处理100个请求,当有101个请求的时候,前100个请求可以同时在2秒内执行,最后的那一个请求就要等之前有结束的线程以后再去执行了。

  • 再看下tornado在python3.5 中的解决方案

由于在python3.5以后引入了 asyncio这个标准库,很多异步的操作可以用这个库来操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.httpclient
import tornado.autoreload
from tornado.concurrent import run_on_executor
import asyncio

import time
from concurrent.futures import ThreadPoolExecutor

from tornado.options import define, options
define("port", default=8000, help="run on the given port", type=int)

class IndexHandler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(100)
    @tornado.web.asynchronous
    @tornado.gen.coroutine
    def get(self):
        query = self.get_argument('q')
        res = yield self.sleepBlock(2,query)
        self.write("hello %s"%res)
        self.finish()

    @run_on_executor
    def sleepBlock(self,sleeptime,query):
        time.sleep(sleeptime)
        return query

class asyncIndexHandler(tornado.web.RequestHandler):
    async def get(self):
        query = self.get_argument('q')
        res = await self.sleepBlock(2,query)
        self.write("hello %s"%res)
        self.finish()

    async def sleepBlock(self,sleeptime,query):
        await asyncio.sleep(sleeptime) # 使用异步的sleep方法
        return query

if __name__ == "__main__":
    tornado.options.parse_command_line()
    app = tornado.web.Application(handlers=[
        (r"/", IndexHandler),
        (r"/async",asyncIndexHandler)
    ],settings={"debug":True})
    http_server = tornado.httpserver.HTTPServer(app)
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.instance().start()

IndexHandler 中的 get 方法使用了asyncawait 关键字来达到异步的处理请求,这里的asyncio.sleep(5) 是异步的暂停5秒,如果此处的方法涉及到无法使用异步请求的库该怎么处理,比如说我就想使用time.sleep(5) 则需要在线程池中运行,就像上面的/ 路由里使用 @run_on_executor 中执行。

结语

异步操作涉及的知识点比较多,不同版本的 python 对于异步的处理也不一样,有些东西如 yield 理解起来比较费劲,需要多在项目中实践,tornado 这个框架的设计初衷也是异步网络库,过使用非阻塞网络I/O, Tornado 可以支持上万级的连接,所以要使用过程中要多多考虑异步非阻塞的编码。

参考文章

爬虫速度太慢?来试试用异步协程提速吧!

Python3 异步协程函数async具体用法

Python天天美味(25) - 深入理解yield

理解Python协程:从yield/send到yield from再到async/await

使用tornado让你的请求异步非阻塞

  • 文章标题: python中的异步实践与tornado应用
  • 本文作者: 杨彦星
  • 本文链接: https://www.yangyanxing.com/article/async_in_python.html
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。非商业转载请注明出处(作者,原文链接),商业转载请联系作者获得授权。