pymongo中的并发问题
在写一个系统时不得不考虑高并发时数据库的写入准确性问题,拿mongodb来说,比如要插入一条数据,如果存在则更新,如果不存在则插入新数据,如果在多线程会有哪些问题呢?
多线程中使用pymongo
我们尝试在两个线程中做同一件事,当数据存在的时候进行更新,当数据不存在的时候进行插入
| #coding:utf-8
import pymongo
import threading
mclient = pymongo.MongoClient("127.0.0.1",27017)
mobj = mclient['dockerTest']
def insetData(name,data):
mobj["yyxtest"].find_one_and_update(
{"name":name},
{"$set":data},
upsert=True
)
if __name__ == '__main__':
th1 = threading.Thread(target=insetData,args=("yang",{"age":18}))
th2 = threading.Thread(target=insetData,args=("yang",{"age":20}))
th1.start()
th2.start()
for i in [th1,th2]:
i.join()
print("all done")
|
如果按照我们的设计思路,数据库里应该只会有一条记录,但是实际情况是数据库里新增了两条记录
| /* 2 */
{
"_id" : ObjectId("5dff8a311c89c4dc02845c71"),
"name" : "yang",
"age" : 18
}
/* 3 */
{
"_id" : ObjectId("5dff8a311c89c4dc02845c72"),
"name" : "yang",
"age" : 20
}
|
上面的问题主要是由于 find_one_and_update
中的"找"和"改"不是原子性操作,当有两个线程执行这个方法时,此时它们都没有查找到,然后就都执行了新的插入数据操作,解决办法有以下几种:
1. 线程加锁
2. 使用原子操作
3. 使用multi参数
4. 使用事务
线程加锁
这个应该是最简单的了,把find_one_and_update
操作加上锁,同时只有一个线程在执行,查找和更新操作
| #coding:utf-8
import pymongo
import threading
mclient = pymongo.MongoClient("127.0.0.1",27017)
mobj = mclient['dockerTest']
def insetData(name,data,lock):
lock.acquire()
mobj["yyxtest"].find_one_and_update(
{"name":name},
{"$set":data},
upsert=True
)
lock.release()
if __name__ == '__main__':
thlock = threading.Lock()
th1 = threading.Thread(target=insetData,args=("fjy",{"age":18},thlock))
th2 = threading.Thread(target=insetData,args=("fjy",{"age":20},thlock))
th1.start()
th2.start()
for i in [th1,th2]:
i.join()
print("all done")
|
使用原子操作
mongodb也自带一些原子性的复合操作,但是只有特定的一些方法才是原子的,比如上面的find_one_and_update
可以使用原子的update
方法
| def insetData(name,data):
mobj["yyxtest"].update(
{"name":name},
{"$set":data},
upsert=True
)
|
但是update方法现在已经是DEPRECATED
,文档上推荐使用update_one,replace_one方法,但这些方法都不是原子操作方法。
使用multi参数
find_one_and_update
可以传不定参数,这里我们使用 multi=True
参数,其本质还是加锁操作。
| def insetData(name,data):
mobj["yyxtest"].find_one_and_update(
{"name":name},
{"$set":data},
upsert=True,
multi=True,
)
|
使用事务
mongodb4.0 开始支持事务了,但是支持事务的mongodb有很多条件,得部署在集群环境,有副本集,且开户的Session
| def insetData(name,data):
with mclient.start_session() as session:
with session.start_transaction():
mobj["yyxtest"].find_one_and_update(
{"name":name},
{"$set":data},
upsert=True,
session=session
)
|
或者
| def insetData(name,data):
session = client.start_session()
session.start_transaction()
try:
mobj["yyxtest"].find_one_and_update(
{"name": name},
{"$set": data},
upsert=True,
)
except:
session.abort_transaction()
else:
session.commit_transaction()
finally:
session.end_session()
|
以前几种方法都可以解决并发场景下的数据不准确的问题,实际的使用中可能还会有更多的问题,需要多多考虑。