【Python 协程系列】Gevent 源码分析

参考资料

前言

gevent is a coroutine-based Python networking library that uses greenlet to provide a high-level
synchronous API on top of the libev or libuv event loop.

Features include:

  • Fast event loop based on libev or libuv.
  • Lightweight execution units based on greenlets.
  • API that re-uses concepts from the Python standard library (for example, there are events and queues).
  • Cooperative sockets with SSL support
  • Cooperative DNS queries performed through a threadpool, dnspython, or c-ares.
  • Monkey patching utility to get 3rd party modules to become cooperative
  • TCP/UDP/HTTP servers
  • Subprocess support (through gevent.subprocess)
  • Thread pools

Monkey Patch

Gevent 最著名的莫过于 Monkey patch。它把部分标准库中阻塞式的同步实现,替换成了基于协程的协作式(异步)实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# gevent.monkey.py
# Public API of gevent.monkey: the patch_* functions this module exports,
# each targeting one area of the standard library.
__all__ = [
    'patch_all',
    'patch_builtins',
    'patch_dns',
    'patch_os',
    'patch_select',
    'patch_signal',
    'patch_socket',
    'patch_ssl',
    'patch_subprocess',
    'patch_sys',
    'patch_thread',
    'patch_time',
]

我们可以猜测到,在各个 patch_* 函数中,必然会执行类似 `import lib; setattr(lib, name, gevent.*)` 的操作:先导入标准库模块,再用 gevent 的实现替换其中的属性。

下面我们只看其中一个例子 patch_socket。

patch_socket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@_ignores_DoNotPatch
def patch_socket(dns=True, aggressive=True):
    """Replace stdlib ``socket`` attributes with gevent's cooperative versions.

    :param dns: when true, patch every name in ``gevent.socket.__implements__``;
        when false, leave out the names listed in ``gevent.socket.__dns__``.
    :param aggressive: when true and ``'ssl'`` is not among the names gevent's
        socket module implements, call ``remove_item(socket, 'ssl')``.
    """
    from gevent import socket
    # The names gevent re-implements are listed in socket.__implements__,
    # e.g. 'create_connection', 'socket', 'SocketType', 'fromfd',
    # 'socketpair' (exact contents depend on the gevent version).
    if dns:
        items = socket.__implements__  # pylint:disable=no-member
    else:
        items = set(socket.__implements__) - set(socket.__dns__)
    _patch_module('socket', items=items)
    if aggressive:
        if 'ssl' not in socket.__implements__:
            remove_item(socket, 'ssl')

patch_socket 根据参数 dns 决定要 patch 的 items(dns=False 时会剔除 socket.__dns__ 中列出的 DNS 相关名字),然后调用 _patch_module。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def _patch_module(name, items=None, _warnings=None,
                  _notify_did_subscribers=True):
    """Patch the standard-library module *name* using ``gevent.<name>``.

    Returns the tuple ``(gevent_module, target_module)``.
    """
    # __import__('gevent.x') returns the top-level ``gevent`` package, so
    # getattr() is needed to reach the submodule itself.
    gevent_module = getattr(__import__('gevent.' + name), name)
    # A gevent module may declare __target__ to patch a differently named
    # module; otherwise the stdlib module with the same name is used.
    module_name = getattr(gevent_module, '__target__', name)
    # NOTE(review): for a dotted __target__, __import__ returns the
    # top-level package rather than the submodule.
    target_module = __import__(module_name)

    patch_module(target_module, gevent_module, items=items,
                 _warnings=_warnings,
                 _notify_did_subscribers=_notify_did_subscribers)

    return gevent_module, target_module

def patch_module(target_module, source_module, items=None, **kwargs):
    """Copy each attribute named in *items* from *source_module* onto *target_module*."""
    ...  # elided in this excerpt; presumably resolves items and handles kwargs — see gevent.monkey
    for attr in items:
        patch_item(target_module, attr, getattr(source_module, attr))

def patch_item(module, attr, newitem):
    """Set ``module.attr = newitem``, recording the original value first."""
    # _NONE is a sentinel (defined elsewhere) marking "attribute absent".
    olditem = getattr(module, attr, _NONE)
    if olditem is not _NONE:
        # The inner setdefault keeps the FIRST value recorded, so patching
        # the same attribute twice never stores gevent's replacement as the
        # "original".
        saved.setdefault(module.__name__, {}).setdefault(attr, olditem)
    setattr(module, attr, newitem)

在 _patch_module 中,先导入 Gevent 重写的模块(gevent.<name>)和原生的目标模块,再交给 patch_module 遍历 items,由 patch_item 利用 setattr 完成替换;被替换掉的原始对象会先保存到 saved 字典中。

Demo

1
2
# Demo: call gevent's sleep directly; no monkey patching is performed here.
from gevent import time
time.sleep(1)

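运行发现,time.sleep(1) 同样会阻塞 1 秒。这是因为 gevent 的 sleep 只是让当前 greenlet 交出控制权,由 hub 在定时器到期后再切换回来。这里没有其他 greenlet 可以运行,所以从调用者的角度看,它和普通的阻塞式 sleep 表现一样。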

1
2
3
4
5
6
# gevent.time.py
# __implements__ lists the names this module provides as replacements
# (cf. socket.__implements__ used by patch_socket).
__implements__ = [
    'sleep',
]
from gevent.hub import sleep
# Explicit re-export: this module's sleep is the hub's sleep.
sleep = sleep

如上,gevent.time 模块只对 sleep 这一个函数进行 patch,其实现直接复用 gevent.hub.sleep。

sleep

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def sleep(seconds=0, ref=True):
    """Put the current greenlet to sleep for *seconds*.

    :param seconds: seconds to sleep; a value ``<= 0`` only yields once to
        the event loop via a scheduled callback.
    :param ref: forwarded to ``loop.timer`` (presumably controls whether the
        timer keeps the loop alive — confirm against the loop docs).
    """
    hub = _get_hub_noargs()
    loop = hub.loop
    if seconds <= 0:
        # Schedule a loop callback that switches back into this greenlet,
        # then wait on the waiter until that callback fires.
        waiter = Waiter(hub)
        loop.run_callback(waiter.switch, None)
        waiter.get()
    else:
        with loop.timer(seconds, ref=ref) as t:
            # Sleeping is expected to be an "absolute" measure with
            # respect to time.time(), not a relative measure, so it's
            # important to update the loop's notion of now before we start
            loop.update_now()
            # Wait on the hub until the timer t is ready.
            hub.wait(t)

在 sleep 中,出现了 hub、loop 和 waiter 三个概念,我们逐个来看。

get_hub_noargs

1
2
3
4
5
6
def get_hub_noargs():
    """Return this thread's hub, creating and caching it on first use."""
    hub = _threadlocal.hub  # thread-local storage again
    if hub is None:
        hubtype = get_hub_class()
        hub = _threadlocal.hub = hubtype()  # gevent.hub.Hub()
    return hub

返回当前线程的 Hub 实例:首次调用时用 get_hub_class() 返回的类型(默认是 gevent.hub.Hub)创建,并缓存在线程局部变量 _threadlocal.hub 中,之后直接复用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class Hub(WaitOperationsGreenlet):
    """
    A greenlet that runs the event loop.

    It is created automatically by :func:`get_hub`."""

    def __init__(self, loop=None, default=None):
        # loop: either a ready-made loop object (anything with a ``run``
        #       attribute) or flags passed to ``loop_class`` when a new loop
        #       is created; None means "use ``self.backend``".
        # default: forwarded to ``loop_class``; must stay None when a
        #       ready-made loop object is supplied.
        WaitOperationsGreenlet.__init__(self, None, None)
        self.thread_ident = get_thread_ident()
        if hasattr(loop, 'run'):
            # The caller handed us an existing loop object.
            if default is not None:
                raise TypeError("Unexpected argument: default")
            self.loop = loop
        elif get_loop() is not None:
            # Reuse the loop returned by get_loop() (presumably one left
            # behind when a previous hub was destroyed — confirm).
            self.loop = get_loop()
        else:
            # Hubs created off the main thread do not use the default loop
            # unless the caller asked for it explicitly.
            if default is None and self.thread_ident != MAIN_THREAD_IDENT:
                default = False
            # Fall back to the configured backend only when no loop flags
            # were given; unconditionally overwriting ``loop`` would discard
            # caller-supplied flags.
            if loop is None:
                loop = self.backend
            self.loop = self.loop_class(flags=loop, default=default)
        self._resolver = None
        self._threadpool = None
        self.format_context = GEVENT_CONFIG.format_context
        # Every hub gets a small identifier. Guarding this with
        # ``if loop is None`` left it unset whenever a new loop was created.
        self.minimal_ident = hub_ident_registry.get_ident(self)

# WaitOperationsGreenlet -> SwitchOutGreenletWithLoop -> TrackedRawGreenlet -> greenlet
class TrackedRawGreenlet(greenlet):
def __init__(self, function, parent):
greenlet.__init__(self, function, parent)

current = getcurrent() # greenlet.getcurrent()
self.spawning_greenlet = wref(current)
# See Greenlet for how trees are maintained.
try:
self.spawn_tree_locals = current.spawn_tree_locals
except AttributeError:
self.spawn_tree_locals = {}
if current.parent:
current.spawn_tree_locals = self.spawn_tree_locals