目录

使用aiomysql异步操作mysql

之前一直在使用mongo与redis,最近在项目中开始使用mysql数据库,由于现在的项目是全程异步的操作,所以在在网上查了下关于在python中异步的操作mysql,找来找去最后发现aiomysql的是实现最好的,现在简单介绍一下它的使用。

aiomysql的文档地址 https://aiomysql.readthedocs.io/en/latest/

需要根据项目中使用mysql查询的频率来选择是使用单独的connection还是使用连接池,查询较少的可以选择使用connection,使用一次以后就断开,再次使用再次连接,但是对于mysql,每次连接的开销都很高,所以建议还是使用连接池,由于不同的mysql服务对于interactive_timeout的设置时间不同,所以这里还要注意一下这个超时问题,在同步版本中关于mysql主动断开连接的问题可以参考我之前的文章,解决mysql服务器在无操作超时主动断开连接的问题 ,异步版本同样也要注意这个问题。

为了测试,我在docker中启了一个mysql服务,并且设置interactive_timeout为5秒,非常短,这样测试以后,如果一个连接在5秒钟之内都没有任何查询则主动将该连接断开。数据很简单就两条

1
2
3
4
5
6
7
8
mysql> select * from person;
+----+------+-----+
| id | name | age |
+----+------+-----+
|  1 | yang |  18 |
|  2 | fan  |  16 |
+----+------+-----+
2 rows in set

使用单独的connection

根据官方文档,我对其进行了一点封装,采用单例模式。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#coding:utf-8

import aiomysql
import asyncio
import logging
import traceback
'''
mysql 异步版本
'''

logobj = logging.getLogger('mysql')

class Pmysql:
    __connection = None

    def __init__(self):
        self.cursor = None
        self.connection = None

    @staticmethod
    async def getconnection():
        if Pmysql.__connection == None:
            conn = await aiomysql.connect(
                host='127.0.0.1',
                port=3306,
                user='root',
                password='123456',
                db='mytest',
                )
            if conn:
                Pmysql.__connection = conn
                return conn
            else:
                raise("connect to mysql error ")
        else:
            return Pmysql.__connection

    async def query(self,query,args=None):
        self.cursor = await self.connection.cursor()
        await self.cursor.execute(query,args)
        r = await self.cursor.fetchall()
        await self.cursor.close()
        return r


async def test():
    conn = await Pmysql.getconnection()
    mysqlobj.connection = conn
    await conn.ping()
    r = await mysqlobj.query("select * from person")
    for i in r:
        print(i)
    conn.close()

if __name__ == '__main__':
    mysqlobj = Pmysql()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(test())

这个小脚本执行很顺利,得到的结果

1
2
(1, 'yang', 18)
(2, 'fan', 16)

简单说明一个这个脚本,由于aiomysql.connect是异步的,在python里 __init__ 方法不能使用async关键词,也就是在对象的初始化时不能异步,所以我将获取连接的操作单独的使用单例模式来创建一个连接,当然也可以不使用单例,每次进行查询的时候,都重新获取一个新的连接connection。

处理连接无操作超时问题

还是那个老生常谈的问题,如果某个连接在一段时间内无操作,mysql会主动断开这个连接,我这里设置的5秒钟,那么我们看看停顿6秒钟以后再次尝试查询操作会怎样?

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
async def test():
    conn = await Pmysql.getconnection()
    mysqlobj.connection = conn
    await conn.ping()
    r = await mysqlobj.query("select * from person")
    for i in r:
        print(i)
    await asyncio.sleep(6)
    r2 = await mysqlobj.query("select * from person")
    for i in r2:
        print(i)
    conn.close()

sleep了6秒钟以后,当再次使用该connection的cursor对象进行查询操作时,由于mysql服务已经将该连接关闭,所以会得到2013, 'Lost connection to MySQL server during query'错误。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
C:\Python35\python.exe F:/python/python3Test/mysqltest3.py
(1, 'yang', 18)
(2, 'fan', 16)
Traceback (most recent call last):
  File "C:\Python35\lib\site-packages\aiomysql\connection.py", line 598, in _read_bytes
...
...
  File "C:\Python35\lib\site-packages\aiomysql\connection.py", line 601, in _read_bytes
    raise OperationalError(2013, msg) from e
pymysql.err.OperationalError: (2013, 'Lost connection to MySQL server during query')
...
...

解决方法还是和之前同步版本一样,在进行查询操作之前,先使用connection.ping()方法来检查一下连接是否有效,该方法默认会在连接无效的时候进行重新连接。这里我直接修改Pmysql类的query方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#coding:utf-8

import aiomysql
import asyncio
import logging
import traceback
'''
mysql 异步版本
'''

logobj = logging.getLogger('mysql')

class Pmysql:
    __connection = None

    def __init__(self):
        self.cursor = None
        self.connection = None

    @staticmethod
    async def getconnection():
        if Pmysql.__connection == None:
            conn = await aiomysql.connect(
                host='127.0.0.1',
                port=3306,
                user='root',
                password='123456',
                db='mytest',
                )
            if conn:
                Pmysql.__connection = conn
                return conn
            else:
                raise("connect to mysql error ")
        else:
            return Pmysql.__connection

    async def query(self,query,args=None):
        self.cursor = await self.connection.cursor()
        #每次进行查询操作时都先执行一下ping()方法来检查一下连接是否有效
        await self.connection.ping()
        await self.cursor.execute(query,args)
        r = await self.cursor.fetchall()
        await self.cursor.close()
        return r


async def test():
    conn = await Pmysql.getconnection()
    mysqlobj.connection = conn
    r = await mysqlobj.query("select * from person")
    for i in r:
        print(i)
    await asyncio.sleep(6)
    r2 = await mysqlobj.query("select * from person")
    for i in r2:
        print(i)
    conn.close()

if __name__ == '__main__':
    mysqlobj = Pmysql()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(test())

这样上面的脚本就可以正常的执行了。

异步地执行多个查询

异步的操作的优势在于它可以"同时"的进行多个操作,如果查询只是一个一个的单独查询,那用不用异步其实都无所谓,这里尝试使用异步来同时执行多个操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#coding:utf-8
async def test():
    conn = await Pmysql.getconnection()
    mysqlobj.connection = conn
    r = await mysqlobj.query("select * from person")
    mysqlobj.connection.close()
    return r

async def test2():
    conn = await Pmysql.getconnection()
    mysqlobj.connection = conn
    r = await mysqlobj.query("select * from person where id=(%s)",(1,))
    mysqlobj.connection.close()
    return r

async def querysum():
    result = await asyncio.gather(test(),test2())
    for i in result:
        print(i)


if __name__ == '__main__':
    mysqlobj = Pmysql()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(querysum())

这里我准备了两个查询操作,test和test2,并将它们的结果放到另外一个协程querysum中,但是结果却出乎意料,脚本崩了…… 我看崩溃信息很多,其中有一条 RuntimeError: readexactly() called while another coroutine is already waiting for incoming data, 这条给我感觉是当一个协程在等待数据的时候突然另外一个协程进来了打断了它的数据读取。 我个人推断应该是我采用了单例,它们共用一个connection然后在异步的处理过程中,当一个查询在进行过程中,在等待协程的数据返回,此时由于用了await,执行权会让出给别的协程,但是此时如果别的协程又在该connection上进行了数据库查询,则会影响到被await协程的数据读取。但是他们是用的同一个connection吗? 我打印一下看看

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
async def test():
    conn = await Pmysql.getconnection()
    print("test...",id(conn))
    mysqlobj.connection = conn
    r = await mysqlobj.query("select * from person")
    mysqlobj.connection.close()
    return r

async def test2():
    conn = await Pmysql.getconnection()
    print("test2..",id(conn))
    mysqlobj.connection = conn
    r = await mysqlobj.query("select * from person where id=(%s)",(1,))
    mysqlobj.connection.close()
    return r

得到的结果是:

1
2
test... 1719190377360
test2.. 1719190377976

惊奇的发现,它们用的居然不是同一个connection…,那么问题是不是出在了单例模式下的初始化connection函数上…… 我们返来看一下connection初始化条件if Pmysql.__connection == None:,如果__connection 是None的话则进行初始化操作,据此我又推断,由于我在两个协程中共用的是一个全局的mysqlobj,mysqlobj = Pmysql(),所以在这两个协程运行的一开始在同时调用await Pmysql.getconnection()时,由于此时,这个mysqlobj的__connection是空,所以这两个协程此时的判断都是为空,所以都重新进行了数据库连接操作,然后把各自初始化获取的conn赋给了mysqlobj.connection,但是这就有问题了,同一个对象的某个属性的值就变了,所以在之后使用connection的cursor对象进行数据库查询操作时就会出现问题…… 那么我把其中一个协程在获取connection对象之前先暂停一下呢,让另外一个协程先获取到connection,这样当另外的协程再次获取的时候就可以直接获取到之前初始化过的connection了.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
async def test():
    conn = await Pmysql.getconnection()
    print("test...",id(conn))
    mysqlobj.connection = conn
    r = await mysqlobj.query("select * from person")
    mysqlobj.connection.close()
    return r

async def test2():
    await asyncio.sleep(0.1)
    conn = await Pmysql.getconnection()
    print("test2..",id(conn))
    mysqlobj.connection = conn
    r = await mysqlobj.query("select * from person where id=(%s)",(1,))
    mysqlobj.connection.close()
    return r

我在test2()函数获取connection之前sleep(0.1)秒 此时在test()函数和test2()函数中得到的connection就是相同的了,但是脚本依然报错

1
2
3
4
5
6
7
8
test... 2286181241800
test2.. 2286181241800
Traceback (most recent call last):
  ...
  ...
  "C:\Python35\lib\site-packages\aiomysql\connection.py", line 1064, in _ensure_alive
    raise InterfaceError("(0, 'Not connected')")
pymysql.err.InterfaceError: (0, 'Not connected')

脚本报了一个0, 'Not connected'错误,这里是由于在test()和test2()函数执行完以后都执行了mysqlobj.connection.close()操作来关闭这个connection,在异步操作中,不一定谁先执行完,谁先执行完就将connection关闭,但是你关闭了,其它协程可能还会用到,所以这里就报了Not connected错误。 解决方法是将mysqlobj.connection.close()注掉,在脚本全部执行完以后统一对connection进行关闭操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
async def test():    
    conn = await Pmysql.getaconnection()
    print("test...",id(conn))
    mysqlobj.connection = conn
    r = await mysqlobj.query("select * from person")
    # mysqlobj.connection.close()
    return r

async def test2():
    await asyncio.sleep(0.1)
    conn = await Pmysql.getconnection()
    print("test2..",id(conn))
    mysqlobj.connection = conn
    r = await mysqlobj.query("select * from person where id=(%s)",(1,))
    # mysqlobj.connection.close()
    return r

async def querysum():
    result = await asyncio.gather(test(),test2())
    for i in result:
        print(i)


if __name__ == '__main__':
    mysqlobj = Pmysql()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(querysum())
    mysqlobj.connection.close()

此时得到正确的执行结果

1
2
3
4
test... 1732819440584
test2.. 1732819440584
((1, 'yang', 18), (2, 'fan', 16))
((1, 'yang', 18),)

其实这里还可以不使用全局的mysqlobj,在每次查询的时候使用各自独立的对象,使用独立的连接connection

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

async def getaconnection():
    return await aiomysql.connect(
        host='127.0.0.1',
        port=3306,
        user='root',
        password='123456',
        db='mytest',
    )


async def test():
    mysqlobj = Pmysql()
    conn = await getaconnection()
    mysqlobj.connection = conn
    print("test...",id(conn))
    mysqlobj.connection = conn
    r = await mysqlobj.query("select * from person")
    mysqlobj.connection.close()
    return r

async def test2():
    mysqlobj = Pmysql()
    conn = await getaconnection()
    mysqlobj.connection = conn
    print("test2..",id(conn))
    mysqlobj.connection = conn
    r = await mysqlobj.query("select * from person where id=(%s)",(1,))
    mysqlobj.connection.close()
    return r

async def querysum():
    result = await asyncio.gather(test(),test2())
    for i in result:
        print(i)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(querysum())

如果使用单独的对象,单独的connection,那么我们其实不需要自己来维护这套连接机制,而是使用下面要介绍的连接池操作。

使用连接池pool

使用连接池的意义在于,有一个池子,它里保持着指定数量的可用连接,当一个查询结执行之前从这个池子里取一个连接,查询结束以后将连接放回池子中,这样可以避免频繁的连接数据库,节省大量的资源。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#coding:utf-8
import traceback
import logging
import aiomysql
import asyncio
'''
mysql 异步版本
'''

logobj = logging.getLogger('mysql')

class Pmysql:
    def __init__(self):
        self.coon = None
        self.pool = None

    async def initpool(self):
        try:
            logobj.debug("will connect mysql~")
            __pool = await aiomysql.create_pool(
                    minsize=5,
                    maxsize=10,
                    host='127.0.0.1',
                    port=3306,
                    user='root',
                    password='123456',
                    db='mytest',
                    autocommit=False)
            return __pool
        except:
            logobj.error('connect error.', exc_info=True)

    async def getCurosr(self):
        conn = await self.pool.acquire()
        cur = await conn.cursor()
        return conn,cur


    async def query(self, query,param=None):
        conn,cur = await self.getCurosr()
        try:
            await cur.execute(query,param)
            return await cur.fetchall()
        except:
            logobj.error(traceback.format_exc())
        finally:
            if cur:
                await cur.close()
            # 释放掉conn,将连接放回到连接池中
            await self.pool.release(conn)

async def test():
    mysqlobj = await getAmysqlobj()
    r = await mysqlobj.query("select * from person")
    for i in r:
        print(i)
    await asyncio.sleep(6)
    r2 = await mysqlobj.query("select * from person where id = (%s)",(1,))
    print(r2)

async def getAmysqlobj():
    mysqlobj = Pmysql()
    pool = await mysqlobj.initpool()
    mysqlobj.pool = pool
    return mysqlobj

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(test())

该脚本在test方法在数据库查询操作分成了两部分,中间停了6秒钟来让mysql服务主动断开连接,当进行第二次查询的时候,并没有报2013, 'Lost connection to MySQL server during query'error,这里是由于

1
2
3
4
async def getCurosr(self):
    conn = await self.pool.acquire()
    cur = await conn.cursor()
    return conn,cur

在getCurosr方法中是从连接池中重新获取了一个可用的连接。

异步处理多任务

和单连接一样,我们这里尝试异步的处理多个任务看看情况如何

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
async def test(mysqlobj):
    r = await mysqlobj.query("select * from person")
    return r

async def test2(mysqlobj):
    r = await mysqlobj.query("select * from person where id = (%s)",(1,))
    return r

async def getAmysqlobj():
    mysqlobj = Pmysql()
    pool = await mysqlobj.initpool()
    mysqlobj.pool = pool
    return mysqlobj

async def querysum():
    mysqlobj = await getAmysqlobj()
    result = await asyncio.gather(test(mysqlobj),test2(mysqlobj))
    for i in result:
        print(i)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(querysum())

注意这里在运行test()和test2()放入的是同一个mysqlobj,但是它们在进行查询的时候都重新通过pool.acquire()重新获取连接和游标,这样它们相互之间不互影响,可以各自进行各自的查询。

aiomysql 的使用初步就讲到这里,之后我会介绍一下在tornado中如何异步的使用aiomysql进行查询。