一种新奇的线程队列轮询方法

利用隐藏的环回(loopback)网络连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import queue
import socket
import os
class PollableQueue(queue.Queue):
# 定义一种新的Queue,底层有一对互联的socket
def __init__(self):
super().__init__()
if os.name == 'posix':
self._putsocket, self._getsocket = socket.socketpair()
else:
# non-POSIX 系统
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('127.0.0.1', 0))
server.listen(1)
# 创建一个服务器socket,之后立刻创建客户端socket并连接到服务器上
self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._putsocket.connect(server.getsockname())
self._getsocket, _ = server.accept()
server.close()
def fileno(self):
# 返回套接字的文件描述符
return self._getsocket.fileno()
def put(self, item):
super().put(item)
self._putsocket.send(b'x')
def get(self):
self._getsocket.recv(1)
return super().get()
if __name__ == '__main__':
import select
import threading
import time
def consumer(queues):
while True:
# 使用select轮询
can_read, _, _ = select.select(queues,[],[])
for r in can_read:
item = r.get()
print('Got:', item)
q1 = PollableQueue()
q2 = PollableQueue()
q3 = PollableQueue()
t = threading.Thread(target=consumer, args=([q1,q2,q3],))
t.daemon = True
t.start()
q1.put(1)
q2.put(10)
q3.put('hello')
q2.put(15)
time.sleep(1)

尽管底层的IO会带来一点点负载。但是
如果不采用这样的socket技术,另一个选择就是遍历所有的队列,分辨每个队列是否为空,还要用上定时器。
但是这样不仅麻烦,如果跟其他轮询对象比如socket一起使用的话。不能在同一地位上。

文章目录
|