Python asyncio事件循环的底层实现
asyncio的事件循环借助操作系统提供的IO多路复用机制来等待IO:在Linux上使用epoll,在macOS/BSD上使用kqueue,这些由标准库selectors模块统一封装;在Windows上,默认的ProactorEventLoop使用IOCP(I/O完成端口),它不经过selectors模块。
selectors模块的核心接口:
import selectors
# DefaultSelector is an alias for the most efficient selector implementation
# available on this platform (on Linux that is selectors.EpollSelector).
sel: selectors.BaseSelector = selectors.DefaultSelector()
注册文件描述符的事件监听:
import socket
# Non-blocking TCP listening socket: the selector reports when accept() is ready.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("localhost", 8888))
server.listen(100)
# Readability of the listening socket means a connection is waiting.
sel.register(server, selectors.EVENT_READ)
def read_handler(conn):
    """Echo readable data back on *conn*; unregister and close it on EOF.

    Registered as the ``data`` of each client's SelectorKey, so the select
    loop calls it with the socket whenever that socket is readable.
    """
    try:
        data = conn.recv(4096)
    except BlockingIOError:
        return  # spurious wake-up: nothing to read after all
    except ConnectionError:
        data = b""  # treat a reset connection like EOF
    if data:
        # Best-effort echo: send() on a non-blocking socket may accept only part
        # of the data; a real server would buffer the rest and wait for EVENT_WRITE.
        conn.send(data)
    else:
        sel.unregister(conn)
        conn.close()


def accept_connection(sock):
    """Accept one pending client on the listening *sock* and watch it for reads."""
    try:
        conn, _addr = sock.accept()
    except BlockingIOError:
        return  # the pending connection was already consumed
    conn.setblocking(False)
    # key.data carries the callback the select loop invokes when conn is readable.
    sel.register(conn, selectors.EVENT_READ, read_handler)
while True:
    # Wait at most one second; an empty list just means the timeout expired.
    events = sel.select(timeout=1)
    for key, mask in events:
        if key.fileobj is not server:
            # Client sockets carry their callback in key.data.
            handler = key.data
            handler(key.fileobj)
            continue
        # The listening socket is readable: a connection is pending.
        accept_connection(server)
sel.select()返回就绪事件列表,每个元素是(key, mask)元组:key是注册时生成的SelectorKey(包含fileobj和data),mask是已就绪的事件位。在epoll实现中,它调用epoll_wait系统调用,挂起当前线程直到至少一个事件就绪或超时。
epoll的事件循环伪代码:
import os
import array
import select

# Python exposes epoll through select.epoll (Linux only); the os module has no
# epoll_create/epoll_ctl/epoll_wait functions. select.epoll() wraps
# epoll_create1(2); register()/unregister() issue epoll_ctl(2) with
# EPOLL_CTL_ADD/EPOLL_CTL_DEL; poll() wraps epoll_wait(2).
MAX_EVENTS = 64
ep = select.epoll()
ep.register(server.fileno(), select.EPOLLIN)
# fd -> socket object. Keeping a reference matters: when the last reference to
# a Python socket object is dropped its fd is closed, which would silently
# remove the client from the epoll set.
connections = {}
while True:
    # timeout=-1 blocks until at least one fd is ready (epoll_wait with -1).
    for fd, event_mask in ep.poll(timeout=-1, maxevents=MAX_EVENTS):
        if fd == server.fileno():
            conn, addr = server.accept()
            conn.setblocking(False)
            ep.register(conn.fileno(), select.EPOLLIN)
            connections[conn.fileno()] = conn
        else:
            try:
                data = os.read(fd, 4096)
            except ConnectionError:
                data = b""  # treat a reset connection like EOF
            if data:
                # Echo back; a short write is possible on a non-blocking socket.
                os.write(fd, data)
            else:
                # EOF: stop watching the fd before closing it.
                ep.unregister(fd)
                connections.pop(fd).close()
epoll_wait(2)系统调用的超时参数以毫秒为单位:-1表示无限阻塞直到有事件,0表示立即返回(非阻塞),正数表示最多等待多少毫秒。注意Python中select.epoll.poll()的timeout以秒为单位(可以是浮点数),None或负数表示无限阻塞。
asyncio内部将上述select/poll/epoll/kqueue封装为事件循环。核心数据结构是两个队列——存放就绪回调的双端队列_ready,和按到期时间排序的定时器堆_scheduled——再加上一个用于等待IO的selector:
import heapq
import time
from collections import deque
import contextvars
import selectors


class _Handle:
    """A callback plus its arguments, run inside a captured contextvars.Context."""

    __slots__ = ("_callback", "_args", "_context", "_cancelled")

    def __init__(self, callback, args, context):
        self._callback = callback
        self._args = args
        self._context = context
        self._cancelled = False

    def cancel(self):
        """Mark the handle so the loop skips it instead of running it."""
        self._cancelled = True

    def cancelled(self):
        """Return True if cancel() has been called."""
        return self._cancelled

    def _run(self):
        # Run in the captured context so ContextVar changes stay isolated.
        self._context.run(self._callback, *self._args)


class _TimerHandle(_Handle):
    """A _Handle that becomes runnable once time.monotonic() >= when."""

    __slots__ = ("when",)

    def __init__(self, when, callback, args, context):
        super().__init__(callback, args, context)
        self.when = when

    def __lt__(self, other):
        # heapq only needs "<": the earliest deadline sits at index 0.
        return self.when < other.when


class MockEventLoop:
    """Simplified model of the scheduling core of asyncio's event loop.

    State:
        _ready: FIFO deque of handles to run.
        _scheduled: min-heap of timer handles ordered by deadline.
        _selector: selectors.BaseSelector used to wait for I/O; the ``data``
            of every registered key must be a handle (see add_reader).

    Each _run_once() follows the order of CPython's BaseEventLoop._run_once:
    compute the select() timeout, wait for I/O, move expired timers into
    _ready, then run the callbacks that were ready at that point. Exceptions
    raised by callbacks propagate out of run_forever() (real asyncio logs
    them instead).
    """

    def __init__(self, selector=None):
        self._ready = deque()  # ready handles, run FIFO
        self._scheduled = []  # heap of _TimerHandle
        self._stopping = False
        self._debug = False
        self._selector = selector if selector is not None else selectors.DefaultSelector()

    def _get_context(self):
        """Snapshot the caller's context, as asyncio does for every callback."""
        return contextvars.copy_context()

    def call_soon(self, callback, *args, context=None):
        """Queue callback(*args) for the next iteration; return its handle."""
        # `is None`, not `or`: an empty Context is falsy but was passed on purpose.
        if context is None:
            context = self._get_context()
        handle = _Handle(callback, args, context)
        self._ready.append(handle)
        return handle

    def call_later(self, delay, callback, *args, context=None):
        """Run callback(*args) about *delay* seconds from now; return its handle."""
        if context is None:
            context = self._get_context()
        timer = _TimerHandle(time.monotonic() + delay, callback, args, context)
        heapq.heappush(self._scheduled, timer)
        return timer

    def add_reader(self, fd, callback, *args):
        """Call callback(*args) whenever *fd* (fd or file object) is readable."""
        handle = _Handle(callback, args, self._get_context())
        try:
            self._selector.modify(fd, selectors.EVENT_READ, handle)
        except KeyError:  # not registered yet
            self._selector.register(fd, selectors.EVENT_READ, handle)
        return handle

    def remove_reader(self, fd):
        """Stop watching *fd*; return False if it was not being watched."""
        try:
            self._selector.unregister(fd)
        except KeyError:
            return False
        return True

    def stop(self):
        """Ask run_forever() to return after the current iteration."""
        self._stopping = True

    def close(self):
        """Discard pending callbacks and timers and close the selector."""
        self._ready.clear()
        self._scheduled.clear()
        self._selector.close()

    def _run_once(self):
        # 1. How long may select() block? Not at all if work is already queued
        #    or a stop is pending; until the nearest timer if one exists;
        #    otherwise indefinitely (None) - waiting for I/O, not spinning.
        if self._ready or self._stopping:
            timeout = 0
        elif self._scheduled:
            timeout = max(0, self._scheduled[0].when - time.monotonic())
        else:
            timeout = None
        # 2. Wait for I/O; each ready key carries the handle to run.
        for key, _mask in self._selector.select(timeout):
            self._ready.append(key.data)
        # 3. Move every expired timer to the ready queue.
        now = time.monotonic()
        while self._scheduled and self._scheduled[0].when <= now:
            timer = heapq.heappop(self._scheduled)
            if not timer.cancelled():
                self._ready.append(timer)
        # 4. Run only what is ready now; callbacks queued while running wait
        #    for the next iteration, so one chatty callback cannot starve I/O.
        for _ in range(len(self._ready)):
            handle = self._ready.popleft()
            if not handle.cancelled():
                handle._run()

    def run_forever(self):
        """Run iterations until stop(); a stop() issued beforehand still allows one."""
        try:
            while True:
                self._run_once()
                if self._stopping:
                    break
        finally:
            # Reset so the loop can be run again.
            self._stopping = False
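CPython中BaseEventLoop._run_once每一轮的顺序是:先根据_ready是否为空以及最近的定时器计算select的超时时间,再等待IO事件并把对应回调放入_ready,然后把已到期的定时器移入_ready,最后执行此刻_ready中已有的回调(执行期间新加入的回调留到下一轮)。_ready是一个deque,所以回调按先进先出的顺序执行。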
asyncio的事件循环在多线程中的行为:
import asyncio
import threading
loop = asyncio.new_event_loop()


def run_loop():
    """Thread target: make *loop* current in this thread and run it until stopped."""
    asyncio.set_event_loop(loop)
    loop.run_forever()


t = threading.Thread(target=run_loop, daemon=True)
t.start()


# Submit work to the loop thread from another thread (here: the main thread).
async def async_task():
    return 42


# run_coroutine_threadsafe returns a concurrent.futures.Future, so the calling
# thread can block on it with a timeout.
future = asyncio.run_coroutine_threadsafe(async_task(), loop)
result = future.result(timeout=5)
print(result)

# Shut down cleanly: loop.stop() is not thread-safe, so schedule it on the
# loop's own thread, wait for run_forever() to return, then close the loop
# (close() raises RuntimeError on a loop that is still running).
loop.call_soon_threadsafe(loop.stop)
t.join()
loop.close()
run_coroutine_threadsafe通过loop.call_soon_threadsafe把“创建Task”的回调交给事件循环线程执行,并返回一个concurrent.futures.Future,调用线程可以在它上面阻塞等待结果:
def run_coroutine_threadsafe(coro, loop):
    """Submit *coro* to *loop* from another thread; return a concurrent Future.

    Simplified model of asyncio.run_coroutine_threadsafe. The Task is created
    on the loop's own thread (via call_soon_threadsafe). Its outcome is then
    copied into a concurrent.futures.Future, whose result(timeout=...) the
    calling thread can block on. Unlike the real implementation, cancelling
    the returned future does not cancel the task.

    Raises:
        TypeError: if *coro* is not a coroutine object.
    """
    import concurrent.futures

    if not asyncio.iscoroutine(coro):
        raise TypeError("A coroutine object is required")
    # concurrent.futures.Future rather than loop.create_future(): asyncio
    # futures are not thread-safe, and their result() neither blocks nor
    # accepts a timeout.
    future = concurrent.futures.Future()

    def _copy_outcome(task):
        # Runs on the loop thread once the task is done.
        if task.cancelled():
            future.cancel()
        if not future.set_running_or_notify_cancel():
            return  # cancelled, by the caller or just above
        exc = task.exception()
        if exc is not None:
            future.set_exception(exc)
        else:
            future.set_result(task.result())

    def callback():
        # Runs on the loop thread, where creating a Task is safe.
        try:
            task = asyncio.ensure_future(coro, loop=loop)
        except Exception as exc:
            if future.set_running_or_notify_cancel():
                future.set_exception(exc)
            raise
        task.add_done_callback(_copy_outcome)

    loop.call_soon_threadsafe(callback)
    return future
def call_soon_threadsafe(self, callback, *args, context=None):
    """Schedule callback(*args) from any thread and wake the loop.

    Queues the callback exactly like call_soon(), then writes to the
    self-pipe so that a loop blocked in select() returns and runs it.
    Returns the handle produced by call_soon().
    """
    handle = self.call_soon(callback, *args, context=context)
    # _write_to_self() takes no arguments; it only needs to make the
    # self-pipe readable.
    self._write_to_self()
    return handle
call_soon_threadsafe通过自管道(self-pipe trick)唤醒事件循环。事件循环始终在selector中监听管道的读端,往写端写入一个字节会让读端变为可读。于是阻塞在select()上的事件循环立即返回,并在这一轮中执行新加入的回调。(CPython实际使用socket.socketpair()创建的一对socket充当这个“管道”。)
asyncio事件的内部管道:
def _write_to_self(self):
try:
os.write(self._ssock.fileno(), b'\x00')
except OSError:
pass
def _read_from_self(self):
try:
os.read(self._csock.fileno(), 4096)
except OSError:
pass
关闭事件循环的机制:
# loop.stop() only sets the _stopping flag: run_forever() returns once the
# current _run_once() iteration finishes. stop() is not thread-safe, so a loop
# running in another thread must be stopped via call_soon_threadsafe(), and
# close() raises RuntimeError on a loop that is still running.
if loop.is_running():
    loop.call_soon_threadsafe(loop.stop)
    t.join()  # wait until run_forever() has returned in the loop thread
# loop.close() is idempotent. It drops pending callbacks and timers, shuts down
# the default executor without waiting, and closes the selector. It does NOT
# cancel pending tasks or finalize async generators. asyncio.run() does those
# steps before closing:
#   1. cancel the remaining tasks and wait for them
#   2. loop.shutdown_asyncgens()
#   3. loop.shutdown_default_executor()
#   4. loop.close()
loop.close()
loop.close的调用顺序:
def close(self):
    """Close the loop: release the self-pipe, pending work, executor, selector.

    Idempotent: a second call is a no-op. The self-pipe is *not* recreated.
    A closed loop cannot be run again (run_forever raises RuntimeError), so
    new sockets would only leak. Attributes that were never created are
    skipped.
    """
    if getattr(self, "_closed", False):
        return
    try:
        # 1. The wake-up socket pair is useless once the loop is closed.
        for attr in ("_ssock", "_csock"):
            sock = getattr(self, attr, None)
            if sock is not None:
                sock.close()
                setattr(self, attr, None)
        # 2. Drop callbacks and timers that will never run.
        for attr in ("_ready", "_scheduled"):
            pending = getattr(self, attr, None)
            if pending is not None:
                pending.clear()
        # 3. Don't block close() on in-flight executor jobs (CPython also uses
        #    wait=False); call shutdown_default_executor() first to wait for them.
        executor = getattr(self, "_executor", None)
        self._executor = None
        if executor is not None:
            executor.shutdown(wait=False)
        # 4. Finally release the selector (the epoll/kqueue fd).
        selector = getattr(self, "_selector", None)
        self._selector = None
        if selector is not None:
            selector.close()
    finally:
        self._closed = True
已经close的事件循环再调用run_forever会抛出RuntimeError('Event loop is closed')。
asyncio.all_tasks()返回当前正在运行的事件循环中所有未完成的任务(包括调用它的main()任务本身):
async def slow_task(name, delay):
    """Sleep for *delay* seconds, then report that task *name* finished."""
    await asyncio.sleep(delay)
    finished = f"{name} done"
    return finished
async def main():
    """Start two slow tasks, list the unfinished tasks, then await both."""
    task1 = asyncio.create_task(slow_task("A", 5))
    task2 = asyncio.create_task(slow_task("B", 3))
    # all_tasks() also contains the task running main() itself, so this prints 3.
    pending = asyncio.all_tasks()
    print(f"待处理的任务: {len(pending)}")
    for t in pending:
        # get_coro() (3.8+) is the public accessor for the private _coro attribute.
        print(f" - {t.get_name()}: {t.get_coro()}")
    result = await asyncio.gather(task1, task2, return_exceptions=True)
    print(result)


asyncio.run(main())
Task的__step方法在每次协程yield后被调用。事件循环通过重复调用__step来推进协程的执行:
from asyncio import CancelledError, Future


class Task(Future):
    """Simplified model of asyncio.Task: a Future that drives a coroutine.

    Each __step resumes the coroutine once. It runs until the coroutine
    finishes or suspends. What the coroutine yields decides what happens
    next (mirroring CPython's Task.__step):

    * an unfinished Future -> __wakeup is registered as its done-callback;
    * None (a bare ``yield``, e.g. inside ``asyncio.sleep(0)``) -> step again soon;
    * the task itself, or any other value -> a RuntimeError is thrown back in.

    Omitted compared with CPython: the _must_cancel flag, loop/context checks,
    task registration and eager execution.
    """

    def __init__(self, coro=None, *, loop=None):
        super().__init__(loop=loop)
        self._coro = coro
        if coro is not None:
            # The first step runs on the next loop iteration, like asyncio.Task.
            self.get_loop().call_soon(self.__step)

    def __step(self, exc=None):
        coro = self._coro
        try:
            if exc is None:
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as stop:
            # The coroutine returned; its return value rides on StopIteration.
            self.set_result(stop.value)
        except CancelledError as cancel:
            # CancelledError() is often raised without arguments.
            super().cancel(msg=cancel.args[0] if cancel.args else None)
        except (KeyboardInterrupt, SystemExit) as fatal:
            self.set_exception(fatal)
            raise
        except BaseException as error:
            self.set_exception(error)
        else:
            loop = self.get_loop()
            if result is self:
                # Awaiting yourself can never finish; CPython reports an error.
                loop.call_soon(
                    self.__step,
                    RuntimeError(f"Task cannot await on itself: {self!r}"),
                )
            elif isinstance(result, Future):
                # The coroutine is blocked on `result`; resume once it is done.
                result._asyncio_future_blocking = False
                result.add_done_callback(self.__wakeup)
            elif result is None:
                # Bare yield: let other callbacks run, then continue.
                loop.call_soon(self.__step)
            else:
                loop.call_soon(
                    self.__step,
                    RuntimeError(f"Task got bad yield: {result!r}"),
                )

    def __wakeup(self, future):
        """Done-callback: resume the coroutine with *future*'s outcome."""
        try:
            future.result()
        except BaseException as exc:
            # Throw the failure (including CancelledError) into the coroutine.
            self.__step(exc)
        else:
            # The awaiting coroutine reads the value itself when it resumes.
            self.__step()
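关键逻辑有三种情况:
- 如果result是Future(协程await了一个未完成的Future),就在该Future上注册__wakeup回调,待它完成后再继续执行协程;
- 如果result是None(裸yield,例如asyncio.sleep(0)内部),用call_soon在下一轮重新调度;
- 在CPython中,await任务自身或yield其他任何值都被视为错误,会以RuntimeError抛回协程。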
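asyncio.get_event_loop()隐式创建事件循环的行为已被弃用。从Python 3.10起,在没有正在运行的事件循环时调用它可能发出DeprecationWarning;Python 3.14起,没有当前事件循环时会直接抛出RuntimeError。注意DeprecationWarning默认只是一条警告,并不是异常: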
try:
    # Only succeeds inside a coroutine or a callback run by a running loop.
    loop = asyncio.get_running_loop()
except RuntimeError:
    # No loop is running in this thread: create one explicitly and make it the
    # current loop. (`except DeprecationWarning` would never trigger here -
    # warnings are not raised as exceptions unless a filter turns them into errors.)
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
asyncio.new_event_loop()总是创建一个新的事件循环:在Unix上是SelectorEventLoop,在Windows上(Python 3.8起)默认是ProactorEventLoop。在协程或回调内部获取事件循环,推荐使用asyncio.get_running_loop();没有正在运行的事件循环时,它会抛出RuntimeError。
Windows的ProactorEventLoop使用IOCP(I/O完成端口),而不是select。IOCP是Windows上性能最好的异步IO机制。与epoll/kqueue不同,它通知的是“操作已完成”,而不是“可以读写了”:
import asyncio
import sys
# Pick the platform's native loop: the IOCP-based proactor on Windows, the
# selector-based loop (epoll/kqueue/...) everywhere else.
loop = (
    asyncio.ProactorEventLoop()
    if sys.platform == "win32"
    else asyncio.SelectorEventLoop()
)
asyncio.set_event_loop(loop)
从Python 3.8起,asyncio.run()会通过默认的事件循环策略自动选择平台对应的实现(Windows上为ProactorEventLoop),因此不需要手动判断平台。
uvloop是基于libuv实现的asyncio事件循环替代品。据uvloop项目自己的基准测试,它通常比asyncio默认的事件循环快2–4倍:
import uvloop
import asyncio
async def main():
    """Sleep for one second, then report completion."""
    outcome = "done"
    await asyncio.sleep(1)
    return outcome


# Install uvloop's policy so the asyncio.run() call below builds a uvloop-based
# loop instead of the default one.
# NOTE: asyncio.set_event_loop_policy is deprecated as of Python 3.14; newer
# uvloop releases provide uvloop.run(main()) as the replacement.
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
result = asyncio.run(main())
uvloop用Cython在libuv之上实现了asyncio的事件循环接口(AbstractEventLoop)。libuv负责IO多路复用和定时器;事件就绪时,uvloop调用相应的Python回调,协程仍由asyncio的Task驱动。libuv也是Node.js使用的事件循环库。
Python asyncio事件循环的底层实现