在写一个系统时不得不考虑高并发时数据库的写入准确性问题,拿mongodb来说,比如要插入一条数据,如果存在则更新,如果不存在则插入新数据,如果在多线程会有哪些问题呢?

多线程中使用pymongo

我们尝试在两个线程中做同一件事,当数据存在的时候进行更新,当数据不存在的时候进行插入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#coding:utf-8
import pymongo
import threading


mclient = pymongo.MongoClient("127.0.0.1",27017)
mobj = mclient['dockerTest']

def insetData(name,data):
mobj["yyxtest"].find_one_and_update(
{"name":name},
{"$set":data},
upsert=True
)

if __name__ == '__main__':
th1 = threading.Thread(target=insetData,args=("yang",{"age":18}))
th2 = threading.Thread(target=insetData,args=("yang",{"age":20}))
th1.start()
th2.start()
for i in [th1,th2]:
i.join()
print("all done")

如果按照我们的设计思路,数据库里应该只会有一条记录,但是实际情况是数据库里新增了两条记录

1
2
3
4
5
6
7
8
9
10
11
12
13
/* 2 */
{
"_id" : ObjectId("5dff8a311c89c4dc02845c71"),
"name" : "yang",
"age" : 18
}

/* 3 */
{
"_id" : ObjectId("5dff8a311c89c4dc02845c72"),
"name" : "yang",
"age" : 20
}

上面的问题主要是由于 find_one_and_update 中的”找”和”改”不是原子性操作,当有两个线程执行这个方法时,此时它们都没有查找到,然后就都执行了新的插入数据操作,解决办法有以下几种:

  1. 线程加锁
  2. 使用原子操作
  3. 使用multi参数
  4. 使用事务

线程加锁

这个应该是最简单的了,把find_one_and_update操作加上锁,同时只有一个线程在执行,查找和更新操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#coding:utf-8
import pymongo
import threading


mclient = pymongo.MongoClient("127.0.0.1",27017)
mobj = mclient['dockerTest']

def insetData(name,data,lock):
lock.acquire()
mobj["yyxtest"].find_one_and_update(
{"name":name},
{"$set":data},
upsert=True
)
lock.release()


if __name__ == '__main__':
thlock = threading.Lock()
th1 = threading.Thread(target=insetData,args=("fjy",{"age":18},thlock))
th2 = threading.Thread(target=insetData,args=("fjy",{"age":20},thlock))
th1.start()
th2.start()
for i in [th1,th2]:
i.join()
print("all done")

使用原子操作

mongodb也自带一些原子性的复合操作,但是只有特定的一些方法才是原子的,比如上面的find_one_and_update 可以使用原子的update 方法

1
2
3
4
5
6
def insetData(name,data):
mobj["yyxtest"].update(
{"name":name},
{"$set":data},
upsert=True
)

但是update方法现在已经是DEPRECATED,文档上推荐使用update_one,replace_one方法,但这些方法都不是原子操作方法。

使用multi参数

find_one_and_update 可以传不定参数,这里我们使用 multi=True 参数,其本质还是加锁操作。

1
2
3
4
5
6
7
def insetData(name,data):
mobj["yyxtest"].find_one_and_update(
{"name":name},
{"$set":data},
upsert=True,
multi=True,
)

使用事务

mongodb4.0 开始支持事务了,但是支持事务的mongodb有很多条件,得部署在集群环境,有副本集,且开户的Session

1
2
3
4
5
6
7
8
9
def insetData(name,data):
with mclient.start_session() as session:
with session.start_transaction():
mobj["yyxtest"].find_one_and_update(
{"name":name},
{"$set":data},
upsert=True,
session=session
)

或者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def insetData(name,data):
session = client.start_session()
session.start_transaction()
try:
mobj["yyxtest"].find_one_and_update(
{"name": name},
{"$set": data},
upsert=True,
)
except:
session.abort_transaction()
else:
session.commit_transaction()
finally:
session.end_session()

以前几种方法都可以解决并发场景下的数据不准确的问题,实际的使用中可能还会有更多的问题,需要多多考虑。