阅读aiosplite源码笔记

在GIthub是看到这样一个aiospilte的库,具体是将sqlite改成异步的。发现源码体积挺小的,遂看一下,看完后发现作者脑路新奇。
作者主要使用了代理模式,代理了ConnectionCursor这两个sqlite3的类。但是作者新奇就在于,其实这是一个披着异步外衣的多线程。作者对于每一个连接都新建一个独立线程。仅仅使用异步作为询问,而实际费IO的操作是在独立的线程中。

作者的Connection类继承自threading.Thread

使用Python的上下文管理器:

1
2
3
4
5
6
7
8
async def __aenter__(self) -> 'Connection':
self.start()
await self._connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
await self.close()
self._conn = None

可见,每开一个连接,就创建一个新的独立的线程。对于每个独立的线程,执行run()方法,self._txQueue,放置着包装好的待执行的函数。可见,所有的实际操作fn()是阻塞的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def run(self) -> None:
"""Execute function calls on a separate thread."""
while self._running:
try:
fn = self._tx.get(timeout=0.1)
except Empty:
continue
try:
Log.debug('executing %s', fn)
result = fn()
Log.debug('returning %s', result)
self._rx.put(result)
except Exception as e:
Log.exception('returning exception %s', e)
self._rx.put(e)

而所谓的异步的_execute(),它仅仅作为询问用,它每隔一段时间看看结果队列self._rx是否有结果,有结果即返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
async def _execute(self, fn, *args, **kwargs):
"""Queue a function with the given arguments for execution."""
await self._lock.acquire()
pt = partial(fn, *args, **kwargs)
self._tx.put_nowait(pt)
while True:
try:
result = self._rx.get_nowait()
break
except Empty:
await asyncio.sleep(0.1)
continue
self._lock.release()
if isinstance(result, Exception):
raise result
return result

作者还在这里加了锁。作者这样做是为了防止结果的混乱。举个例子,如下代码,可能因为await asyncio.sleep(0.1),而造成cursor2拿到的是table1的结果。值得注意的是,如果同一EventLoop里面有其他需要获得锁的协程,可能会导致Bug的出现。

1
2
3
async with aiosqlite.connect(...) as db:
cursor1 = await db.execute('SELECT * FROM table1')
cursor2 = await db.execute('SELECT * FROM table2')
文章目录
|