目录

pymongo中的并发问题

在写一个系统时不得不考虑高并发时数据库的写入准确性问题,拿mongodb来说,比如要插入一条数据,如果存在则更新,如果不存在则插入新数据,如果在多线程会有哪些问题呢?

多线程中使用pymongo

我们尝试在两个线程中做同一件事,当数据存在的时候进行更新,当数据不存在的时候进行插入

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#coding:utf-8
import pymongo
import threading


mclient = pymongo.MongoClient("127.0.0.1",27017)
mobj = mclient['dockerTest']

def insetData(name,data):
    mobj["yyxtest"].find_one_and_update(
        {"name":name},
        {"$set":data},
        upsert=True
    )

if __name__ == '__main__':
    th1 = threading.Thread(target=insetData,args=("yang",{"age":18}))
    th2 = threading.Thread(target=insetData,args=("yang",{"age":20}))
    th1.start()
    th2.start()
    for i in [th1,th2]:
        i.join()
    print("all done")

如果按照我们的设计思路,数据库里应该只会有一条记录,但是实际情况是数据库里新增了两条记录

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
/* 2 */
{
    "_id" : ObjectId("5dff8a311c89c4dc02845c71"),
    "name" : "yang",
    "age" : 18
}

/* 3 */
{
    "_id" : ObjectId("5dff8a311c89c4dc02845c72"),
    "name" : "yang",
    "age" : 20
}

上面的问题主要是由于 find_one_and_update 中的"找"和"改"不是原子性操作,当有两个线程执行这个方法时,此时它们都没有查找到,然后就都执行了新的插入数据操作,解决办法有以下几种:

  1. 线程加锁
  2. 使用原子操作
  3. 使用multi参数
  4. 使用事务

线程加锁

这个应该是最简单的了,把find_one_and_update操作加上锁,同时只有一个线程在执行,查找和更新操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#coding:utf-8
import pymongo
import threading


mclient = pymongo.MongoClient("127.0.0.1",27017)
mobj = mclient['dockerTest']

def insetData(name,data,lock):
    lock.acquire()
    mobj["yyxtest"].find_one_and_update(
        {"name":name},
        {"$set":data},
        upsert=True
    )
    lock.release()


if __name__ == '__main__':
    thlock = threading.Lock()
    th1 = threading.Thread(target=insetData,args=("fjy",{"age":18},thlock))
    th2 = threading.Thread(target=insetData,args=("fjy",{"age":20},thlock))
    th1.start()
    th2.start()
    for i in [th1,th2]:
        i.join()
    print("all done")

使用原子操作

mongodb也自带一些原子性的复合操作,但是只有特定的一些方法才是原子的,比如上面的find_one_and_update 可以使用原子的update 方法

1
2
3
4
5
6
def insetData(name,data):
    mobj["yyxtest"].update(
        {"name":name},
        {"$set":data},
        upsert=True
    )

但是update方法现在已经是DEPRECATED,文档上推荐使用update_one,replace_one方法,但这些方法都不是原子操作方法。

使用multi参数

find_one_and_update 可以传不定参数,这里我们使用 multi=True 参数,其本质还是加锁操作。

1
2
3
4
5
6
7
def insetData(name,data):
    mobj["yyxtest"].find_one_and_update(
        {"name":name},
        {"$set":data},
        upsert=True,
        multi=True,
    )

使用事务

mongodb4.0 开始支持事务了,但是支持事务的mongodb有很多条件,得部署在集群环境,有副本集,且开户的Session

1
2
3
4
5
6
7
8
9
def insetData(name,data):
    with mclient.start_session() as session:
        with session.start_transaction():
            mobj["yyxtest"].find_one_and_update(
                {"name":name},
                {"$set":data},
                upsert=True,
                session=session
            )

或者

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
def insetData(name,data):
    session = client.start_session()
    session.start_transaction()
    try:
        mobj["yyxtest"].find_one_and_update(
            {"name": name},
            {"$set": data},
            upsert=True,
        )
    except:
        session.abort_transaction()
    else:
        session.commit_transaction()
    finally:
        session.end_session()

以前几种方法都可以解决并发场景下的数据不准确的问题,实际的使用中可能还会有更多的问题,需要多多考虑。