相信对于队列的概念大家都不会陌生,这种先入先出的数据结构应用很广泛,像一般的生产消费都会用到队列,关于Queue的用法介绍可以参考我之前的文章 python中的Queue与多进程(multiprocessing)还有栈,栈是一种先入后出的数据结构,面优先队列有别于普通的队列与栈,在实现上,它一般通过堆这一数据结构,而堆其实是一种完全二叉树,它会对进入容器的元素进行排序(根据事先指定的规则),出队的顺序则会是二叉树的根结点代表的元素。接下来介绍几种优先队列的实现。

通过heapq模块

heapq是一个二叉堆的实现,它内部使用内置的list对象,它无论插入还是获取最小元素复杂度都在O(log n)。这里主要用到它的heappushheappop方法,heappush 方法需要传入两个参数,一个是列表(list),另外是一个对象,这里的对象须是可比较对象,就是它可以通过cmp方法来比较大小,以下是在 python2 中的代码实现

1
2
3
4
5
6
7
8
9
10
11
12
#coding:gbk
import heapq

tasks = []
heapq.heappush(tasks,(10,'aaa'))
heapq.heappush(tasks,(40,'bbb'))
heapq.heappush(tasks,(30,'ccc'))
heapq.heappush(tasks,(20,'ddd'))

while tasks:
task = heapq.heappop(tasks)
print(task)

运行结果如下

1
2
3
4
(10, 'aaa')
(20, 'ddd')
(30, 'ccc')
(40, 'bbb')

可以看到,我放入 tasks 列表里的元素是个 set 对象,对象第一个元素是个 int 类型的数字,如果使用cmp方法进行比较的话

1
2
3
4
5
6
>>> cmp(10,20)
-1
>>> cmp(10,10)
0
>>> cmp(10,5)
1

对于小于,等于,大于分别返回的是-1,0,1,其实这也是在定义sorted的实现方法,

1
2
3
4
5
6
7
8
>>> sorted([(10,'aaaa'),(30,'bbbb')])
[(10, 'aaaa'), (30, 'bbbb')]
>>> sorted([(40,'aaaa'),(30,'bbbb')])
[(30, 'bbbb'), (40, 'aaaa')]
>>> sorted([(30,'aaaa'),(30,'bbbb')])
[(30, 'aaaa'), (30, 'bbbb')]
>>> sorted([(30,'bbbb'),(30,'abbb')])
[(30, 'abbb'), (30, 'bbbb')]

可以看到在sorted方法里,它的排序算法是通过比较第一个元素的大小,小的排在前面,第一个元素相同再比较第二个元素,看返回之前的代码,heapq.heappush 将 set 元素添加到列表元素以后,将对其进行重新排序,将最小的放在前面,于是就得到了上面的打印结果。

上面是使用python自带的 set 数据结构,可否自定义一种类型呢,比较在实现生活中,在上班的第一件事是给自已写一下今天要完成哪些事情,其实哪些事情的优先级比较高就是先做哪些事情,其实在上面也说到 sorted 方法,这个方法其实就是在调用对象的 __cmp__ 方法,好么我可以单独定义一个带有 __cmp__ 方法的对象则可以实现优先队列中的对象排序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#coding:gbk
import heapq

# 使用heapq实现优先队列
#定义一个可比较对象
class CompareAble:
def __init__(self,priority,jobname):
self.priority = priority
self.jobname = jobname

def __cmp__(self, other):
if self.priority < other.priority:
return -1
elif self.priority == other.priority:
return 0
else:
return 1


joblist = []

heapq.heappush(joblist,CompareAble(80,'eat'))
heapq.heappush(joblist,CompareAble(70,'a write plan2'))
heapq.heappush(joblist,CompareAble(70,'write plan'))
heapq.heappush(joblist,CompareAble(90,'sleep'))
heapq.heappush(joblist,CompareAble(100,'write code'))

while joblist:
task = heapq.heappop(joblist)
print(task.priority,task.jobname)

运行结果:

1
2
3
4
5
(70, 'write plan')
(70, 'a write plan2')
(80, 'eat')
(90, 'sleep')
(100, 'write code')

上面的compareAble 类初始化有两个参数,一个是优先级,一个是事情的名字,我这里定义的是优先级数值越小排序越靠前,也可以定义成数值越大越靠前。如果优先级相同,则按照插入顺序来排序。

通过Queue,PriorityQueue类型实现

这个优先级队列内部使用了heapq,不同的是PriorityQueue的操作是同步的,提供锁操作,支持并发的生产者和消费者,而且它的接口更加友好,它继承自Queue,所以好多Queue的方法可以直接使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#coding:gbk
import heapq
from queue import Queue,PriorityQueue

# 使用heapq实现优先队列
#定义一个可比较对象
class CompareAble:
def __init__(self,priority,jobname):
self.priority = priority
self.jobname = jobname

def __cmp__(self, other):
if self.priority < other.priority:
return -1
elif self.priority == other.priority:
return 0
else:
return 1


pq = PriorityQueue()
pq.put(CompareAble(80,'eat'))
pq.put(CompareAble(70,'a write plan2'))
pq.put(CompareAble(70,'write plan'))
pq.put(CompareAble(90,'sleep'))
pq.put(CompareAble(100,'write code'))

while pq.qsize()!= 0:
task = pq.get_nowait()
print(task.jobname,task.priority)

接下来通过一个生产消费的实例来说明优先队列的使用

有三个生产者和二个消费者,生产者向队列中生产有优先级的任务,消费者也是优先消费高级别的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#coding:gbk
from queue import PriorityQueue
import time
import random
import threading

# 使用heapq实现优先队列
#定义一个可比较对象
class CompareAble:
def __init__(self,priority,jobname):
self.priority = priority
self.jobname = jobname

def __cmp__(self, other):
if self.priority < other.priority:
return -1
elif self.priority == other.priority:
return 0
else:
return 1

tasks = [(i, "do task %s"%i) for i in range(10,100,5)]
def produce(pq,lock):
while True:
lock.acquire()
task = tasks[random.randint(0,len(tasks)-1)]
print('put %s %s in pq'%(task[0],task[1]))
pq.put(CompareAble(task[0],task[1]))
time.sleep(1)
lock.release()

def consumer(pq,lock):
while True:
lock.acquire()
task = pq.get_nowait()
if task:
print(task.priority, task.jobname)
else:
time.sleep(1)
lock.release()

if __name__ == '__main__':
task_queue = PriorityQueue()
task_lock = threading.Lock()
for i in range(3):
t = threading.Thread(target=produce,args=(task_queue,task_lock))
t.setDaemon(False)
t.start()
for i in range(2):
t = threading.Thread(target=consumer,args=(task_queue,task_lock))
t.setDaemon(False)
t.start()

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
put 30 do task 30 in pq
put 20 do task 20 in pq
put 75 do task 75 in pq
(20, 'do task 20')
(30, 'do task 30')
put 20 do task 20 in pq
put 15 do task 15 in pq
put 70 do task 70 in pq
(15, 'do task 15')
(20, 'do task 20')
put 85 do task 85 in pq
put 10 do task 10 in pq
put 30 do task 30 in pq
(10, 'do task 10')
(30, 'do task 30')
put 70 do task 70 in pq
put 10 do task 10 in pq
put 55 do task 55 in pq
(10, 'do task 10')
(55, 'do task 55')
put 20 do task 20 in pq
put 45 do task 45 in pq
put 75 do task 75 in pq
(20, 'do task 20')
(45, 'do task 45')
put 40 do task 40 in pq
put 40 do task 40 in pq
...

可以看出,每次取出来的都是当前队列中 priority 最小的数

python3 中的使用方法

上面的代码无法在python3中运行,主要是因为python3没有cmp方法,运行得到的异常信息是

1
TypeError: unorderable types: CompareAble() < CompareAble()

就是没有一个 < 的操作

需要在上面定义一个 __lt__ 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#coding:gbk
from queue import PriorityQueue
import time
import random
import threading

# 使用heapq实现优先队列
#定义一个可比较对象
class CompareAble:
def __init__(self,priority,jobname):
self.priority = priority
self.jobname = jobname

# def __cmp__(self, other):
# if self.priority < other.priority:
# return -1
# elif self.priority == other.priority:
# return 0
# else:
# return 1

def __lt__(self, other):
if self.priority <= other.priority:
return False
else:
return True

tasks = [(i, "do task %s"%i) for i in range(10,100,5)]
def produce(pq,lock):
while True:
lock.acquire()
task = tasks[random.randint(0,len(tasks)-1)]
print('put %s %s in pq'%(task[0],task[1]))
pq.put(CompareAble(task[0],task[1]))
lock.release()
time.sleep(1)

def consumer(pq,lock):
while True:
lock.acquire()
try:
if pq.empty():
continue
task = pq.get_nowait()
if task:
print(task.priority, task.jobname)
finally:
lock.release()
time.sleep(1)

if __name__ == '__main__':
task_queue = PriorityQueue()
task_lock = threading.Lock()
for i in range(3):
t = threading.Thread(target=produce,args=(task_queue,task_lock))
t.setDaemon(False)
t.start()
for i in range(2):
t = threading.Thread(target=consumer,args=(task_queue,task_lock))
t.setDaemon(False)
t.start()

上面的代码我修改了一点对于大小的判断,与之前的是反的,这里 priority 越大则越先返回,上面的代码在 python2 中也可以运行,所有如果为了兼容性可以选择定义使用 __lt__ 方法。

参考文章
用Python实现优先级队列的3种方法
python的优先队列示例