[Python Web Series] Tornado Source Code Analysis

Preface

Tornado is a Python web framework and asynchronous networking library.

It consists of the following parts:

  • A web framework: subclass RequestHandler to build an application
  • Client- and server-side implementations of HTTP (HTTPServer and AsyncHTTPClient)
  • An asynchronous networking library including the classes IOLoop and IOStream
  • A coroutine library (tornado.gen); native coroutines are now the recommended alternative
  • Tornado can interoperate with other WSGI servers and clients, but doing so forfeits its strengths: high concurrency and long-lived connections

The Tornado framework model

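As the figure above shows, Tornado can be divided into four layers:

  • EVENT: handles IO events; this layer is now implemented by asyncio
  • TCP: implements the TCP server and is responsible for data transfer
  • HTTP/HTTPS: implements both the server and the client on top of the HTTP protocol
  • WEB framework: builds the web framework on top of authentication, DB, WSGI, templates, and localization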

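Tornado uses a single-threaded event loop to handle every connection. In Tornado 5.0+ on Python 3.5+:

  • IOLoop is now a wrapper around the event loop of the standard library asyncio
  • Future is the placeholder for an asynchronous result; native coroutines are now the recommended style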
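
To make the relationship between a Future and a native coroutine concrete, here is a minimal sketch that uses only the standard library asyncio, not Tornado's own classes. The names wait_for_result and main are illustrative assumptions.

```python
import asyncio


async def wait_for_result(future):
    # `await` suspends this native coroutine until the Future has a result.
    return await future


async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()          # placeholder for a value not yet available
    loop.call_soon(future.set_result, 42)  # the event loop fills it in on a later iteration
    return await wait_for_result(future)


result = asyncio.run(main())
```

The coroutine never blocks the thread: while it is suspended on the Future, the single-threaded loop is free to run other callbacks.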

Demo

import tornado.ioloop
import tornado.web

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello, world")

if __name__ == "__main__":
    application = tornado.web.Application([
        (r"/", MainHandler),
    ])
    application.listen(4000)
    tornado.ioloop.IOLoop.current().start()

Application

# tornado/web.py
class Application(ReversibleRouter):
    """A collection of request handlers that make up a web application."""
    def __init__(self, handlers=None, default_host=None, transforms=None,
                 **settings):
        ...

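web.Application is the core class of Tornado as a web framework. Its constructor takes:

  • handlers: the list of routes and their handler classes
  • settings: configuration such as UI modules and static files
  • transforms: output transforms, such as chunked encoding and compression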

# application.listen(4000)
class Application(ReversibleRouter):
    def listen(self, port, address="", **kwargs):
        from tornado.httpserver import HTTPServer
        server = HTTPServer(self, **kwargs)
        server.listen(port, address)
        return server

As the code shows, application.listen actually delegates to HTTPServer().listen().

HTTPServer()

class Configurable(object):
    def __new__(cls, *args, **kwargs):
        base = cls.configurable_base()
        init_kwargs = {}
        if cls is base:
            impl = cls.configured_class()
            if base.__impl_kwargs:
                init_kwargs.update(base.__impl_kwargs)
        else:
            impl = cls
        init_kwargs.update(kwargs)
        if impl.configurable_base() is not base:
            return impl(*args, **init_kwargs)
        instance = super(Configurable, cls).__new__(impl)
        instance.initialize(*args, **init_kwargs)
        return instance

class HTTPServer(TCPServer, Configurable,
                 httputil.HTTPServerConnectionDelegate):
    def __init__(self, *args, **kwargs):
        pass

    def initialize(self, request_callback, no_keep_alive=False,
                   xheaders=False, ssl_options=None, protocol=None,
                   decompress_request=False,
                   chunk_size=None, max_header_size=None,
                   idle_connection_timeout=None, body_timeout=None,
                   max_body_size=None, max_buffer_size=None,
                   trusted_downstream=None):
        ...
        TCPServer.__init__(self,...)

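Note that HTTPServer inherits from Configurable, which defines __new__.
__new__ merges the constructor arguments, calls initialize, and returns the constructed instance.
In this call:

  • the request_callback argument is the Application() instance
  • the server initializes itself as a TCPServer by calling TCPServer.__init__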
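
The __new__ plus initialize pattern can be sketched in isolation. This is a simplified stand-in, not Tornado's Configurable: it leaves out the implementation switching (configurable_base, configured_class), and ToyServer and app are hypothetical names.

```python
class Configurable:
    """Simplified stand-in: __new__ builds the instance and calls initialize()."""

    def __new__(cls, *args, **kwargs):
        instance = super().__new__(cls)
        # initialize() does the work that __init__ would normally do.
        instance.initialize(*args, **kwargs)
        return instance

    def initialize(self, *args, **kwargs):
        pass


class ToyServer(Configurable):
    def __init__(self, *args, **kwargs):
        # Intentionally empty: Python still calls __init__ after __new__,
        # but all setup has already happened in initialize().
        pass

    def initialize(self, request_callback, xheaders=False):
        self.request_callback = request_callback
        self.xheaders = xheaders


def app(request):
    return "handled"


server = ToyServer(app, xheaders=True)
```

This explains why HTTPServer.__init__ is just pass: by the time Python calls it, the instance has already been configured.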

TCPServer.listen

class TCPServer(object):
    def listen(self, port, address=""):
        sockets = bind_sockets(port, address=address)
        self.add_sockets(sockets)

    def add_sockets(self, sockets):
        for sock in sockets:
            self._sockets[sock.fileno()] = sock
            self._handlers[sock.fileno()] = add_accept_handler(
                sock, self._handle_connection)

As shown above, HTTPServer.listen is inherited from TCPServer.listen. add_sockets calls one very important function: add_accept_handler.

add_accept_handler

def add_accept_handler(sock, callback):
    io_loop = IOLoop.current()
    removed = [False]
    def accept_handler(fd, events):
        # Bounded loop: accept at most _DEFAULT_BACKLOG (128) connections per callback
        for i in xrange(_DEFAULT_BACKLOG):
            if removed[0]:
                return
            try:
                connection, address = sock.accept()
            except socket.error as e:
                if errno_from_exception(e) in _ERRNO_WOULDBLOCK:
                    return
                if errno_from_exception(e) == errno.ECONNABORTED:
                    continue
                raise
            set_close_exec(connection.fileno())
            callback(connection, address)

    def remove_handler():
        io_loop.remove_handler(sock)
        removed[0] = True

    io_loop.add_handler(sock, accept_handler, IOLoop.READ)
    return remove_handler

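add_accept_handler registers the listening socket with the io_loop; when a connection request arrives, the closure accept_handler is called.
It runs sock.accept() to obtain the client connection, then invokes the callback TCPServer._handle_connection to process it.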
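
The same accept pattern can be sketched with the standard library alone. This is an illustration under assumptions, not Tornado's code: selectors stands in for the IOLoop, the max_accepts parameter is hypothetical, and the example binds an ephemeral port on 127.0.0.1.

```python
import selectors
import socket


def add_accept_handler(selector, sock, callback, max_accepts=128):
    """Register a listening socket; accept in a bounded loop once it is readable."""
    def accept_handler():
        for _ in range(max_accepts):
            try:
                connection, address = sock.accept()
            except BlockingIOError:
                return  # no more pending connections
            callback(connection, address)
    selector.register(sock, selectors.EVENT_READ, accept_handler)


accepted = []
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))   # port 0: let the OS pick a free port
listener.listen(128)
listener.setblocking(False)

selector = selectors.DefaultSelector()
add_accept_handler(selector, listener, lambda conn, addr: accepted.append(conn))

client = socket.create_connection(listener.getsockname())
for key, _events in selector.select(timeout=5):
    key.data()                    # run the registered accept_handler

for conn in accepted:
    conn.close()
client.close()
selector.close()
listener.close()
```

Looping inside the handler lets one readiness notification drain several queued connections, while the bound keeps a burst of connects from starving other callbacks.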

TCPServer._handle_connection

class TCPServer(object):
    def _handle_connection(self, connection, address):
        # Socket-based non-blocking reader and writer (parts omitted)
        stream = IOStream(connection,
                          max_buffer_size=self.max_buffer_size,
                          read_chunk_size=self.read_chunk_size)

        future = self.handle_stream(stream, address)
        if future is not None:
            IOLoop.current().add_future(gen.convert_yielded(future),
                                        lambda f: f.result())

class HTTPServer(TCPServer,...):
    def handle_stream(self, stream, address):
        context = _HTTPRequestContext(stream, address,
                                      self.protocol,
                                      self.trusted_downstream)
        conn = HTTP1ServerConnection(
            stream, self.conn_params, context)
        self._connections.add(conn)
        conn.start_serving(self)

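As shown above, _handle_connection wraps the connection in a non-blocking IOStream and hands it to handle_stream; if handle_stream returns a future, it is scheduled with io_loop.add_future.
HTTPServer.handle_stream attaches an _HTTPRequestContext, wraps the stream in an HTTP1ServerConnection, and calls start_serving, which puts the request loop onto the io_loop.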

HTTP1ServerConnection

"""
conn.start_serving(self)  # self is the HTTPServer here
"""
class HTTP1ServerConnection(object):
    def start_serving(self, delegate):
        self._serving_future = self._server_request_loop(delegate)
        self.stream.io_loop.add_future(self._serving_future,
                                       lambda f: f.result())

    @gen.coroutine
    def _server_request_loop(self, delegate):
        try:
            while True:
                conn = HTTP1Connection(self.stream, False,
                                       self.params, self.context)
                request_delegate = delegate.start_request(self, conn)
                try:
                    ret = yield conn.read_response(request_delegate)
                except (iostream.StreamClosedError,
                        iostream.UnsatisfiableReadError):
                    return
                except _QuietException:
                    # This exception was already logged.
                    conn.close()
                    return
                except Exception:
                    gen_log.error("Uncaught exception", exc_info=True)
                    conn.close()
                    return
                if not ret:
                    return
                yield gen.moment
        finally:
            delegate.on_close(self)

The while loop keeps reading data and serving requests on the connection. Inside it, delegate.start_request resolves to HTTPServer().start_request.
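
The shape of this per-connection loop can be sketched with asyncio streams. This is only an analogy: each line stands in for a whole HTTP request, asyncio.sleep(0) plays the role of yield gen.moment, and server_request_loop and handle are hypothetical names.

```python
import asyncio


async def server_request_loop(reader, handle):
    # Serve "requests" one after another on the same connection.
    while True:
        line = await reader.readline()
        if not line:            # EOF: the peer closed the connection
            return
        handle(line)
        await asyncio.sleep(0)  # give other callbacks a turn between requests


async def main():
    reader = asyncio.StreamReader()
    reader.feed_data(b"GET /a\nGET /b\n")
    reader.feed_eof()
    seen = []
    await server_request_loop(reader, seen.append)
    return seen


served = asyncio.run(main())
```

Yielding to the loop between requests keeps one busy keep-alive connection from monopolizing the single thread.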

Note a small trick here:

  • _server_request_loop is itself a generator function, because it contains yield
  • the gen.coroutine decorator calls next() on it to start consuming the generator

result = func(*args, **kwargs)
if isinstance(result, GeneratorType):
    ...
    # This consumes the generator and enters _server_request_loop,
    # which runs up to conn.read_response(request_delegate) and yields its result
    yielded = next(result)

    future = _create_future()
    runner = Runner(result, future, yielded)
    future.add_done_callback(lambda _: runner)
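
The core idea, a decorator that calls the generator function and then drives it with next() and send(), can be shown with a toy version. It is a deliberately synchronous sketch: unlike gen.coroutine it does not wait on Futures, it just sends each yielded value straight back in. The names mini_coroutine and read_request are assumptions.

```python
import functools
import types


def mini_coroutine(func):
    """Toy decorator: run a generator function to completion."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        result = func(*args, **kwargs)
        if not isinstance(result, types.GeneratorType):
            return result                   # plain function: nothing to drive
        try:
            yielded = next(result)          # run up to the first yield
            while True:
                # A real runner would wait for `yielded` to resolve first.
                yielded = result.send(yielded)
        except StopIteration as stop:
            return stop.value               # the generator's return value
    return wrapper


@mini_coroutine
def read_request():
    headers = yield "GET / HTTP/1.1"
    body = yield "hello"
    return (headers, body)


outcome = read_request()
```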

start_request

# request_delegate = delegate.start_request(self, conn)
class HTTPServer(TCPServer, Configurable,
                 httputil.HTTPServerConnectionDelegate):
    def start_request(self, server_conn, request_conn):
        if isinstance(self.request_callback, httputil.HTTPServerConnectionDelegate):
            delegate = self.request_callback.start_request(server_conn, request_conn)
        else:
            delegate = _CallableAdapter(self.request_callback, request_conn)

        if self.xheaders:
            delegate = _ProxyAdapter(delegate, request_conn)

        return delegate

In HTTPServer.initialize(), request_callback was set to the Application() instance.
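
The isinstance branch in start_request can be sketched on its own: an object that implements the delegate interface builds its own per-request delegate, while anything else is treated as a plain callable and wrapped. All class and function names below are hypothetical stand-ins, not Tornado's.

```python
class ConnectionDelegate:
    """Hypothetical stand-in for httputil.HTTPServerConnectionDelegate."""
    def start_request(self, server_conn, request_conn):
        raise NotImplementedError


class CallableAdapter:
    """Wraps a plain callable so it can be used where a delegate is expected."""
    def __init__(self, callback, request_conn):
        self.callback = callback
        self.request_conn = request_conn

    def finish(self):
        return self.callback(self.request_conn)


class ToyApplication(ConnectionDelegate):
    def start_request(self, server_conn, request_conn):
        return ("routing-delegate", request_conn)


def start_request(request_callback, server_conn, request_conn):
    # Delegate objects build their own per-request delegate;
    # anything else is assumed to be a callable and is adapted.
    if isinstance(request_callback, ConnectionDelegate):
        return request_callback.start_request(server_conn, request_conn)
    return CallableAdapter(request_callback, request_conn)


from_app = start_request(ToyApplication(), "server", "conn-1")
from_func = start_request(lambda conn: "handled " + conn, "server", "conn-2")
```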

# Application -> ReversibleRouter -> Router -> HTTPServerConnectionDelegate
class Router(httputil.HTTPServerConnectionDelegate):
    def start_request(self, server_conn, request_conn):
        return _RoutingDelegate(self, server_conn, request_conn)

Following Application's inheritance chain, the call resolves to Router.start_request, which returns a _RoutingDelegate object.
That delegate is then consumed by yield conn.read_response(request_delegate).

read_response

# yield conn.read_response(_RoutingDelegate())
class HTTP1Connection(httputil.HTTPConnection):
    def read_response(self, delegate):
        if self.params.decompress:
            delegate = _GzipMessageDelegate(delegate, self.params.chunk_size)
        return self._read_message(delegate)

    @gen.coroutine
    def _read_message(self, delegate):
        """Read from the IOStream.
        1. yield header_data
        2. start_line, headers: parse the request line
        3. headers_received(start_line, headers): match the handler
        """

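Here _read_message is again a generator driven by the @gen.coroutine decorator.
read_response reads data from the socket and makes it available to user code.
Once _read_message yields while reading the request headers, control returns to the @gen.coroutine decorator logic: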

# result: the _read_message generator
yielded = next(result)  # yielded: the result of next() on the generator, the request headers

# Execution resumes here
future = _create_future()  # future: a new Future object
runner = Runner(result, future, yielded)
future.add_done_callback(lambda _: runner)

This enters Runner's constructor, which then calls run:

class Runner(object):
    def __init__(self, gen, result_future, first_yielded):
        ...
        if self.handle_yield(first_yielded):
            gen = result_future = first_yielded = None
            self.run()

    def run(self):
        ...
        while True:
            value = future.result()
            yielded = self.gen.send(value)

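This loop keeps consuming _read_message until the whole request body has been read.
Note that once the headers have been read and execution re-enters _read_message, it first parses the request line and finds the handler.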
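
A Future-driven runner can be sketched as follows. This is a toy, not Tornado's Runner: it uses concurrent.futures.Future from the standard library as the placeholder type, omits error handling, and the names MiniRunner and read_message are illustrative.

```python
from concurrent.futures import Future


class MiniRunner:
    """Toy runner: resume the generator each time the yielded Future completes."""

    def __init__(self, gen, result_future, first_yielded):
        self.gen = gen
        self.result_future = result_future
        self._wait_for(first_yielded)

    def _wait_for(self, future):
        # The callback fires right away if the future is already done.
        future.add_done_callback(self._resume)

    def _resume(self, future):
        try:
            yielded = self.gen.send(future.result())
        except StopIteration as stop:
            self.result_future.set_result(stop.value)
            return
        self._wait_for(yielded)


def read_message(header_future, body_future):
    headers = yield header_future
    body = yield body_future
    return headers + b"|" + body


header_future, body_future = Future(), Future()
gen = read_message(header_future, body_future)
done = Future()
runner = MiniRunner(gen, done, next(gen))  # next() runs up to the first yield

header_future.set_result(b"GET / HTTP/1.1")  # resumes the generator once
body_future.set_result(b"payload")           # resumes it again; it then returns
```

Each set_result call plays the part of the IO layer delivering data: the generator only advances when the thing it is waiting for is ready.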

Instantiating the _HandlerDelegate object

# delegate.headers_received(start_line, headers)
class _RoutingDelegate(httputil.HTTPMessageDelegate):
    def __init__(self, router, server_conn, request_conn):
        self.server_conn = server_conn
        self.request_conn = request_conn
        self.delegate = None
        self.router = router  # the user's Application

    def headers_received(self, start_line, headers):
        # 1. Build the request object
        request = httputil.HTTPServerRequest(
            connection=self.request_conn,
            server_connection=self.server_conn,
            start_line=start_line, headers=headers)

        # Look up the matching route in the Application; the resulting
        # _HandlerDelegate is stored on _RoutingDelegate.delegate
        self.delegate = self.router.find_handler(request)
        if self.delegate is None:
            app_log.debug("Delegate for %s %s request not found",
                          start_line.method, start_line.path)
            self.delegate = _DefaultMessageDelegate(self.request_conn)

        return self.delegate.headers_received(start_line, headers)
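
The lookup in headers_received can be sketched independently of Tornado. The delegate is chosen lazily, only once the request line is known, and a fallback delegate covers the no-match case. Every class below is a hypothetical simplification; the real rule matching in Tornado is more elaborate.

```python
import re


class NotFoundDelegate:
    def headers_received(self, method, path):
        return 404


class HandlerDelegate:
    def __init__(self, handler_name, path_args):
        self.handler_name = handler_name
        self.path_args = path_args

    def headers_received(self, method, path):
        return 200


class ToyRouter:
    def __init__(self, rules):
        # rules: list of (regex pattern, handler name) pairs
        self.rules = [(re.compile(pattern), name) for pattern, name in rules]

    def find_handler(self, path):
        for pattern, name in self.rules:
            match = pattern.fullmatch(path)
            if match:
                return HandlerDelegate(name, list(match.groups()))
        return None


class RoutingDelegate:
    def __init__(self, router):
        self.router = router
        self.delegate = None

    def headers_received(self, method, path):
        # The concrete delegate is picked only after the request line arrives.
        self.delegate = self.router.find_handler(path)
        if self.delegate is None:
            self.delegate = NotFoundDelegate()
        return self.delegate.headers_received(method, path)


router = ToyRouter([(r"/", "MainHandler"), (r"/user/(\d+)", "UserHandler")])
hit = RoutingDelegate(router)
hit_status = hit.headers_received("GET", "/user/7")
miss = RoutingDelegate(router)
miss_status = miss.headers_received("GET", "/nope")
```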

Execution then reaches _RoutingDelegate.finish(), which calls _RoutingDelegate.delegate.finish().
The _HandlerDelegate instance carries a lot of information:

class _HandlerDelegate(httputil.HTTPMessageDelegate):
    def __init__(self, application, request, handler_class, handler_kwargs,
                 path_args, path_kwargs):
        self.application = application
        self.connection = request.connection
        self.request = request
        self.handler_class = handler_class
        self.handler_kwargs = handler_kwargs or {}
        self.path_args = path_args or []
        self.path_kwargs = path_kwargs or {}
        self.chunks = []
        self.stream_request_body = _has_stream_request_body(self.handler_class)

    def execute(self):
        # Instantiate the user's handler class
        self.handler = self.handler_class(self.application, self.request,
                                          **self.handler_kwargs)
        ...
        # Get ready to run user code
        self.handler._execute(transforms, *self.path_args,
                              **self.path_kwargs)

execute instantiates handler_class and calls _execute to get ready to run user code.
Note that _execute is itself a function decorated with @gen.coroutine.

RequestHandler._execute()

class RequestHandler(object):
    @gen.coroutine
    def _execute(self, transforms, *args, **kwargs):
        """Executes this request with the given output transforms."""
        self._transforms = transforms
        try:
            if self.request.method not in self.SUPPORTED_METHODS:
                raise HTTPError(405)
            self.path_args = [self.decode_argument(arg) for arg in args]
            self.path_kwargs = dict((k, self.decode_argument(v, name=k))
                                    for (k, v) in kwargs.items())
            # If XSRF cookies are turned on, reject form submissions without
            # the proper cookie
            if self.request.method not in ("GET", "HEAD", "OPTIONS") and \
                    self.application.settings.get("xsrf_cookies"):
                self.check_xsrf_cookie()

            result = self.prepare()
            if result is not None:
                result = yield result
            if self._prepared_future is not None:
                # Tell the Application we've finished with prepare()
                # and are ready for the body to arrive.
                future_set_result_unless_cancelled(self._prepared_future, None)
            if self._finished:
                return

            if _has_stream_request_body(self.__class__):
                # In streaming mode request.body is a Future that signals
                # the body has been completely received. The Future has no
                # result; the data has been passed to self.data_received
                # instead.
                try:
                    yield self.request.body
                except iostream.StreamClosedError:
                    return

            method = getattr(self, self.request.method.lower())
            result = method(*self.path_args, **self.path_kwargs)
            if result is not None:
                result = yield result
            if self._auto_finish and not self._finished:
                self.finish()
        except Exception as e:
            try:
                self._handle_request_exception(e)
            except Exception:
                app_log.error("Exception in exception handler", exc_info=True)
            finally:
                # Unset result to avoid circular references
                result = None
            if (self._prepared_future is not None and
                    not self._prepared_future.done()):
                # In case we failed before setting _prepared_future, do it
                # now (to unblock the HTTP server). Note that this is not
                # in a finally block to avoid GC issues prior to Python 3.4.
                self._prepared_future.set_result(None)

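As shown above, the logic of _execute resembles Django/Flask:

  1. Handle 405
  2. Handle XSRF
  3. prepare: the hook that runs before the method
  4. Call method()
  5. User code calls self.write() to write the result
  6. self.finish() is called, automatically or manually, to complete the request
  7. RequestHandler.flush() is called and the web framework layer is done
  8. The HTTP layer takes over and writes the data to the socket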
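
The lifecycle in the list above can be condensed into a small synchronous sketch. MiniHandler is a hypothetical class, not RequestHandler: it skips XSRF, coroutines, and the HTTP layer, and only shows the method check, the prepare hook, dispatch through getattr, and the automatic finish.

```python
class MiniHandler:
    SUPPORTED_METHODS = ("GET",)

    def __init__(self, method):
        self.method = method
        self.status = 200
        self.calls = []        # records the order of lifecycle steps
        self.body = None
        self._chunks = []
        self._finished = False

    def prepare(self):
        self.calls.append("prepare")

    def get(self):
        self.calls.append("get")
        self.write("Hello, ")
        self.write("world")

    def write(self, chunk):
        self._chunks.append(chunk)   # buffered until finish()

    def finish(self):
        self.calls.append("finish")
        self.body = "".join(self._chunks)
        self._finished = True

    def execute(self):
        if self.method not in self.SUPPORTED_METHODS:
            self.status = 405
            self.finish()
            return
        self.prepare()
        if self._finished:           # prepare() may already have finished the request
            return
        getattr(self, self.method.lower())()
        if not self._finished:       # auto-finish when user code did not
            self.finish()


ok = MiniHandler("GET")
ok.execute()
rejected = MiniHandler("DELETE")
rejected.execute()
```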

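Summary

As a web framework, Tornado goes through the following steps:

  1. Preparation
    1. Receive the routing table and parameters, and create the Application instance
  2. Socket.bind+listen
    1. Create the HTTPServer instance, which inherits from TCPServer
    2. TCPServer.listen runs socket.bind+listen and registers the socket together with _handle_connection on the io_loop
  3. Start the event loop
    1. At the very beginning, the io_loop only watches the bound socket
  4. Connection request: Accept()
    1. As soon as a request is detected, the callback runs sock.accept() to obtain conn and passes it to _handle_connection
    2. conn is wrapped in a non-blocking IOStream, given a context, wrapped into a coroutine object, and put on the io_loop
  5. HTTP request handling
    1. In HTTP1Connection, _read_message reads the request, parses the request line, and matches the Handler
    2. RequestHandler._execute() runs the user-level code
    3. Control returns to the HTTP layer, which writes the data to the socket to complete the response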

IOLoop

class IOLoop(Configurable):
    @staticmethod
    def current(instance=True):
        """Returns the current thread's `IOLoop`."""
        if asyncio is None:
            ...
        else:
            try:
                loop = asyncio.get_event_loop()
            except (RuntimeError, AssertionError):
                if not instance:
                    return None
                raise
            try:
                # IOLoop._ioloop_for_asyncio = dict()
                return IOLoop._ioloop_for_asyncio[loop]
            except KeyError:
                if instance:
                    from tornado.platform.asyncio import AsyncIOMainLoop
                    current = AsyncIOMainLoop(make_current=True)
                else:
                    current = None
        return current

As shown above, when the asyncio module is available, an AsyncIOMainLoop() object is returned.
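
The lookup-or-create logic can be sketched with a plain dictionary keyed by the asyncio loop. LoopWrapper and for_loop are hypothetical names; the sketch takes the loop as an explicit argument instead of calling asyncio.get_event_loop().

```python
import asyncio


class LoopWrapper:
    # Maps each asyncio loop to its wrapper, in the spirit of IOLoop._ioloop_for_asyncio
    _wrapper_for_loop = {}

    def __init__(self, asyncio_loop):
        self.asyncio_loop = asyncio_loop
        LoopWrapper._wrapper_for_loop[asyncio_loop] = self

    @classmethod
    def for_loop(cls, asyncio_loop, instance=True):
        try:
            return cls._wrapper_for_loop[asyncio_loop]
        except KeyError:
            # Create the wrapper on demand only when the caller asks for one.
            return cls(asyncio_loop) if instance else None


loop = asyncio.new_event_loop()
missing = LoopWrapper.for_loop(loop, instance=False)  # nothing registered yet
first = LoopWrapper.for_loop(loop)                    # creates and registers
second = LoopWrapper.for_loop(loop)                   # returns the cached wrapper
loop.close()
```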

AsyncIOMainLoop

class AsyncIOMainLoop(BaseAsyncIOLoop):
    def initialize(self, **kwargs):
        super(AsyncIOMainLoop, self).initialize(asyncio.get_event_loop(), **kwargs)

class BaseAsyncIOLoop(IOLoop):
    def initialize(self, asyncio_loop, **kwargs):
        self.asyncio_loop = asyncio_loop
        self.handlers = {}
        # Set of fds listening for reads/writes
        self.readers = set()
        self.writers = set()
        self.closing = False

        for loop in list(IOLoop._ioloop_for_asyncio):
            if loop.is_closed():
                del IOLoop._ioloop_for_asyncio[loop]
        IOLoop._ioloop_for_asyncio[asyncio_loop] = self
        super(BaseAsyncIOLoop, self).initialize(**kwargs)

class IOLoop(Configurable):
    def initialize(self, make_current=None):
        if make_current is None:
            if IOLoop.current(instance=False) is None:
                self.make_current()
        elif make_current:
            current = IOLoop.current(instance=False)
            # AsyncIO loops can already be current by this point.
            if current is not None and current is not self:
                raise RuntimeError("current IOLoop already exists")
            self.make_current()

class Configurable(object):
    def __new__(cls, *args, **kwargs):
        ...
        instance = super(Configurable, cls).__new__(impl)
        instance.initialize(*args, **init_kwargs)
        return instance

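As shown above, instantiation walks up the initialize chain through successive super() calls, and the asyncio loop ends up wrapped in the AsyncIOMainLoop object that is returned.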

start

class BaseAsyncIOLoop(IOLoop):
    def start(self):
        try:
            old_loop = asyncio.get_event_loop()
        except (RuntimeError, AssertionError):
            old_loop = None
        try:
            self._setup_logging()
            asyncio.set_event_loop(self.asyncio_loop)
            self.asyncio_loop.run_forever()
        finally:
            asyncio.set_event_loop(old_loop)

AsyncIOMainLoop().start() ultimately calls asyncio_loop.run_forever().
As covered in the asyncio module write-up, run_forever repeatedly calls _run_once in a while True loop, which drives the scheduling of the whole event loop.
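
What run_forever does from the caller's point of view can be seen in a few lines of plain asyncio. The callback names are illustrative; the loop keeps running scheduled callbacks until one of them calls stop().

```python
import asyncio

events = []
loop = asyncio.new_event_loop()


def first_callback():
    events.append("first")
    loop.call_soon(second_callback)  # scheduled for a later loop iteration


def second_callback():
    events.append("second")
    loop.stop()                      # makes run_forever() return


loop.call_soon(first_callback)
loop.run_forever()                   # blocks, running callbacks until stop()
loop.close()
```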