【Python Web 系列】Flask 源码分析

参考

Demo

1
2
3
4
5
6
7
8
9
10
# Minimal Flask application used as the running example.
# Written against Flask 1.0.2 (flask.__version__ == "1.0.2").
from flask import Flask

app = Flask(__name__)


@app.route("/")
def hello():
    """Handle requests for the root URL."""
    return "Hello World!"


if __name__ == '__main__':
    # run() is only meant for development use.
    app.run()

Flask

1
2
3
4
5
6
7
class Flask(_PackageBoundObject):
    """Abridged view of the application class: an instance is a WSGI callable."""

    def wsgi_app(self, environ, start_response):
        """Handle one request; the body is elided in this excerpt."""
        ...

    def __call__(self, environ, start_response):
        """WSGI entry point: forward the call to :meth:`wsgi_app`."""
        return self.wsgi_app(environ, start_response)

Flask实例化后,就是一个符合 WSGI 规定的,可调用对象。再来看看实例化都要干些啥。

__init__

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def __init__(...):
self.config = self.make_config()
self.view_functions = {}
self.error_handler_spec = {}
self.before_request_funcs = {}
self.before_first_request_funcs = []
self.after_request_funcs = {}
self.teardown_request_funcs = {}
self.teardown_appcontext_funcs = []
self.url_value_preprocessors = {}
self.url_default_functions = {}
self.url_map = Map()
self.blueprints = {}
self._blueprint_order = []
self.extensions = {}

看名字就知道,主要是对每次请求,创建一个处理通道。

@app.route

1
2
3
4
5
6
7
8
9
10
11
12
13
def route(self, rule, **options):
    """Return a decorator that registers a view function for ``rule``.

    An optional ``endpoint`` keyword is removed from ``options``; everything
    else is forwarded unchanged to :meth:`add_url_rule`.
    """
    def decorator(f):
        # Taken out of options so it is passed positionally, not twice.
        endpoint = options.pop('endpoint', None)
        self.add_url_rule(rule, endpoint, f, **options)
        # Return the original function so the decorated name is unchanged.
        return f
    return decorator

def add_url_rule(self, rule, endpoint=None, view_func=None,
                 provide_automatic_options=None, **options):
    """Register a URL rule and associate its endpoint with ``view_func``."""
    # Elided in this excerpt; ``methods`` used below is not defined in the
    # visible code, so it must be prepared in the omitted part.
    ...
    rule = Rule(rule, methods=methods, **options)
    self.url_map.add(rule)
    self.view_functions[endpoint] = view_func

把路由 rule,加到 url_map 里;把 rule 对应的 endpoint 跟 view_func,通过字典 view_functions 进行关联。

app.run

1
# 自行脑补

run 仅仅在开发环境使用。我们可以自行脑补出代码:“创建一个符合 WSGI 的服务器,从浏览器接收 HTTP 请求,封装 environ,调用 app.__call__”。
生产环境一般都通过配置文件,指定 WSGI app 所在位置。我们直接来看,这种情况下怎么运行。

wsgi_app

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def wsgi_app(self, environ, start_response):
    """Serve one WSGI request from context push to context pop.

    Creates a request context, pushes it, dispatches the request, turns
    exceptions into a response via :meth:`handle_exception`, calls the
    response as a WSGI application, and always pops the context at the end.
    """
    ctx = RequestContext(self, environ)  # flask.ctx.RequestContext
    error = None
    try:
        try:
            # Bind the request context to the current context.
            ctx.push()
            response = self.full_dispatch_request()
        except Exception as e:
            error = e
            response = self.handle_exception(e)
        except:
            # Anything that is not an Exception subclass: remember it for
            # the teardown step below and let it propagate.
            error = sys.exc_info()[1]
            raise
        return response(environ, start_response)
    finally:
        if self.should_ignore_error(error):
            error = None
        ctx.auto_pop(error)

整个请求都在这部分完成:请求上下文的压栈,请求路由分发,错误处理,返回内容,上下文出栈。

接下来就到了Flask 的精华部分,Context!

Context

RequestContext

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class RequestContext(object):
    """The request context contains all request relevant information.  It is
    created at the beginning of the request and pushed to the
    `_request_ctx_stack` and removed at the end of it.  It will create the
    URL adapter and request object for the WSGI environment provided.

    This is an abridged excerpt; elided parts are marked with comments.
    """

    def __init__(self, app, environ, request=None):
        """Build the context for ``environ``.

        :param app: the application this context belongs to.
        :param environ: the WSGI environment of the request.
        :param request: an already constructed request object; when omitted
            a new ``Request(environ)`` is created.
        """
        self.app = app
        # Honour a caller-supplied request instead of silently ignoring it.
        if request is None:
            request = Request(environ)
        self.request = request
        self.url_adapter = app.create_url_adapter(self.request)
        self.flashes = None
        self.session = None

        # A request context can be pushed multiple times.  For every push
        # that has to create an application context, that context is
        # recorded here (None is recorded when none was needed).
        self._implicit_app_ctx_stack = []

        # Functions to run at the end of the request, before the
        # 'after_request' functions.
        self._after_request_functions = []

        # Calls werkzeug.routing.MapAdapter().match and assigns
        # request.url_rule / request.view_args.
        self.match_request()

        # (elided) handling of _app_ctx_stack.top.g

    def push(self):
        """Binds the request context to the current context."""
        top = _request_ctx_stack.top  # not used further in this excerpt
        app_ctx = _app_ctx_stack.top
        if app_ctx is None or app_ctx.app != self.app:
            # No application context yet, or it belongs to another app:
            # create and push one for this app first.
            app_ctx = self.app.app_context()
            app_ctx.push()
            self._implicit_app_ctx_stack.append(app_ctx)
        else:
            self._implicit_app_ctx_stack.append(None)

        _request_ctx_stack.push(self)

        # (elided) set up self.session: loaded from self.request.cookies,
        # or a NullSession is created when nothing can be loaded.

    def pop(self, exc=_sentinel):
        """Remove this context from the request context stack."""
        # (elided) handling of ``exc``
        # (elided) self.app.do_teardown_request(exc)
        rv = _request_ctx_stack.pop()
        # The popped context must be this one (assert message elided).
        assert rv is self, ...

    def auto_pop(self, exc):
        """Pop the context, passing ``exc`` through to :meth:`pop`."""
        self.pop(exc)

说的好有道理,RequestContext(app, env),包含了请求需要的全部信息。在请求前创建,并 push 到 _request_ctx_stack,请求结束后,弹出。从代码中,还能看见另一个上下文栈 _app_ctx_stack,即应用上下文。在请求上下文 push 时,若 _app_ctx_stack.top 不存在,或者它的 app 不是当前的 self.app,会先通过 self.app.app_context() 创建一个应用上下文并入栈。

那么,是时候看看这个ctx_stack是个什么东西。

Local

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
try:
from greenlet import getcurrent as get_ident
except ImportError:
try:
from thread import get_ident
except ImportError:
from _thread import get_ident

class Local(object):
    """Attribute container whose values are kept separately per thread/greenlet.

    Every attribute is stored under the identifier returned by
    ``get_ident()``, so code running under different identifiers never sees
    each other's values.
    """

    def __init__(self):
        # Must bypass our own __setattr__: it reads self.__storage__, which
        # does not exist yet, so a plain assignment would raise
        # AttributeError.
        object.__setattr__(self, '__storage__', {})

    def __getattr__(self, name):
        """Return the value stored for the current identifier.

        :raises AttributeError: if ``name`` was never set under the current
            identifier.
        """
        try:
            return self.__storage__[get_ident()][name]
        except KeyError:
            raise AttributeError(name)

    def __setattr__(self, name, value):
        """Store ``value`` under the current identifier."""
        ident = get_ident()
        storage = self.__storage__
        try:
            storage[ident][name] = value
        except KeyError:
            # First attribute set under this identifier.
            storage[ident] = {name: value}

利用 get_ident 获取当前协程/线程的唯一 id,以它为 key 把数据分别存放在字典中,各协程/线程只能读写自己的那一份数据,从而实现线程安全。

LocalStack

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class LocalStack(object):
    """Stack whose contents are stored on a :class:`Local`.

    Only ``push`` is shown in this excerpt; ``top`` and ``pop`` used in the
    example below are elided.

    >>> from werkzeug.local import LocalStack
    >>> ls = LocalStack()
    >>> ls.push(42)
    >>> ls.top
    42
    >>> ls.push(23)
    >>> ls.top
    23
    >>> ls.pop()
    23
    >>> ls.top
    42
    """

    def __init__(self):
        self._local = Local()

    def push(self, obj):
        """Push ``obj`` onto the stack and return the underlying list."""
        rv = getattr(self._local, 'stack', None)
        if rv is None:
            # Nothing pushed yet: create the list lazily.
            self._local.stack = rv = []
        rv.append(obj)
        return rv

在线程安全的基础上,用列表,实现栈操作。

ctx_stack

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# flask.globals
def _lookup_req_object(name):
    """Return attribute ``name`` of the topmost request context.

    :raises RuntimeError: if no request context has been pushed.
    """
    top = _request_ctx_stack.top
    if top is None:
        raise RuntimeError(_request_ctx_err_msg)
    return getattr(top, name)

def _find_app():
    """Return the application of the topmost application context.

    :raises RuntimeError: if no application context has been pushed.
    """
    top = _app_ctx_stack.top
    if top is None:
        raise RuntimeError(_app_ctx_err_msg)
    return top.app

def _lookup_app_object(name):
    """Return attribute ``name`` of the topmost application context.

    Counterpart of ``_lookup_req_object``; it is needed by the ``g`` proxy
    below.

    :raises RuntimeError: if no application context has been pushed.
    """
    top = _app_ctx_stack.top
    if top is None:
        raise RuntimeError(_app_ctx_err_msg)
    return getattr(top, name)


# The two context stacks, and the proxies that resolve against their
# topmost entry each time they are used.
_request_ctx_stack = LocalStack()
_app_ctx_stack = LocalStack()
current_app = LocalProxy(_find_app)
request = LocalProxy(partial(_lookup_req_object, 'request'))
session = LocalProxy(partial(_lookup_req_object, 'session'))
g = LocalProxy(partial(_lookup_app_object, 'g'))

# LocalProxy wraps access to the looked-up object behind a proxy.

可见,经常使用的 request/session,都是线程安全的。并且,都是从最顶层请求上下文中获取到。

获取到上下文之后,自然继续执行wsgi_app()

1
2
# Excerpt from wsgi_app(): dispatch the request, then call the response
# object as a WSGI application.
response = self.full_dispatch_request()
return response(environ, start_response)

full_dispatch_request

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def full_dispatch_request(self):
    """Run the whole dispatch pipeline for the current request.

    Order: before-first-request functions, the ``request_started`` signal,
    request preprocessing, the matched view (skipped when preprocessing
    already returned a value), then finalisation.  An exception raised in
    the ``try`` block is passed to :meth:`handle_user_exception`.
    """
    # Run the before-first-request functions.
    self.try_trigger_before_first_request_functions()
    try:
        # Emit the request-started signal.
        request_started.send(self)
        # Run the request preprocessing functions.
        rv = self.preprocess_request()
        if rv is None:
            # Find the view function for the matched URL via url_map and
            # call it.
            rv = self.dispatch_request()
    except Exception as e:
        rv = self.handle_user_exception(e)
    # Calls make_response and process_response.
    return self.finalize_request(rv)

嗯,这个地方就是 Flask 的视图执行逻辑了。路由以及对应的装饰器函数,都在此处被执行。
很明显,只要preprocess_request有返回结果,就会跳过定义的视图函数。
整个运行期间,只要有错误,就会调用注册的错误处理函数。

注意,错中错在此处并没有捕捉,在最外层的 wsgi_app() 中进行了捕捉,然后会尝试调用注册的 500 错误处理过程。

preprocess_request

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def preprocess_request(self):
    """Run the URL value preprocessors and the before-request functions.

    App-wide functions (registered under ``None``) run before those of the
    current blueprint.  The first before-request function that returns a
    value other than ``None`` stops the chain and that value is returned;
    otherwise the method falls through and returns ``None``.
    """
    bp = _request_ctx_stack.top.request.blueprint

    # @url_value_preprocessor -> {None: [], bp: []}
    funcs = self.url_value_preprocessors.get(None, ())
    if bp is not None and bp in self.url_value_preprocessors:
        funcs = chain(funcs, self.url_value_preprocessors[bp])
    for func in funcs:
        # Return value is ignored.
        func(request.endpoint, request.view_args)

    # @before_request -> {None: [], bp: []}
    funcs = self.before_request_funcs.get(None, ())
    if bp is not None and bp in self.before_request_funcs:
        funcs = chain(funcs, self.before_request_funcs[bp])
    for func in funcs:
        rv = func()
        if rv is not None:
            return rv

嗯,通过两个装饰器,在 view_func 之前运行指定函数。需要注意的是:

  • 两个装饰器都以 blueprint 为单位,None 为所有 bp 共享
  • @url_value_preprocessor装饰的 func,接收两个参数,并且忽略返回值。通常用于对 url args 进行处理
  • @before_request装饰的 func,不接收参数,且只要有返回值,就直接 return。通常用于获取 db 连接,获取 session info

装饰器处理完,自然就开始执行真正匹配到的 view_func。

dispatch_request

1
2
3
4
5
6
7
8
9
10
11
12
def dispatch_request(self):
    """Call the view function registered for the matched URL rule.

    Re-raises a routing exception recorded on the request, answers
    ``OPTIONS`` automatically when the rule asks for it, and otherwise
    returns whatever the view returns when called with the URL values as
    keyword arguments.
    """
    req = _request_ctx_stack.top.request
    if req.routing_exception is not None:
        self.raise_routing_exception(req)
    rule = req.url_rule  # set when the RequestContext is initialised

    if getattr(rule, 'provide_automatic_options', False) \
            and req.method == 'OPTIONS':
        return self.make_default_options_response()

    # view_functions is filled in by add_url_rule().
    return self.view_functions[rule.endpoint](**req.view_args)

一目了然,通过 endpoint 获取到可调用对象,以关键字形式,传入 url 参数,返回结果。

接下来自然是 错误处理/返回结果。

handle_user_exception

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def handle_user_exception(self, e):
    """Pass ``e`` to a registered error handler, or re-raise it.

    The handler comes from ``_find_error_handler``, whose docstring gives
    the lookup order: blueprint handler for a specific code, app handler
    for a specific code, blueprint handler for an exception class, app
    handler for an exception class, or ``None`` if a suitable handler is
    not found.

    :returns: the return value of the handler.
    :raises: the exception currently being handled, when no handler exists.
    """
    # Capture the active exception so it can be re-raised with its traceback.
    exc_type, exc_value, tb = sys.exc_info()

    # @errorhandler -> error_handler_spec -> {bp: {code: {cls: func, ...}}}
    handler = self._find_error_handler(e)
    if handler is None:
        reraise(exc_type, exc_value, tb)
    return handler(e)

错误处理,会按照 code 优先,bp 优先的顺序寻找注册的错误处理函数,得到一个 rv。

finalize_request

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def finalize_request(self, rv, from_error_handler=False):
    """Turn ``rv`` into a response and run the post-processing steps.

    :param rv: return value of a view or of an error handler.
    :param from_error_handler: when true, an exception raised during
        post-processing is logged instead of propagated.
    :returns: the response object.
    """
    # Wrap the value in a Response.
    response = self.make_response(rv)
    try:
        # Run the @after_request functions.
        response = self.process_response(response)
        # Emit the request-finished signal.
        request_finished.send(self, response=response)
    except Exception:
        if not from_error_handler:
            raise
        self.logger.exception('Request finalizing failed with an '
                              'error while handling an error')
    return response

这个函数,会在两个地方被调用:

  1. 正常处理完,会被 full_dispatch_request 调用
  2. 执行 full_dispatch_request 报错,会被 wsgi_app 捕捉到,然后执行 handle_exception(内部错误),被调用

process_response

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def process_response(self, response):
    """Pass ``response`` through the after-request functions, then save the session.

    Functions run in this order: those collected on the request context,
    then the current blueprint's, then the app-wide ones.  The blueprint
    and app-wide lists are traversed in reverse.  Each function receives
    the response returned by the previous one.
    """
    ctx = _request_ctx_stack.top
    bp = ctx.request.blueprint

    # @after_this_request -> ctx._after_request_functions -> []
    funcs = ctx._after_request_functions

    # @after_request -> after_request_funcs -> {bp: [], None: []}
    if bp is not None and bp in self.after_request_funcs:
        funcs = chain(funcs, reversed(self.after_request_funcs[bp]))
    if None in self.after_request_funcs:
        funcs = chain(funcs, reversed(self.after_request_funcs[None]))

    # Chain the handlers over the response.
    for handler in funcs:
        response = handler(response)

    # Save the session unless it is a null session.
    if not self.session_interface.is_null_session(ctx.session):
        self.session_interface.save_session(self, ctx.session, response)
    return response

这里又有两个装饰器,其中一个很少用的是@after_this_request,它在 view_func 内部使用。

返回一个 Response 实例对象,回到 wsgi_app 中继续执行 return response(environ, start_response)

Response

1
# Last step of wsgi_app(): the response object is itself called as a WSGI app.
return response(environ, start_response)

我们已经知道,WSGI 规定 app 在 return iterable 之前,必须先调用 start_response。
那么,必然在 Response 中定义了 __call__ 方法,并且会在其中调用 start_response。

1
2
3
4
5
6
7
8
9
10
class BaseResponse(object):
    """Abridged response class: shows how a response acts as a WSGI application."""

    def get_wsgi_response(self, environ):
        """Return ``(app_iter, status, headers)`` for ``environ``.

        ``headers`` is the list produced by ``to_wsgi_list()`` on the
        response headers.
        """
        headers = self.get_wsgi_headers(environ)
        app_iter = self.get_app_iter(environ)
        return app_iter, self.status, headers.to_wsgi_list()

    def __call__(self, environ, start_response):
        """Call ``start_response`` with status and headers, then return the body iterable."""
        app_iter, status, headers = self.get_wsgi_response(environ)
        start_response(status, headers)
        return app_iter

至此,Flask 作为 WSGI app 的整个流程已经走完。

小结

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@app.route('/')
调用 add_url_rule(),填充 view_functions[endpoint] = view_func

@app.errorhandler(404)
@bp.errorhandler(ExceptionClass)
根据 code/cls,填充 error_handler_spec[None/bp] = {code: {cls: func}}

@app.before_first_request
before_first_request_funcs.append(f)
特点:不接收参数,且忽略返回值

@app.url_value_preprocessor
@bp.url_value_preprocessor
url_value_preprocessors.setdefault(None/bp, []).append(func)
特点:func(request.endpoint, request.view_args),忽略返回值

@app.before_request
@bp.before_request
before_request_funcs.setdefault(None/bp, []).append(f)
特点:func(),具有 截取特性

@app.after_request
@bp.after_request
after_request_funcs.setdefault(None/bp, []).append(f)
特点:response = handler(response),具有叠加特性

@app.teardown_request
@bp.teardown_request
teardown_request_funcs.setdefault(None/bp, []).append(f)
特点:called after each request, even if an exception has occurred.
在 RequestContext.pop() 时被调用,执行完后会发送信号`request_tearing_down`

@app.teardown_appcontext
teardown_appcontext_funcs.append(f)
特点:Called right before the application context is popped.
在 AppContext.pop() 时被调用,执行完后发送信号`appcontext_tearing_down`

Flask is a microframework. 在内部大量使用装饰器,作为整个处理流程的中间环节。

利用 werkzeug 的 Request/Response,作为一进一出,配上一堆装饰器构成中间处理流程,这就是 Flask。微小,但包容。