【Python 协程系列】asyncio 源码分析

参考资料

前言

事件驱动主要是将耗时的 I/O 操作转换为待处理的事件,当某个事件触发后会调用相应的回调。

  • 优点:单线程中即可处理非常大的并发量
  • 缺点:异步运行,代码复杂,调试困难
  • 缺点:遇到耗时的同步代码或阻塞式的第三方库,会导致整个事件循环阻塞

asyncio 起源于代号 Tulip 的项目(PEP 3156),最初面向 Py3.3 开发;从 Py3.4 起以临时(provisional)模块的身份进入标准库,直到 Py3.6 才移除临时状态,成为稳定的正式标准库——asyncio。
该模块中提供了一些基础设施:

  • event loop
  • transport and protocol abstractions
  • a higher-level scheduler

从 demo 开始

1
2
3
4
5
6
7
8
9
10
import asyncio


async def compute(x, y):
    """Print a progress message, wait one second without blocking, return x + y."""
    print("Compute %s + %s ..." % (x, y))
    # Suspends this coroutine; the event loop is free to run other work meanwhile.
    await asyncio.sleep(1.0)
    return x + y


if __name__ == "__main__":
    # The rest of this article traces these calls through asyncio's source.
    # NOTE: on Python 3.7+ the idiomatic form is
    # ``print(asyncio.run(compute(1, 2)))``; calling get_event_loop() with no
    # current/running loop is deprecated in recent versions (see asyncio docs).
    loop = asyncio.get_event_loop()
    try:
        print(loop.run_until_complete(compute(1, 2)))
    finally:
        # Close the loop even if the coroutine raised.
        loop.close()

如上,利用 async def 创建 Native Coroutine,获取 event loop,执行该 Coroutine 直到完成,最后关闭 event loop。

get_event_loop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# Demo line being traced: loop = asyncio.get_event_loop()

def get_event_loop():
    """ Return an asyncio event loop.

    Returns the loop currently running in this thread if there is one,
    otherwise delegates to the current event loop policy.
    """

    # Loop currently running in this thread, if any (per the original note,
    # running_loop and pid are tracked with threading.local()).
    current_loop = _get_running_loop()
    if current_loop is not None:
        return current_loop

    # Otherwise ask the policy; by default this is
    # DefaultEventLoopPolicy().get_event_loop().
    return get_event_loop_policy().get_event_loop()

# windows_events.py, line 775 -- each platform module defines its own
# DefaultEventLoopPolicy.
class _WindowsDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    # Loop class instantiated by new_event_loop() on Windows in the version
    # quoted here. NOTE(review): newer Pythons (3.8+) default to
    # ProactorEventLoop on Windows -- confirm against the target version.
    _loop_factory = SelectorEventLoop

DefaultEventLoopPolicy = _WindowsDefaultEventLoopPolicy

# unix_events.py, line 1083
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    # Loop class instantiated by new_event_loop() on Unix.
    _loop_factory = _UnixSelectorEventLoop
    ...
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy

默认的 DefaultEventLoopPolicy 是一个区分平台的类,实例化后调用基类中的 get_event_loop 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
    """Default policy: one event loop per thread, kept in a threading.local.

    Only the main thread gets a loop created automatically by
    get_event_loop(); any other thread must call set_event_loop() first.
    """
    # Loop class that new_event_loop() instantiates; set by subclasses.
    _loop_factory = None

    class _Local(threading.local):
        # Per-thread state: this thread's loop, and whether set_event_loop()
        # has ever been called in this thread.
        _loop = None
        _set_called = False

    def __init__(self):
        self._local = self._Local()

    def get_event_loop(self):
        """Return the calling thread's event loop.

        In the main thread a new loop is created and installed on first use,
        unless set_event_loop() was already called there (even with None).

        Raises:
            RuntimeError: if the calling thread has no loop set.
        """
        if (self._local._loop is None and
                not self._local._set_called and
                isinstance(threading.current_thread(), threading._MainThread)):
            self.set_event_loop(self.new_event_loop())
        if self._local._loop is None:
            raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)
        return self._local._loop

    def set_event_loop(self, loop):
        """Set this thread's loop and record that it was set explicitly."""
        self._local._set_called = True
        self._local._loop = loop

    def new_event_loop(self):
        """Create a new loop by calling the class-level _loop_factory."""
        return self._loop_factory()

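在初始化时,策略对象会创建一个线程局部变量(threading.local)来保存各线程的 loop。从代码中可以看出,只有主线程会在首次调用 get_event_loop 时自动创建 event loop,前提是该线程从未调用过 set_event_loop。其他线程若没有先调用 set_event_loop 设置 loop,直接调用 get_event_loop 将抛出 RuntimeError。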

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # The child watcher is created lazily by get_child_watcher().
        self._watcher = None

    def _init_watcher(self):
        # Create the default SafeChildWatcher; the None check is repeated
        # while holding events._lock before creating it.
        with events._lock:
            if self._watcher is None:  # pragma: no branch
                self._watcher = SafeChildWatcher()
                # Attach only when called from the main thread. From any other
                # thread the watcher stays unattached until set_event_loop()
                # is later called in the main thread.
                if isinstance(threading.current_thread(),
                              threading._MainThread):
                    self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop) on
        the child watcher.
        """
        super().set_event_loop(loop)

        if self._watcher is not None and \
                isinstance(threading.current_thread(), threading._MainThread):
            self._watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Return the child watcher, creating a SafeChildWatcher on first use."""
        if self._watcher is None:
            self._init_watcher()
        return self._watcher

    def set_child_watcher(self, watcher):
        """Replace the child watcher, closing the previous one if any."""
        if self._watcher is not None:
            self._watcher.close()
        self._watcher = watcher

在 Unix 平台上,_UnixDefaultEventLoopPolicy 覆盖了基类的 set_event_loop,并增加了对子进程的 watcher(child watcher)。
若此前已在非主线程中调用过 get_child_watcher(),此时 SafeChildWatcher 只被创建而未 attach。之后在主线程中调用 set_event_loop 时,会调用 watcher.attach_loop(loop),为该 loop 添加信号处理机制。

在 Unix 平台上,最终返回的 loop 是一个 _UnixSelectorEventLoop 实例对象。

Loop 实例化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
    """
    def __init__(self, selector=None):
        super().__init__(selector)
        # Registered signal handlers; empty on construction.
        self._signal_handlers = {}

class BaseSelectorEventLoop(base_events.BaseEventLoop):
    """Selector event loop.

    See events.EventLoop for API specification.
    """

    def __init__(self, selector=None):
        super().__init__()

        if selector is None:
            # The string literal below is an inert expression statement. It
            # reproduces how selectors.DefaultSelector is chosen: kqueue, then
            # epoll, devpoll, poll, and finally select.
            """
if 'KqueueSelector' in globals():
DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():
DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
DefaultSelector = PollSelector
else:
DefaultSelector = SelectSelector
"""
            selector = selectors.DefaultSelector()
        logger.debug('Using selector: %s', selector.__class__.__name__)
        self._selector = selector
        self._make_self_pipe()
        # Transports keyed weakly so they can be garbage-collected.
        self._transports = weakref.WeakValueDictionary()

class BaseEventLoop(events.AbstractEventLoop):
    def __init__(self):
        # Number of cancelled timers presumably still held in _scheduled.
        self._timer_cancelled_count = 0
        self._closed = False
        # Set by stop(); run_forever() exits after the current _run_once().
        self._stopping = False
        # FIFO of Handles ready to run on the next _run_once() pass.
        self._ready = collections.deque()
        # heapq-managed list of TimerHandles (call_at / call_later).
        self._scheduled = []
        self._default_executor = None
        self._internal_fds = 0
        # Identifier of the thread running the event loop, or None if the
        # event loop is not running
        self._thread_id = None
        # Used by _run_once() to treat timers due within one tick as ready.
        self._clock_resolution = time.get_clock_info('monotonic').resolution
        self._exception_handler = None
        # Debug mode is on when PYTHONASYNCIODEBUG is set, unless the
        # interpreter ignores the environment (-E).
        self.set_debug((not sys.flags.ignore_environment
                        and bool(os.environ.get('PYTHONASYNCIODEBUG'))))
        # In debug mode, if the execution of a callback or a step of a task
        # exceed this duration in seconds, the slow callback/task is logged.
        self.slow_callback_duration = 0.1
        self._current_handle = None
        # Optional custom factory used by create_task().
        self._task_factory = None
        self._coroutine_wrapper_set = False

        if hasattr(sys, 'get_asyncgen_hooks'):
            # Python >= 3.6
            # A weak set of all asynchronous generators that are
            # being iterated by the loop.
            self._asyncgens = weakref.WeakSet()
        else:
            self._asyncgens = None

        # Set to True when `loop.shutdown_asyncgens` is called.
        self._asyncgens_shutdown_called = False

在 _UnixSelectorEventLoop 实例化过程中:

  • events.AbstractEventLoop 定义了抽象类接口
  • BaseEventLoop 实现抽象类接口,定义了大量的状态量
  • BaseSelectorEventLoop 创建 selector,按平台在 kqueue/epoll/devpoll/poll/select 中选择可用的最佳实现
  • _UnixSelectorEventLoop 添加了一些信号处理及 UNIX 套接字支持

run_until_complete

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# Demo line being traced: loop.run_until_complete(compute(1, 2))

class BaseEventLoop(events.AbstractEventLoop):
    def run_until_complete(self, future):
        """Run the loop until `future` is done and return its result.

        A coroutine (or other awaitable) is first wrapped into a Task by
        tasks.ensure_future().

        Raises:
            RuntimeError: if the loop stops before the future completes.
            Any exception stored in the future, via future.result().
        """
        # Refuse to run a closed loop (i.e. require `not self._closed`).
        self._check_closed()

        # new_task is True when `future` is not already a Future (e.g. a bare
        # coroutine); ensure_future() below then creates a brand-new Task
        # that only this call holds a reference to.
        new_task = not futures.isfuture(future)

        # Returns a Future (a Task when a coroutine was passed in).
        future = tasks.ensure_future(future, loop=self)
        if new_task:
            # Presumably suppresses the "destroyed while pending" warning for
            # this internal Task, which the caller never sees.
            future._log_destroy_pending = False
        # When the future completes, _run_until_complete_cb calls loop.stop().
        future.add_done_callback(_run_until_complete_cb)
        try:
            self.run_forever()
        except:
            # Bare except: also covers BaseException (e.g. KeyboardInterrupt);
            # it is always re-raised below.
            if new_task and future.done() and not future.cancelled():
                # The coroutine raised a BaseException. Consume the exception
                # to not log a warning, the caller doesn't have access to the
                # local task.
                future.exception()
            raise
        finally:
            # Detach the stop callback so it no longer fires after we return.
            future.remove_done_callback(_run_until_complete_cb)
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')

        return future.result()

从代码中可以看出,最终还是调用 run_forever 来执行事件。
run_until_complete 通过添加回调 _run_until_complete_cb,在指定 Future 完成后停止整个 event loop。

ensure_future

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def ensure_future(coro_or_future, *, loop=None):
    """Return a Future for `coro_or_future`.

    Futures are returned unchanged, coroutines are wrapped in a Task, and
    other awaitables are wrapped in a coroutine first.

    Raises:
        ValueError: if a Future is passed together with a different `loop`.
        TypeError: if the argument is not a Future, coroutine or awaitable.
    """
    if futures.isfuture(coro_or_future):
        # Already a Future: return it as-is.
        if loop is not None and loop is not coro_or_future._loop:
            # A Future cannot be re-bound to a different loop.
            raise ValueError('loop argument must agree with Future')
        return coro_or_future
    elif coroutines.iscoroutine(coro_or_future):
        # Coroutine: wrap it in a Task (a Future subclass) on `loop`.
        if loop is None:
            loop = events.get_event_loop()
        task = loop.create_task(coro_or_future)
        if task._source_traceback:
            # Trim this frame from the recorded creation traceback.
            del task._source_traceback[-1]
        return task
    elif compat.PY35 and inspect.isawaitable(coro_or_future):
        # Other awaitable: wrap it in a coroutine that delegates to its
        # __await__() method (see _wrap_awaitable), then recurse.
        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
    else:
        raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                        'required')

class BaseEventLoop(events.AbstractEventLoop):
    def create_task(self, coro):
        """Wrap `coro` in a Task bound to this loop and return the Task.

        Uses the custom task factory instead when one has been set.
        """
        self._check_closed()
        if self._task_factory is None:
            # Task.__init__ calls loop.call_soon(task._step), which appends a
            # Handle wrapping task._step to loop._ready.
            task = tasks.Task(coro, loop=self)
            if task._source_traceback:
                del task._source_traceback[-1]
        else:
            task = self._task_factory(self, coro)
        return task

@coroutine
def _wrap_awaitable(awaitable):
    """Generator-based coroutine that delegates to awaitable.__await__()."""
    return (yield from awaitable.__await__())

在 ensure_future 中隐藏了非常重要的代码:如何将 coro 与 loop 关联起来。

Task(coro, loop=self)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# Traced call: tasks.Task(coro, loop=self)

class Task(futures.Future):
    def __init__(self, coro, *, loop=None):
        super().__init__(loop=loop)
        self._coro = coro
        # Schedule the first step: call_soon wraps self._step in a Handle and
        # appends it to loop._ready, so the coroutine starts on a later pass.
        self._loop.call_soon(self._step)
        # Weak set containing all tasks alive.
        self.__class__._all_tasks.add(self)

class Future:
    def __init__(self, *, loop=None):
        # Bind to the given loop, or to events.get_event_loop() if none given.
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        # Callbacks registered through add_done_callback().
        self._callbacks = []

class BaseEventLoop(events.AbstractEventLoop):
    def call_soon(self, callback, *args):
        """Schedule callback(*args) to run on a later _run_once() pass.

        Returns the Handle; a cancelled Handle is skipped by _run_once().
        """
        self._check_closed()
        handle = self._call_soon(callback, args)
        if handle._source_traceback:
            del handle._source_traceback[-1]
        return handle

    def _call_soon(self, callback, args):
        # Wrap the callback in a Handle and enqueue it (FIFO) on _ready.
        handle = events.Handle(callback, args, self)
        if handle._source_traceback:
            del handle._source_traceback[-1]
        self._ready.append(handle)
        return handle

Task 是 Future 的子类,在实例化过程中:

  • 将 loop/coro 保存在 Task 属性中
  • 通过loop.call_soon(task._step),将task._step()作为回调,传入到 loop 中
  • task._step封装成 Handle 对象,放入 Loop._ready 双端队列

add_done_callback

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# Traced call: future.add_done_callback(_run_until_complete_cb)
def _run_until_complete_cb(fut):
    # Stop the loop that owns `fut`. run_forever() checks loop._stopping
    # after each _run_once() pass.
    fut._loop.stop()  # loop._stopping = True

class Future:
    def add_done_callback(self, fn):
        """Add a callback to be run when the future becomes done.

        The callback is called with a single argument - the future object. If
        the future is already done when this is called, the callback is
        scheduled with call_soon.
        """
        if self._state != _PENDING:
            # Already done: schedule fn(self) on the loop right away.
            self._loop.call_soon(fn, self)
        else:
            # Still pending: remember fn until the future completes.
            self._callbacks.append(fn)

此处,注册回调,在 Future 完成后,stop event loop。

run_forever

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class BaseEventLoop(events.AbstractEventLoop):
    def run_forever(self):
        """Call _run_once() repeatedly until stop() has been called.

        Raises:
            RuntimeError: if this loop is already running, or if another loop
                is already running in the current thread.
        """
        # 1. State checks.
        self._check_closed()
        if self.is_running():
            raise RuntimeError('This event loop is already running')
        if events._get_running_loop() is not None:
            raise RuntimeError(
                'Cannot run the event loop while another loop is running')
        # 2. Prepare the environment: coroutine wrapper (debug), owning
        #    thread id, and asyncgen hooks so this loop tracks async generators.
        self._set_coroutine_wrapper(self._debug)
        self._thread_id = threading.get_ident()
        if self._asyncgens is not None:
            old_agen_hooks = sys.get_asyncgen_hooks()
            sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                                   finalizer=self._asyncgen_finalizer_hook)
        try:
            events._set_running_loop(self)
            while True:
                # 3. Run one scheduling pass; _stopping is only checked after
                #    the pass finishes.
                self._run_once()
                if self._stopping:
                    break
        finally:
            # 4. Exit: reset running state and restore the previous hooks.
            self._stopping = False
            self._thread_id = None
            events._set_running_loop(None)
            self._set_coroutine_wrapper(False)
            if self._asyncgens is not None:
                sys.set_asyncgen_hooks(*old_agen_hooks)

run_forever 中,通过 while True 不断执行 _run_once 方法,完成整个事件驱动的循环调度。
循环的退出由 _stopping 属性控制。它在每轮 _run_once 执行完之后才被检查,因此调用 stop() 后,会先跑完当前这一轮再退出。

_run_once

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class BaseEventLoop(events.AbstractEventLoop):
    def __init__(self):
        # Abridged: only the fields _run_once() uses are shown.
        # Number of cancelled timers presumably still in _scheduled;
        # decremented below as cancelled timers are purged.
        self._timer_cancelled_count = 0
        # FIFO of Handles ready to run.
        self._ready = collections.deque()
        # heapq-managed list of TimerHandles; index 0 is the earliest due.
        self._scheduled = []

    def _run_once(self):
        """Run one full iteration of the event loop.

        1. Purge cancelled timers from _scheduled.
        2. Poll the selector for I/O events and queue their callbacks.
        3. Move timers that are due onto _ready.
        4. Run the handles that are in _ready at that point.
        """
        sched_count = len(self._scheduled)
        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and  # 100
                self._timer_cancelled_count / sched_count >
                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):  # 0.5
            # Remove delayed calls that were cancelled if their number
            # is too high: rebuild the heap without them in one pass.
            new_scheduled = []
            for handle in self._scheduled:
                if handle._cancelled:
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)
            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            self._timer_cancelled_count = 0
        else:
            # Remove delayed calls that were cancelled from head of queue.
            while self._scheduled and self._scheduled[0]._cancelled:
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False

        # Debug handling and the timeout computation are omitted in this
        # excerpt; timeout is shown as 0 for simplicity.
        # NOTE(review): the full source presumably derives timeout from
        # _ready / _scheduled -- confirm against base_events.py.
        timeout = 0
        # Ask the selector (poll/epoll/...) for ready I/O; yields a list of
        # (key, mask) pairs.
        event_list = self._selector.select(timeout)

        # Walk event_list and, depending on each mask, call Loop.remove... or
        # Loop._add_callback; _add_callback appends the handle to Loop._ready.
        self._process_events(event_list)

        # Handle 'later' callbacks that are ready.
        # Adding _clock_resolution treats timers due within one clock tick
        # as due now.
        end_time = self.time() + self._clock_resolution
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when >= end_time:
                break
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)

        # This is the only place where callbacks are actually *called*.
        # All other places just add them to ready.
        # Only the ntodo handles queued right now are run; handles appended
        # while running them wait for the next iteration.
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            # Debug handling omitted in this excerpt.
            handle._run()
        handle = None  # Needed to break cycles when an exception occurs.

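在 Task 实例化过程中,已经将 task._step 封装成 Handle 放入了 _ready 队列中。
_run_once 中,先把已到期的 TimerHandle 从 _scheduled 堆移入 _ready,再依次取出 handle,执行其 _run 方法。注意每轮只执行本轮开始执行时已在队列中的 ntodo 个 handle,执行过程中新加入的回调留到下一轮。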

handle._run()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Handle:
    def _run(self):
        """Invoke the wrapped callback with its stored arguments.

        An Exception raised by the callback is not propagated. It is reported
        to the loop's exception handler together with this handle. Exceptions
        that are not Exception subclasses are not caught here.
        """
        try:
            self._callback(*self._args)
        except Exception as exc:
            cb = _format_callback_source(self._callback, self._args)
            msg = 'Exception in callback {}'.format(cb)
            context = {
                'message': msg,
                'exception': exc,
                'handle': self,
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        self = None  # Needed to break cycles when an exception occurs.

可见 handle._run 实际上就是执行 callback;回调抛出的 Exception 不会向外传播,而是交给 loop.call_exception_handler 处理。对于封装的 Task,此处将执行 task._step() 方法。

task._step

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# Traced object: Task(coro, loop=self)

class Task(futures.Future):
    def _step(self, exc=None):
        """Advance the wrapped coroutine by one step.

        Sends None into the coroutine, or throws `exc` into it. A return
        becomes this Task's result and an exception becomes its exception.
        Handling of a value yielded by the coroutine is in the elided `else`
        branch.
        """
        coro = self._coro
        # Record this task as the one currently running on its loop.
        self.__class__._current_tasks[self._loop] = self
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            # The coroutine returned; its return value is exc.value.
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                self.set_exception(futures.CancelledError())
            else:
                # set_result() also schedules the done-callbacks
                # (via self._schedule_callbacks()).
                self.set_result(exc.value)
        except futures.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            # Record it, then let it propagate (e.g. KeyboardInterrupt).
            self.set_exception(exc)
            raise
        else:
            # Elided in this excerpt: handling of the value the coroutine
            # yielded (a Future yields itself to make the Task wait on it).
            ...
        finally:
            self.__class__._current_tasks.pop(self._loop)
            self = None  # Needed to break cycles when an exception occurs.

在 task._step 中,会执行 coro.send(None),进入协程代码的执行阶段。
协程代码正常执行完(return val)时,会抛出 StopIteration(val),然后进入 set_result 环节。

在 future.set_result() 中,会把该 future 注册的所有回调通过 loop.call_soon 放入执行队列,等待执行。
例如,run_until_complete 中注册了回调 _run_until_complete_cb,执行后将退出 loop 循环。

asyncio.sleep(1.0)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# Traced call: await asyncio.sleep(1.0)

@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds)."""
    if delay == 0:
        # Zero delay: yield to the loop once, then return immediately.
        yield
        return result

    if loop is None:
        loop = events.get_event_loop()

    # Equivalent to futures.Future(loop=loop).
    future = loop.create_future()

    # Schedule _set_result_unless_cancelled(future, result) to run `delay`
    # seconds from now; signature: call_later(self, delay, callback, *args).
    h = future._loop.call_later(delay,
                                futures._set_result_unless_cancelled,
                                future, result)
    try:
        # Suspend until the timer completes the future.
        return (yield from future)
    finally:
        # Cancel the timer if we leave early (e.g. sleep was cancelled).
        h.cancel()

class BaseEventLoop(events.AbstractEventLoop):
    def call_at(self, when, callback, *args):
        """Schedule callback(*args) at loop time `when`; return the TimerHandle."""
        self._check_closed()
        timer = events.TimerHandle(when, callback, args, self)
        # _scheduled is a heap; _run_once() moves due timers onto _ready.
        heapq.heappush(self._scheduled, timer)
        timer._scheduled = True
        return timer

    def call_later(self, delay, callback, *args):
        """Schedule callback(*args) `delay` seconds after self.time()."""
        timer = self.call_at(self.time() + delay, callback, *args)
        return timer

def _set_result_unless_cancelled(fut, result):
    """Set `result` on `fut` unless `fut` has been cancelled meanwhile."""
    if fut.cancelled():
        return
    # Per the original note: fut._result = result; fut._state = _FINISHED
    fut.set_result(result)

我们已经知道,await AwaitableObject,会挂起协程,直到 AwaitableObject 运行结束。
执行 asyncio.sleep(1.0) 时:

  • 创建一个 future 对象
  • 通过 loop.call_later 创建一个 TimerHandle() 对象
  • 将 timer 的回调设置为 _set_result_unless_cancelled
  • 将 timer 放入 loop._scheduled 堆(heapq),1s 后到期触发执行
  • 执行后,将传入的 result 赋值给 future._result

创建好 timer 后,sleep 随即执行 yield from future,将当前协程挂起,直到 future 完成。

yield from future

回忆下,字节码YIELD_FROM的执行逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/* Excerpt (abridged) of the YIELD_FROM opcode handler, presumably from
 * CPython's Python/ceval.c; what happens with retval afterwards is omitted. */
TARGET(YIELD_FROM) {
    PyObject *v = POP();          /* value to send into the sub-iterator */
    PyObject *receiver = TOP();   /* the sub-iterator, left on the stack */
    if (PyGen_CheckExact(receiver) || PyCoro_CheckExact(receiver)) {
        // Exact generator / coroutine: resume it directly via _PyGen_Send().
        retval = _PyGen_Send((PyGenObject *)receiver, v);
    } else {
        // Any other iterator: advance it with tp_iternext when sending None,
        // otherwise call its send() method with v.
        _Py_IDENTIFIER(send);
        if (v == Py_None)
            retval = Py_TYPE(receiver)->tp_iternext(receiver);
        else
            retval = _PyObject_CallMethodIdObjArgs(receiver, &PyId_send, v, NULL);
    }
}

再看下 Future 的定义:

1
2
3
4
5
6
7
8
9
10
class Future:
    def __iter__(self):
        """Make a Future usable with `yield from` (and `await` on 3.5+).

        If not yet done, yields the Future itself once so the driving Task can
        wait on it. When resumed, returns the result (or raises it).
        """
        if not self.done():
            self._asyncio_future_blocking = True
            yield self  # This tells Task to wait for completion.
        assert self.done(), "yield from wasn't used with future"
        return self.result()  # May raise too.

    if compat.PY35:
        __await__ = __iter__  # make compatible with 'await' expression

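  1. 在 compute() 中 await asyncio.sleep(1.0),sleep 内部又执行 yield from future
  2. 在 Future.__iter__ 中,如果 future 尚未 done,就 yield self,把 future 本身一路传给 Task._step(即 coro.send(None) 的返回值)。Task 在省略的 else 分支中为该 future 注册唤醒回调,协程就此挂起
  3. 当 timer 到期时,event loop 在 _run_once 中将其从 _scheduled 移入 _ready,并执行 timer._run
  4. 回调最终执行 future.set_result(result),改变 future 状态并调度其回调。Task 被唤醒后再次执行 _step,Future.__iter__ 从 yield 处恢复,返回 self.result()

asyncio.sleep 创建的 future 至此完全结束,执行流回到 await 位置继续运行。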