【CPython3.6源码分析】Python 多线程机制

前言

Python 的 GIL 可谓“大名鼎鼎”,正是由它控制着字节码解释器的执行权限。跟操作系统的进程调度一样,Python 必然会存在一个调度机制,决定了什么时候该进行线程切换。Python 也通过时间片的方式,挂起当前线程,切换其他线程。

1
2
3
4
5
6
7
sys.getcheckinterval()、sys.setcheckinterval()
在 Python3.2 中已经被弃用
sys.getswitchinterval()、sys.setswitchinterval()

/* Default interval: 5000 microseconds, i.e. 0.005 s
   (the Python API uses seconds, though) */
#define DEFAULT_INTERVAL 5000
/* Current interval in microseconds; starts out at the default. */
static unsigned long gil_interval = DEFAULT_INTERVAL;

从3.2起,Python 的线程时间片控制,由以前的指令数改为了时间控制,默认0.005秒,可通过API进行更改。线程调度策略,依然是由操作系统决定,解释器仅参与GIL的释放和申请。

_thread

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// _threadmodule.c
/* Module init function for _thread (abridged): creates the module object
   from threadmodule, calls PyThread_init_thread(), and returns the module. */
PyMODINIT_FUNC
PyInit__thread(void){
m = PyModule_Create(&threadmodule);
PyThread_init_thread();
return m;
}

/* Module definition handed to PyModule_Create(); fields elided in this excerpt. */
static struct PyModuleDef threadmodule = { ... }

/* Method table: exposes the C function thread_PyThread_start_new_thread
   under the Python-level name "start_new_thread" (other entries elided). */
static PyMethodDef thread_methods[] = {
{"start_new_thread",(PyCFunction)thread_PyThread_start_new_thread,
METH_VARARGS, start_new_doc},
...
}

_thread是一个标准的 C 模块,通过上面这熟悉的套路,让我们可以直接在 Python 中使用 C 函数。创建线程正是通过_thread.start_new_thread()实现。

start_new_thread

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/* C implementation behind _thread.start_new_thread (abridged).
   fargs is a tuple holding func, args and optionally keyw.
   Fills in a bootstate, calls PyEval_InitThreads(), then starts a native
   thread that runs t_bootstrap with that bootstate.
   Returns the thread identifier as a Python int, or NULL if the argument
   tuple cannot be unpacked. */
static PyObject *
thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs)
{
PyObject *func, *args, *keyw = NULL;
struct bootstate *boot;
long ident;

/* Accept 2 or 3 items; keyw stays NULL when only two are given. */
if (!PyArg_UnpackTuple(fargs, "start_new_thread", 2, 3,
&func, &args, &keyw))
return NULL;
/* NOTE(review): the three "assert" statements below are pseudocode that
   summarises the argument checks; they are not valid C as written. */
assert PyCallable_Check(func), "first arg must be callable"
assert PyTuple_Check(args), "2nd arg must be a tuple"
assert keyw == NULL or PyDict_Check(keyw),
"optional 3rd arg must be a dictionary"
/* Build the bootstate structure */
/* NOTE(review): the allocation result is not checked in this excerpt. */
boot = PyMem_NEW(struct bootstate, 1);
boot->interp = PyThreadState_GET()->interp;
boot->func = func;
boot->args = args;
boot->keyw = keyw;
boot->tstate = _PyThreadState_Prealloc(boot->interp);

/* Initialise the Python multi-threading environment */
PyEval_InitThreads();

/* Create the native OS thread */
ident = PyThread_start_new_thread(t_bootstrap, (void*) boot);
return PyLong_FromLong(ident);
}

如上,在C函数中,func/args都是通过参数 fargs 以元组的形式传入。然后创建 boot 结构体,初始化线程环境,创建线程,返回线程 id。

PyEval_InitThreads

1
2
3
4
5
6
7
8
9
10
11
// ceval.c.230
/* Creates the GIL on first call and has the calling thread take it.
   Returns immediately when the GIL has already been created. */
void PyEval_InitThreads(void)
{
if (gil_created())
return;
create_gil();
take_gil(PyThreadState_GET());
/* Record the calling thread's identifier as main_thread. */
main_thread = PyThread_get_thread_ident();
/* Allocate pending_lock only if it does not exist yet. */
if (!pending_lock)
pending_lock = PyThread_allocate_lock();
}

此时 GIL 尚未创建(gil_created() 为假),主线程直接进入创建 GIL 的环节,随后持有 GIL,并创建 pending_lock(底层是一个信号量)。完成后返回 start_new_thread。

create_gil

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/* Initialise the GIL's mutex and condition variable, clear the last-holder
   record, and store 0 into gil_locked. */
static void create_gil(void)
{
/* Built on the pthread library:
static COND_T gil_cond; // COND_T = PyCOND_T = pthread_cond_t
static MUTEX_T gil_mutex; // MUTEX_T = PyMUTEX_T = pthread_mutex_t
*/

/* Create a default mutex: pthread_mutex_init(&gil_mutex, NULL) */
MUTEX_INIT(gil_mutex);

/* Initialise the condition variable: pthread_cond_init(&gil_cond, NULL) */
COND_INIT(gil_cond);

/* No thread has held the GIL yet. */
_Py_atomic_store_relaxed(&gil_last_holder, 0);
_Py_ANNOTATE_RWLOCK_CREATE(&gil_locked);
/* Store 0 into gil_locked with release ordering. */
_Py_atomic_store_explicit(&gil_locked, 0, _Py_memory_order_release);
}

利用 pthread 线程库,创建互斥锁、初始化条件变量,并将 gil_locked 由初始值 -1 置为 0(已创建、未被占用),完成 GIL 的创建工作。

take_gil

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// ceval_gil.h.67
/* Acquire the GIL on behalf of tstate.
   Under gil_mutex, waits on gil_cond while gil_locked is set. Whenever a wait
   times out with the GIL still held and no switch recorded, it asks the
   holder to drop the GIL. Once the GIL is free it marks it as held, records
   tstate as the last holder, and clears any outstanding drop request. */
static void take_gil(PyThreadState *tstate)
{
MUTEX_LOCK(gil_mutex); // lock the mutex

if (!_Py_atomic_load_relaxed(&gil_locked))
// GIL is not held: take it directly and skip the wait loop
goto _ready;

while (_Py_atomic_load_relaxed(&gil_locked)) {
// GIL has not been released yet
int timed_out = 0;
unsigned long saved_switchnum;
// remember the switch count before waiting
saved_switchnum = gil_switch_number;
// block with a timeout (pthread_cond_timedwait)
COND_TIMED_WAIT(gil_cond, gil_mutex, INTERVAL, timed_out);
/* Timed out and the GIL is still held by the same holder:
   send a release request */
if (timed_out &&
_Py_atomic_load_relaxed(&gil_locked) &&
gil_switch_number == saved_switchnum) {
// sets gil_drop_request=1, eval_breaker=1
SET_GIL_DROP_REQUEST();
}
}
_ready:
/* We now hold the GIL */
_Py_atomic_store_relaxed(&gil_locked, 1); // mark the GIL as held
_Py_ANNOTATE_RWLOCK_ACQUIRED(&gil_locked, /*is_write=*/1);

/* Count a switch only when the holder actually changed. */
if (tstate != (PyThreadState*)_Py_atomic_load_relaxed(&gil_last_holder)) {
_Py_atomic_store_relaxed(&gil_last_holder, (uintptr_t)tstate);
++gil_switch_number;
}

if (_Py_atomic_load_relaxed(&gil_drop_request)) {
// reset gil_drop_request=0
RESET_GIL_DROP_REQUEST();
}
if (tstate->async_exc != NULL) {
_PyEval_SignalAsyncExc();
}

MUTEX_UNLOCK(gil_mutex); // unlock the mutex
}

整个申请 GIL 的过程,都在互斥锁 gil_mutex 的保护之下。如果 GIL 被占用,将在条件变量 gil_cond 上等待;等待超时且 GIL 仍未易主,则设置全局变量 gil_drop_request,请求持有线程释放 GIL。竞争到 GIL 后,重置该全局变量,返回 PyEval_InitThreads。

PyThread_allocate_lock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static PyThread_type_lock pending_lock = 0; /* for pending calls */

/* Allocate a lock backed by a POSIX semaphore whose initial value is 1.
   Returns the lock handle, or a null handle if the memory allocation failed. */
PyThread_type_lock PyThread_allocate_lock(void)
{
sem_t *lock; // pointer to the semaphore
int status, error = 0;

/* Initialise the thread layer on first use. */
if (!initialized)
PyThread_init_thread();

lock = (sem_t *)PyMem_RawMalloc(sizeof(sem_t));

if (lock) {
// sem_init(sem, pshared, value): pshared=0 keeps it within this process, value=1 is the initial count
status = sem_init(lock,0,1);
CHECK_STATUS("sem_init");
}
return (PyThread_type_lock)lock;
}

创建一个初始状态为1的信号量,返回PyEval_InitThreads

PyThread_start_new_thread

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Call site: ident = PyThread_start_new_thread(t_bootstrap, (void*) boot);
/* Start a native pthread that runs func(arg) and detach it.
   Returns the pthread_t handle cast to long as the thread identifier.
   NOTE(review): the pthread_create status is not checked in this excerpt. */
long
PyThread_start_new_thread(void (*func)(void *), void *arg)
{
pthread_t th; // thread handle, filled in by pthread_create
int status;
/* Initialise the thread layer on first use. */
if (!initialized)
PyThread_init_thread();

status = pthread_create(&th,
(pthread_attr_t*)NULL,
(void* (*)(void *))func,
(void *)arg
);
pthread_detach(th); // mark the thread detached (not joinable) so its resources are released automatically when it exits
return (long) th; // the thread id is the value stored in th
}

利用 pthread 库,创建新的线程,返回线程 id。此时新线程传入的 func 是函数 t_bootstrap,而参数 arg 是一个 bootstate 结构体。

1
2
3
4
5
6
7
/* Everything a new thread needs to start running; filled in by
   thread_PyThread_start_new_thread and consumed by t_bootstrap. */
struct bootstate {
PyInterpreterState *interp; /* interpreter of the creating thread */
PyObject *func; /* callable to run in the new thread */
PyObject *args; /* positional arguments for func */
PyObject *keyw; /* keyword arguments for func, or NULL */
PyThreadState *tstate; /* thread state preallocated for the new thread */
};

线程开始执行,就是执行 t_bootstrap(boot)。

t_bootstrap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/* Entry point of a thread started through start_new_thread (abridged).
   boot_raw points to the struct bootstate prepared by the creating thread.
   Binds the preallocated thread state to this thread, acquires the GIL,
   calls the user function, then tears the thread down. */
static void
t_bootstrap(void *boot_raw)
{
struct bootstate *boot = (struct bootstate *) boot_raw;
PyThreadState *tstate;
PyObject *res;

tstate = boot->tstate;
tstate->thread_id = PyThread_get_thread_ident();
_PyThreadState_Init(tstate);
PyEval_AcquireThread(tstate); // compete for the GIL
nb_threads++;
res = PyEval_CallObjectWithKeywords( // run the user function
boot->func, boot->args, boot->keyw);
...
PyMem_DEL(boot_raw); // free the bootstate; thread teardown starts here
nb_threads--;
PyThreadState_Clear(tstate);
PyThreadState_DeleteCurrent();
PyThread_exit_thread();
}

PyEval_AcquireThread

1
2
3
4
5
6
7
8
9
10
11
12
// ceval.c.272
/* Take the GIL for tstate and install tstate as the current thread state.
   Aborts with a fatal error if another thread state was already current. */
void
PyEval_AcquireThread(PyThreadState *tstate)
{
/* NOTE(review): the next line is pseudocode for a NULL check on tstate;
   it is not valid C as written. */
assert tstate != NULL
/* Check someone has called PyEval_InitThreads() to create the lock */
assert(gil_created());
take_gil(tstate);
if (PyThreadState_Swap(tstate) != NULL)
Py_FatalError(
"PyEval_AcquireThread: non-NULL old thread state");
}

很明显,在take_gil中进入等待 GIL。获得 GIL 后,执行PyEval_CallObjectWithKeywords,执行相应用户代码。

线程切换

EvalFrameDefault

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/* Bytecode evaluation loop (heavily abridged): only the part that reacts to
   eval_breaker is shown. When a GIL drop request is pending, the running
   thread releases the GIL and immediately tries to take it again. */
PyObject *
_PyEval_EvalFrameDefault(PyFrameObject *f, int throwflag){
...
for (;;) {
if (_Py_atomic_load_relaxed(&eval_breaker)) {
/* Skip the check when the next opcode is SETUP_FINALLY or YIELD_FROM. */
if (_Py_OPCODE(*next_instr) == SETUP_FINALLY ||
_Py_OPCODE(*next_instr) == YIELD_FROM) {
goto fast_next_opcode;
}
/* Run pending calls first, if any are queued. */
if (_Py_atomic_load_relaxed(&pendingcalls_to_do)) {
if (Py_MakePendingCalls() < 0)
goto error;
}

if (_Py_atomic_load_relaxed(&gil_drop_request)) {
/* Give another thread a chance */
if (PyThreadState_Swap(NULL) != tstate)
Py_FatalError("ceval: tstate mix-up");
drop_gil(tstate);

/* Other threads may run now */
take_gil(tstate);

/* Check if we should make a quick exit. */
if (_Py_Finalizing && _Py_Finalizing != tstate) {
drop_gil(tstate);
PyThread_exit_thread();
}

/* Reinstall our thread state now that we hold the GIL again. */
if (PyThreadState_Swap(tstate) != NULL)
Py_FatalError("ceval: orphan tstate");
}
}
...
}
}

如上,当检测到 eval_breaker、gil_drop_request 被置位时,当前线程会被动地释放 GIL,然后跟其他线程一起再次竞争 GIL。从源码中还能发现,如果下一条指令是 SETUP_FINALLY 或 YIELD_FROM,则会跳过这次检查,不会在此处被切换。

pysleep

1
2
3
4
5
6
7
8
9
// timemodule.c.224
/* C implementation behind time.sleep (abridged): delegates to pysleep() and
   returns NULL when pysleep() reports failure. */
static PyObject * time_sleep(PyObject *self, PyObject *obj)
{
_PyTime_t secs; // typedef int64_t _PyTime_t;
...
if (pysleep(secs) != 0)
return NULL;
...
}

用户可以通过 time.sleep() 主动挂起线程,释放GIL。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/* Sleep for the duration secs (Linux branch only).
   The GIL is released while the thread is blocked in select(). If the sleep
   ends early, the remaining time is recomputed against a monotonic deadline
   and the wait is retried.
   Returns 0 on success, -1 if the time conversion fails or a signal handler
   reports an error. */
static int pysleep(_PyTime_t secs)
{
_PyTime_t deadline, monotonic;
struct timeval timeout;
int err = 0;
deadline = _PyTime_GetMonotonicClock() + secs;
do {
if (_PyTime_AsTimeval(secs, &timeout, _PyTime_ROUND_CEILING) < 0)
return -1;
/* On Linux: the macro saves the thread state and releases the GIL */
Py_BEGIN_ALLOW_THREADS
// call select() with no descriptors; its timeout argument suspends the thread
err = select(0, (fd_set *)0, (fd_set *)0, (fd_set *)0, &timeout);
// the macro calls PyEval_RestoreThread, which competes for the GIL again
Py_END_ALLOW_THREADS

/* sleep was interrupted by SIGINT */
if (PyErr_CheckSignals())
return -1;

monotonic = _PyTime_GetMonotonicClock();
secs = deadline - monotonic;
if (secs < 0)
break;
/* retry with the recomputed delay */
} while (1);

return 0;
}

如上,pysleep是一个区分平台的函数。上面只列出了 Linux 下的代码,利用宏,实现 GIL 的释放和再次申请。

线程销毁

1
2
3
4
5
6
7
8
/* Tail of t_bootstrap: once the user function returns, the thread is torn down. */
res = PyEval_CallObjectWithKeywords(    // run the user function
boot->func, boot->args, boot->keyw);
...
PyMem_DEL(boot_raw); /* free the bootstate */
nb_threads--;
PyThreadState_Clear(tstate); /* drop objects held by the thread state */
PyThreadState_DeleteCurrent(); /* unlink and free the thread state, release the GIL */
PyThread_exit_thread();

子线程的整个生命周期都在t_bootstrap函数中,一旦函数执行完成,将会进入线程销毁环节。

1
2
3
4
5
6
7
8
9
/* Release the object references held by tstate (abridged).
   In verbose mode, warns on stderr if the thread still has a frame. */
void PyThreadState_Clear(PyThreadState *tstate)
{
if (Py_VerboseFlag && tstate->frame != NULL)
fprintf(stderr,
"PyThreadState_Clear: warning: thread still has a frame\n");
Py_CLEAR(tstate->frame);
Py_CLEAR(tstate->dict);
...
}

清理当前线程持有的对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/* Delete the calling thread's thread state, clear its TLS entry if it is the
   one registered there, and finally release the GIL. */
void PyThreadState_DeleteCurrent()
{
PyThreadState *tstate = GET_TSTATE();
tstate_delete_common(tstate); // unlink from the thread-state list and free it
if (autoInterpreterState && PyThread_get_key_value(autoTLSkey) == tstate)
PyThread_delete_key_value(autoTLSkey);
SET_TSTATE(NULL);
PyEval_ReleaseLock(); // release the GIL
}
/* Unlink tstate from its interpreter's doubly linked list of thread states,
   run its on_delete callback if one is set, and free its memory. */
static void
tstate_delete_common(PyThreadState *tstate)
{
PyInterpreterState *interp;
interp = tstate->interp;
HEAD_LOCK(); // PyThread_acquire_lock(head_mutex, WAIT_LOCK)
/* No predecessor means tstate is the list head. */
if (tstate->prev)
tstate->prev->next = tstate->next;
else
interp->tstate_head = tstate->next;
if (tstate->next)
tstate->next->prev = tstate->prev;
HEAD_UNLOCK(); // PyThread_release_lock(head_mutex)
if (tstate->on_delete != NULL) {
tstate->on_delete(tstate->on_delete_data);
}
PyMem_RawFree(tstate);
}

/* Release the GIL, passing drop_gil() whatever thread state is currently
   recorded in _PyThreadState_Current.
   NOTE(review): the return type is omitted in this excerpt. */
PyEval_ReleaseLock(void)
{
drop_gil((PyThreadState*)_Py_atomic_load_relaxed(
&_PyThreadState_Current));
}

调整 ThreadState 链表,删除当前 tstate,释放内存。完事后释放 GIL,结束整个线程。

GIL 总结

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/* Struct wrapper around an atomic_int. */
typedef struct _Py_atomic_int {
atomic_int _value;
} _Py_atomic_int;

/* Starts at -1; create_gil() stores 0 and take_gil() stores 1 while held. */
static _Py_atomic_int gil_locked = {-1};
/* Set by a waiting thread to ask the holder to give up the GIL. */
static _Py_atomic_int gil_drop_request = {0};
/* Checked by the evaluation loop before it looks at the other request flags. */
static _Py_atomic_int eval_breaker = {0};
/* Non-zero when pending calls are queued. */
static _Py_atomic_int pendingcalls_to_do = {0};

/* Identifier of the thread that called PyEval_InitThreads(). */
static long main_thread = 0;
static PyThread_type_lock pending_lock = 0; /* for pending calls */

/* Condition variable that threads wait on for the GIL. */
static COND_T gil_cond;
/* Mutex held while the GIL state is examined or changed. */
static MUTEX_T gil_mutex;

源码参见ceval_gil.h

  • GIL 实际上是一个 原子变量 gil_locked,初始为 -1,表示未启用
  • 互斥锁 gil_mutex 保护修改动作
  • 条件变量 gil_cond 用于等待 GIL
  • 持有 GIL 的线程执行 PyEval_EvalFrameEx 期间,必须能够接收到其他线程发送的释放请求
  • 每次循环都将检查变量 gil_drop_request/eval_breaker
  • 部分指令如’YIELD_FROM’会忽略释放请求
  • 想要申请 GIL 的线程,必须在条件变量 gil_cond 上等待,直至 GIL 被释放或等待超时
  • 超时时间可通过sys.{get,set}switchinterval()进行获取、修改
  • 等待超时且未释放,会设置 gil_drop_request/eval_breaker = 1
  • 子线程获取到 GIL 后,会在 t_bootstrap 函数内,完成其整个生命周期

Threading

1
2
3
4
5
6
7
8
9
10
11
# Rename some stuff so "from threading import *" is safe
# Each name below is a direct alias of the corresponding _thread attribute.
_start_new_thread = _thread.start_new_thread
_allocate_lock = _thread.allocate_lock
_set_sentinel = _thread._set_sentinel
get_ident = _thread.get_ident
ThreadError = _thread.error

# Lock guarding updates to the _active and _limbo dictionaries.
_active_limbo_lock = _allocate_lock()
_active = {} # maps thread id to Thread object
# Threads that start() has registered but that have not reached _bootstrap_inner yet.
_limbo = {}
_dangling = WeakSet()

threading 是线程标准库,通过源码可以发现,它基本上就是对 _thread 模块(_threadmodule.c)的封装,有些接口甚至只是直接改了个名。

Class Thread

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Thread:
    """Abridged view of threading.Thread showing how a thread gets started.

    start() registers the thread in _limbo and asks _thread to run
    _bootstrap() in a new native thread. _bootstrap_inner() then moves the
    thread from _limbo to _active and calls run(), which invokes the target.
    Elided code is marked with "...".
    """

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, *, daemon=None):
        self._target = target
        _dangling.add(self)
        ...

    def start(self):
        with _active_limbo_lock:
            _limbo[self] = self  # record in limbo
        _start_new_thread(self._bootstrap, ())
        ...

    def _bootstrap(self):
        self._bootstrap_inner()

    def _bootstrap_inner(self):
        with _active_limbo_lock:
            _active[self._ident] = self
            del _limbo[self]  # move from limbo to active
        ...
        self.run()

    def run(self):
        # Call the user-supplied target with the stored arguments.
        self._target(*self._args, **self._kwargs)

查看 Thread 类可以发现,一层层嵌套,最终还是用 _thread.start_new_thread来创建新的线程,换汤不换药。

API

1
2
3
4
5
def active_count():
    """Return the number of tracked threads: those in _active plus those
    still waiting in _limbo."""
    with _active_limbo_lock:
        return len(_active) + len(_limbo)


def current_thread():
    """Return the Thread object registered in _active for the calling
    thread's identifier."""
    return _active[get_ident()]

通过源码能发现,threading 将已经调用 start()、但原生线程尚未开始运行的线程放入了 _limbo 中;并且,只要放入了 _limbo 就算是 active(active_count() 会把两者相加)。对两个字典 _active/_limbo 的修改,都在 _active_limbo_lock(由 _thread.allocate_lock() 创建)的保护之下。

另外,threading 模块还提供了一些其他线程间同步工具,例如递归锁RLock、条件变量Condition、信号量Semaphore、事件Event。