13进程与线程

13进程与线程

_

注:学习资料来源<尚硅谷>

视频:点击跳转

代码仓库:pythonStudy

并发编程是现代软件开发中的基础能力之一。无论是后台服务的高吞吐处理、爬虫任务的批量调度、数据计算任务的并行执行,还是日志采集、文件复制、消息传递等场景,都离不开进程与线程相关知识。理解并发、并行、同步、异步、进程、线程、锁、通信机制以及进程池和线程池,不仅有助于编写性能更高的程序,也能帮助规避死锁、竞争条件、资源浪费等常见问题。

本文围绕 Python 中的进程与线程展开,系统梳理基础概念、创建方式、控制手段、通信模型、池化执行机制、GIL 的本质以及多进程与多线程的选型策略。内容以 multiprocessingthreadingconcurrent.futures 为主线,同时辅以 Mermaid 图示帮助理解执行模型与数据流转过程。


一、并发编程中的几个核心概念

1. 并发 vs 并行

并发(Concurrency)描述的是多个任务在一段时间内交替推进。其核心并不在于“同一时刻同时执行”,而在于“多个任务都在前进”。单核 CPU 在面对多个任务时,通常会通过时间片轮转、上下文切换等机制,让不同任务轮流获得执行机会,因此从宏观上看像是在同时运行。

并行(Parallelism)则强调真正意义上的“同一时刻多个任务一起执行”。这通常依赖多核 CPU 或多处理器系统。每个核心可以独立执行一个任务,因此多个任务在物理层面确实是同时进行的。

现代操作系统中,并发与并行往往同时存在。操作系统既会调度多个任务,也会借助多核硬件让部分任务真正并行运行。

Mermaid:并发与并行示意图

flowchart TD A[多个任务] --> B{执行方式} B --> C[并发 Concurrency] B --> D[并行 Parallelism] C --> C1[单核或少量核心] C --> C2[任务交替推进] C --> C3[某一瞬间通常只有一个任务在CPU上执行] D --> D1[多核或多CPU] D --> D2[多个任务同一时刻执行] D --> D3[真正意义上的同时运行]

简要理解

  • 并发强调“调度和组织”

  • 并行强调“物理同时执行”

  • 并发不一定并行

  • 并行通常包含并发


2. 同步 vs 异步

同步(Synchronous)描述的是任务之间的等待关系。发起一个任务后,调用方必须等待该任务完成,才能继续执行后续逻辑。同步的典型表现是当前执行流被阻塞。

异步(Asynchronous)则表示发起任务后,不必等待其完成,就可以继续做其他事情。等任务完成后,再通过回调、Future、事件通知等方式获取执行结果。异步的本质是“调用与完成解耦”。

需要特别区分的是:

  • 并发 / 并行:关注多个任务“怎么执行”

  • 同步 / 异步:关注多个任务“怎么等待”

也就是说,二者不是一组互斥概念,而是两个维度。一个程序可以是“并发 + 同步”,也可以是“并发 + 异步”,还可以是“并行 + 同步”等多种组合。

Mermaid:同步与异步的执行关系

sequenceDiagram participant Caller as 调用方 participant Task as 任务 rect rgb(245,245,245) Note over Caller,Task: 同步 Caller->>Task: 发起任务 Task-->>Caller: 返回结果 Caller->>Caller: 继续后续逻辑 end rect rgb(235,245,255) Note over Caller,Task: 异步 Caller->>Task: 发起任务 Caller->>Caller: 立即继续执行其他逻辑 Task-->>Caller: 完成后通知/返回结果 end

一个容易混淆的关键点

CPU 核心数的多少,不会改变任务之间的逻辑依赖。若任务 2 必须依赖任务 1 的输出,那么即便拥有再多核心,任务 2 也不能越过任务 1 提前执行。这说明同步 / 异步属于程序设计层面的关系,而不是硬件速度可以直接改变的关系。


3. 进程 vs 线程

进程(Process)是操作系统进行资源分配的基本单位。一个运行中的程序,背后至少对应一个进程。每个进程通常拥有独立的虚拟地址空间、文件描述符、内核资源等。

线程(Thread)是进程内部的执行单元,也是操作系统进行 CPU 调度的基本单位。一个进程可以包含多个线程,这些线程共享进程的大部分资源,例如代码段、堆内存、全局变量、打开的文件等。

二者的典型差异

维度

进程

线程

资源分配

基本单位

不负责独立资源分配

调度单位

间接参与

直接调度单位

内存空间

独立

同进程内共享

创建成本

较高

较低

通信难度

相对较高

相对简单

稳定性

一个进程崩溃通常不影响其他进程

一个线程异常可能影响整个进程

Mermaid:进程与线程关系图

flowchart TB P[一个进程] --> T1[线程1 主线程] P --> T2[线程2] P --> T3[线程3] P --> R1[代码段] P --> R2[堆内存] P --> R3[全局变量] P --> R4[文件资源] T1 -.共享.-> R1 T2 -.共享.-> R2 T3 -.共享.-> R3


二、主进程、子进程与 PID

在操作系统中,每个进程都有唯一的进程标识符,即 PID(Process ID)。主进程与子进程是相对概念:如果进程 A 创建了进程 B,那么 A 是 B 的父进程,B 是 A 的子进程。若 B 再创建 C,则 B 又成为 C 的父进程。

在 Python 中,可以通过 os.getpid() 获取当前进程 ID,通过 os.getppid() 获取父进程 ID。

 import os
 ​
 print("当前进程ID:", os.getpid())
 print("父进程ID:", os.getppid())

在 Windows 环境下,还可以通过以下命令查看进程名、父进程 ID 与进程 ID:

 wmic process get Name,ParentProcessId,ProcessId

三、使用 Process 创建进程

Python 标准库 multiprocessing 提供了 Process 类,用于创建独立子进程。通过 target 指定子进程执行的函数,通过 start() 启动进程。

最基础的示例如下:

 import os
 import time
 from multiprocessing import Process
 ​
 def speak():
     for index in range(5):
         print(f'说话任务 {index}, pid={os.getpid()}, ppid={os.getppid()}')
         time.sleep(1)
 ​
 def study():
     for index in range(5):
         print(f'学习任务 {index}, pid={os.getpid()}, ppid={os.getppid()}')
         time.sleep(1)
 ​
 if __name__ == '__main__':
     print('主进程开始执行')
 ​
     p1 = Process(target=speak)
     p2 = Process(target=study)
 ​
     p1.start()
     p2.start()
 ​
     print('主进程继续向下执行')

为什么必须写 if __name__ == '__main__':

这是 multiprocessing 初学阶段最容易忽略、但最重要的规则之一,尤其是在 Windows 系统上。

原因在于:创建子进程时,Python 并不是简单地把父进程中的函数对象直接复制过去,而是会启动一个新的 Python 解释器,并重新执行当前模块。如果没有 if __name__ == '__main__': 的保护,模块顶层代码会被无限重复执行,最终导致递归创建子进程。

Mermaid:Windows 下子进程启动逻辑

flowchart LR A[主进程执行主模块] --> B[创建Process对象] B --> C[start] C --> D[启动新的Python解释器] D --> E[重新导入并执行当前模块] E --> F{是否有main保护?} F -->|有| G[仅执行受保护代码块] F -->|无| H[再次创建子进程] H --> I[无限递归创建]


四、Process 的常用参数

Process 在实例化时支持多个参数:

  • group:通常保持 None

  • target:子进程执行的可调用对象

  • name:进程名称

  • args:位置参数元组

  • kwargs:关键字参数字典

  • daemon:是否为守护进程

下面给出一个更完整的例子:

 import os
 import time
 from multiprocessing import Process, current_process
 ​
 def speak(a, b, msg):
     for index in range(3):
         print(
             f'{msg} | a={a} | b={b} | 进程名={current_process().name} '
             f'| pid={os.getpid()} | ppid={os.getppid()} | index={index}'
         )
         time.sleep(1)
 ​
 def study():
     for index in range(3):
         print(f'学习任务 index={index}, pid={os.getpid()}, ppid={os.getppid()}')
         time.sleep(1)
 ​
 if __name__ == '__main__':
     p1 = Process(
         target=speak,
         name='SpeechProcess',
         args=(666, 888),
         kwargs={'msg': 'demo'}
     )
     p2 = Process(target=study)
 ​
     p1.start()
     p2.start()

五、进程控制:锁、等待、终止与守护机制

进程不仅可以被创建,还需要被安全地控制。典型控制手段包括进程锁、join()terminate() 和守护进程。


1. Lock:进程锁

当多个进程竞争同一份共享资源时,若不加控制,极容易发生输出交叉、数据错乱、状态不一致等问题。这类被多个执行单元共享的代码区域称为临界区(Critical Section)。

multiprocessing.Lock 可以保证同一时刻只有一个进程进入临界区。

import time
from multiprocessing import Process, Lock

def task1(lock):
    for _ in range(3):
        with lock:
            print('task1 -> A B C D')
        time.sleep(1)

def task2(lock):
    for _ in range(3):
        with lock:
            print('task2 -> 1 2 3 4')
        time.sleep(1)

if __name__ == '__main__':
    lock = Lock()

    p1 = Process(target=task1, args=(lock,))
    p2 = Process(target=task2, args=(lock,))

    p1.start()
    p2.start()

推荐使用 with lock

相较于手动调用 acquire()release(),上下文管理器写法更安全。即使临界区内抛出异常,也能自动释放锁,从而减少死锁风险。


2. RLock:可重入锁

普通 Lock 不支持同一个执行流重复加锁。如果在同一进程、同一线程里对同一把锁连续 acquire() 两次,而只持有一把普通锁,就可能导致自我阻塞。

这时应使用 RLock(Reentrant Lock,可重入锁)。

import time
from multiprocessing import Process, RLock

def task(lock):
    for _ in range(3):
        lock.acquire()
        lock.acquire()
        try:
            print('可重入锁示例:连续加锁两次仍可正常执行')
        finally:
            lock.release()
            lock.release()
        time.sleep(1)

if __name__ == '__main__':
    lock = RLock()
    p = Process(target=task, args=(lock,))
    p.start()
    p.join()

3. join():等待子进程结束

join() 的作用是阻塞当前进程,直到目标进程执行结束。

import time
from multiprocessing import Process

def work():
    for i in range(5):
        print(f'子进程执行中: {i}')
        time.sleep(1)

if __name__ == '__main__':
    p = Process(target=work)
    p.start()

    p.join(2)
    print('主进程等待2秒后继续执行')

注意事项

  • join() 必须在 start() 之后调用

  • p.join() 的含义不是“让 p 等待”,而是“让当前执行 join() 的进程等待 p”

  • join(timeout) 超时后不会终止子进程,只是当前进程不再继续等待


4. terminate():强制终止进程

当需要立即停止某个子进程时,可以调用 terminate()。这是一种强制终止机制,通常不会保证子进程执行清理逻辑,因此要谨慎使用。

import time
from multiprocessing import Process

def work():
    try:
        for i in range(10):
            print(f'工作中: {i}')
            time.sleep(1)
    finally:
        print('finally 清理逻辑')

if __name__ == '__main__':
    p = Process(target=work)
    p.start()

    time.sleep(3)
    p.terminate()
    p.join()

    print('进程是否存活:', p.is_alive())

这里需要特别注意:被 terminate() 强制结束后,finally 中的清理逻辑通常不会执行。


5. 守护进程(Daemon)

守护进程是一种依附于主进程存在的子进程。一旦主进程退出,守护进程会被自动结束。守护进程适合处理监控、采样、日志汇总、状态巡检等辅助型任务。

import os
import time
from multiprocessing import Process

def monitor():
    while True:
        print(f'监控进程运行中, pid={os.getpid()}')
        time.sleep(1)

def business():
    for i in range(5):
        print(f'业务进程执行: {i}')
        time.sleep(1)

if __name__ == '__main__':
    p1 = Process(target=monitor, daemon=True)
    p2 = Process(target=business)

    p1.start()
    p2.start()

    p2.join()
    print('主进程结束')

守护进程的重要约束

  • 守护进程必须是子进程

  • 主进程结束时,守护进程随之结束

  • 守护进程中不允许再创建新的子进程

  • daemon 必须在 start() 之前设置


六、进程之间为什么不共享变量

进程拥有独立的内存空间,因此普通变量不会在多个进程之间自动共享。这是进程与线程最根本的区别之一。

下面是一个验证示例:

from multiproce

Python

from multiprocessing import Process

num = 100
names = []

def test1():
    global num, names
    num += 10
    names.append('张三')
    print('test1:', num, names)

def test2():
    global num, names
    num -= 10
    names.append('李四')
    print('test2:', num, names)

if __name__ == '__main__':
    p1 = Process(target=test1)
    p2 = Process(target=test2)

    p1.start()
    p2.start()
    p1.join()
    p2.join()

    print('main:', num, names)
即使在子进程中修改了 num 和 names,主进程中的变量也不会随之变化。这是因为子进程操作的是各自独立的内存副本。

Mermaid:进程变量不共享ssing import Process

num = 100
names = []

def test1():
    global num, names
    num += 10
    names.append('张三')
    print('test1:', num, names)

def test2():
    global num, names
    num -= 10
    names.append('李四')
    print('test2:', num, names)

if __name__ == '__main__':
    p1 = Process(target=test1)
    p2 = Process(target=test2)

    p1.start()
    p2.start()
    p1.join()
    p2.join()

    print('main:', num, names)

即使在子进程中修改了 numnames,主进程中的变量也不会随之变化。这是因为子进程操作的是各自独立的内存副本。

Mermaid:进程变量不共享

flowchart LR A["主进程 num=100 names=[]"] --> B["子进程1 副本"] A --> C["子进程2 副本"] B --> B1["num=110 names=['张三']"] C --> C1["num=90 names=['李四']"] A --> A1["主进程仍为 num=100 names=[]"]

这也是为什么多进程程序往往需要专门的通信机制,例如 Queue、Pipe、共享内存、Manager 等。


七、Queue:队列与进程通信

1. 队列的基本特性

multiprocessing.Queue 是跨进程安全的数据结构,遵循先进先出(FIFO)原则。由于它具备进程安全与阻塞语义,因此是最常见的进程间通信方式之一。

常用方法包括:

  • put():入队

  • get():出队

  • empty():是否为空

  • full():是否已满

  • qsize():元素个数

  • put_nowait():非阻塞入队

  • get_nowait():非阻塞出队

基础示例:

from multiprocessing import Queue

q = Queue(3)

q.put(10)
q.put(20)
q.put(30)

print(q.full())     # True
print(q.get())      # 10
print(q.get())      # 20
print(q.get())      # 30
print(q.empty())    # True

2. 队列的等待模式

当队列已满时,继续 put() 会阻塞;当队列为空时,继续 get() 也会阻塞。

import time
from multiprocessing import Process, Queue

def consumer(q):
    time.sleep(3)
    value = q.get()
    print('消费者取出:', value)

if __name__ == '__main__':
    q = Queue(2)
    q.put('A')
    q.put('B')

    p = Process(target=consumer, args=(q,))
    p.start()

    print('队列已满,准备继续放入...')
    q.put('C')   # 阻塞,直到consumer取走一个元素
    print('放入完成')

3. 使用 Queue 实现进程通信

一个进程负责生产数据,另一个进程负责消费数据,是最经典的生产者-消费者模型。

import time
from multiprocessing import Process, Queue

def producer(q):
    for i in range(5):
        print(f'生产者放入: {i}')
        q.put(i)
        time.sleep(0.5)

def consumer(q):
    for _ in range(5):
        data = q.get()
        print(f'消费者取出: {data}')
        time.sleep(1)

if __name__ == '__main__':
    q = Queue()

    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=consumer, args=(q,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

Mermaid:Queue 通信模型

flowchart LR P1[生产者进程] -->|put| Q[Queue] Q -->|get| P2[消费者进程]


八、Pipe:管道通信

Pipe 提供了另一种轻量级进程间通信方式,可以理解为一条双端连接的“数据管道”。Pipe() 返回两个连接对象,两端分别负责发送和接收。

import time
from multiprocessing import Process, Pipe

def sender(conn):
    time.sleep(2)
    conn.send(100)
    print('发送方已发送 100')

def receiver(conn):
    data = conn.recv()
    print('接收方收到:', data)

if __name__ == '__main__':
    conn1, conn2 = Pipe(duplex=False)

    p1 = Process(target=sender, args=(conn1,))
    p2 = Process(target=receiver, args=(conn2,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

Mermaid:Pipe 通信模型

flowchart LR A[发送进程] -->|send| B[Pipe] B -->|recv| C[接收进程]

Pipe 与 Queue 的区别简述

  • Pipe 更偏向点对点通信

  • Queue 更适合多生产者、多消费者场景

  • Queue 自带 FIFO 语义,更适合任务流转

  • Pipe 在简单双向通信中更直接


九、继承 Process 类创建进程

当子进程逻辑较复杂,或者希望将“任务行为 + 参数状态”封装成对象时,继承 Process 是更清晰的方式。关键点在于重写 run() 方法,并通过 start() 启动,而不是直接调用 run()

import os
import time
from multiprocessing import Process

class SpeakProcess(Process):
    def __init__(self, a, b, **kwargs):
        super().__init__(**kwargs)
        self.a = a
        self.b = b

    def run(self):
        for index in range(3):
            print(f'SpeakProcess: a={self.a}, b={self.b}, pid={os.getpid()}, index={index}')
            time.sleep(1)

class StudyProcess(Process):
    def run(self):
        for index in range(3):
            print(f'StudyProcess: pid={os.getpid()}, index={index}')
            time.sleep(1)

if __name__ == '__main__':
    p1 = SpeakProcess(100, 200)
    p2 = StudyProcess()

    p1.start()
    p2.start()

    p1.join()
    p2.join()

这种方式体现了面向对象风格,适用于复杂任务封装、定制化生命周期处理以及可维护性要求较高的代码结构。


十、进程池:高效管理大量进程任务

频繁创建和销毁进程的成本较高。当任务量很大时,逐个创建进程会造成明显的系统资源浪费。进程池的核心思想是:预先创建固定数量的工作进程,后续任务复用这些进程执行。

Python 中通常使用 concurrent.futures.ProcessPoolExecutor


1. 提交任务与关闭进程池

import os
import time
from concurrent.futures import ProcessPoolExecutor

def work(n):
    print(f'任务 {n} 执行中, pid={os.getpid()}')
    time.sleep(1)

if __name__ == '__main__':
    executor = ProcessPoolExecutor(max_workers=3)

    for i in range(1, 8):
        executor.submit(work, i)

    executor.shutdown(wait=True)
    print('所有任务执行完成')

2. 获取返回结果

submit() 返回 Future 对象,可通过 result() 获取任务返回值。

import os
import time
from concurrent.futures import ProcessPoolExecutor

def work(n):
    print(f'任务 {n} 执行中, pid={os.getpid()}')
    time.sleep(1)
    return f'结果-{n}'

if __name__ == '__main__':
    with ProcessPoolExecutor(3) as executor:
        futures = [executor.submit(work, i) for i in range(1, 8)]

    for f in futures:
        print(f.result())

3. as_completed():按完成顺序取结果

import os
import time
from concurrent.futures import ProcessPoolExecutor, as_completed

def work(n):
    print(f'任务 {n} 执行中, pid={os.getpid()}')
    if n == 1:
        time.sleep(5)
    elif n == 2:
        time.sleep(3)
    else:
        time.sleep(1)
    return f'结果-{n}'

if __name__ == '__main__':
    with ProcessPoolExecutor(3) as executor:
        futures = [executor.submit(work, i) for i in range(1, 8)]
        result_list = [f.result() for f in as_completed(futures)]

    print(result_list)

这里的结果顺序反映的是“哪个任务先完成”,而不是“哪个任务先提交”。


4. add_done_callback():任务完成回调

from concurrent.futures import ProcessPoolExecutor
import time
import os

result_list = []

def work(n):
    print(f'任务 {n} 执行中, pid={os.getpid()}')
    time.sleep(1)
    return f'结果-{n}'

def done_callback(f):
    result_list.append(f.result())

if __name__ == '__main__':
    with ProcessPoolExecutor(3) as executor:
        for i in range(1, 8):
            future = executor.submit(work, i)
            future.add_done_callback(done_callback)

    print(result_list)

5. map():批量提交任务

import os
import time
from concurrent.futures import ProcessPoolExecutor

def work(n):
    print(f'任务 {n} 执行中, pid={os.getpid()}')
    time.sleep(1)
    return f'结果-{n}'

if __name__ == '__main__':
    with ProcessPoolExecutor(3) as executor:
        results = executor.map(work, [1, 2, 3, 4, 5, 6, 7])
        print(list(results))

map() 会批量提交任务,并按提交顺序返回结果。需要注意,真正的阻塞发生在迭代结果时,而不是调用 map() 的瞬间。

Mermaid:进程池任务分发示意

flowchart TB A[任务提交端] --> B[ProcessPoolExecutor] B --> P1[工作进程1] B --> P2[工作进程2] B --> P3[工作进程3] P1 --> R[结果集合] P2 --> R P3 --> R


十一、线程的本质与创建方式

任何一个正在运行的 Python 程序,至少都有一个线程,即主线程。线程是进程中的执行单元,多个线程共享进程的资源,因此创建成本比进程低很多。


1. 使用 Thread 创建线程

import os
import time
from threading import Thread, RLock, get_native_id

def speak(lock):
    for index in range(5):
        with lock:
            print(f'说话线程 index={index}, pid={os.getpid()}, tid={get_native_id()}')
        time.sleep(1)

def study(lock):
    for index in range(5):
        with lock:
            print(f'学习线程 index={index}, pid={os.getpid()}, tid={get_native_id()}')
        time.sleep(1)

if __name__ == '__main__':
    print(f'主线程启动 pid={os.getpid()}, tid={get_native_id()}')

    lock = RLock()
    t1 = Thread(target=speak, args=(lock,))
    t2 = Thread(target=study, args=(lock,))

    t1.start()
    t2.start()

    t1.join()
    t2.join()

    print('主线程结束')

2. 继承 Thread 创建线程

与继承 Process 类似,线程也可以通过继承 Thread 实现更强的封装性。

import os
import time
from threading import Thread, RLock, get_native_id

class SpeakThread(Thread):
    def __init__(self, lock, **kwargs):
        super().__init__(**kwargs)
        self.lock = lock

    def run(self):
        for index in range(5):
            with self.lock:
                print(f'SpeakThread index={index}, pid={os.getpid()}, tid={get_native_id()}')
            time.sleep(1)

class StudyThread(Thread):
    def __init__(self, lock, **kwargs):
        super().__init__(**kwargs)
        self.lock = lock

    def run(self):
        for index in range(5):
            with self.lock:
                print(f'StudyThread index={index}, pid={os.getpid()}, tid={get_native_id()}')
            time.sleep(1)

if __name__ == '__main__':
    lock = RLock()

    t1 = SpeakThread(lock)
    t2 = StudyThread(lock)

    t1.start()
    t2.start()

    t1.join()
    t2.join()

十二、线程池:轻量级并发任务的高效执行器

线程池与进程池在使用方式上几乎完全一致,只是执行单元从进程变成线程。Python 中通常使用 ThreadPoolExecutor


1. 提交任务

import time
from threading import RLock, get_native_id
from concurrent.futures import ThreadPoolExecutor

def work(n, lock):
    with lock:
        print(f'任务 {n} 执行中, tid={get_native_id()}')
    time.sleep(1)

if __name__ == '__main__':
    lock = RLock()
    executor = ThreadPoolExecutor(max_workers=3)

    for i in range(1, 8):
        executor.submit(work, i, lock)

    executor.shutdown(wait=True)

2. 获取返回结果

import time
from threading import RLock, get_native_id
from concurrent.futures import ThreadPoolExecutor

def work(n, lock):
    with lock:
        print(f'任务 {n} 执行中, tid={get_native_id()}')
    time.sleep(1)
    return f'线程任务结果-{n}'

if __name__ == '__main__':
    lock = RLock()
    with ThreadPoolExecutor(3) as executor:
        futures = [executor.submit(work, i, lock) for i in range(1, 8)]

    for f in futures:
        print(f.result())

3. 按完成顺序收集结果

import time
from threading import RLock, get_native_id
from concurrent.futures import ThreadPoolExecutor, as_completed

def work(n, lock):
    with lock:
        print(f'任务 {n} 执行中, tid={get_native_id()}')
    if n == 1:
        time.sleep(5)
    elif n == 2:
        time.sleep(3)
    else:
        time.sleep(1)
    return f'线程任务结果-{n}'

if __name__ == '__main__':
    lock = RLock()
    with ThreadPoolExecutor(3) as executor:
        futures = [executor.submit(work, i, lock) for i in range(1, 8)]
        result_list = [f.result() for f in as_completed(futures)]

    print(result_list)

4. 使用 map() 批量提交线程任务

import time
from threading import RLock, get_native_id
from concurrent.futures import ThreadPoolExecutor

def work(n, lock):
    with lock:
        print(f'任务 {n} 执行中, tid={get_native_id()}')
    time.sleep(1)
    return f'线程任务结果-{n}'

if __name__ == '__main__':
    lock = RLock()
    with ThreadPoolExecutor(3) as executor:
        result = executor.map(work, [1, 2, 3, 4, 5, 6, 7], [lock] * 7)
        print(list(result))

十三、GIL:Python 多线程为何难以真正并行

GIL,全称 Global Interpreter Lock,即全局解释器锁。它是 CPython 解释器中的一把互斥锁,其核心作用是:在同一个进程中,任意时刻只允许一个线程执行 Python 字节码。

这意味着,在 CPython 下,多线程对于纯 Python 的 CPU 密集型任务,通常无法利用多核实现真正并行,而表现为高频切换意义上的并发。

GIL 的存在原因

GIL 的设计初衷是保证解释器层面的数据安全。CPython 使用引用计数进行内存管理,如果没有一把全局锁,多个线程同时操作对象引用计数,极易导致底层状态不一致,进而触发严重内存问题。

GIL 何时释放

  • 遇到 I/O 操作时,线程可能主动释放 GIL

  • 时间片耗尽时,解释器可能强制切换线程

因此,多线程在 I/O 密集型场景中仍然非常有效,因为线程在等待磁盘、网络、数据库等外部资源时,不会一直占用执行机会。

Mermaid:GIL 对线程执行的约束

flowchart TB P[一个Python进程] --> G[GIL] G --> T1[线程1] G --> T2[线程2] G --> T3[线程3] Note1[同一时刻仅一个线程执行Python字节码]


十四、GIL 与 Lock / RLock 不是同一种锁

这是并发编程中的高频误区。

GIL 是解释器级别的锁,用于保证 CPython 内部状态安全。业务代码几乎无法直接“管理”它。

LockRLock 则是业务层面的同步工具,用来保护临界区,确保共享资源访问顺序正确,例如避免输出交错、重复卖票、余额超扣等问题。

下面给出一个典型的线程加锁示例:防止多个窗口卖出同一张票。

import time
from threading import Thread, RLock, current_thread

current = 1

def sale(lock):
    global current
    while True:
        with lock:
            if current <= 20:
                print(f'{current_thread().name} 售出了第 {current} 张票')
                current += 1
            else:
                print('票已售空')
                break
        time.sleep(0.2)

if __name__ == '__main__':
    lock = RLock()

    t1 = Thread(target=sale, name='窗口1', args=(lock,))
    t2 = Thread(target=sale, name='窗口2', args=(lock,))
    t3 = Thread(target=sale, name='窗口3', args=(lock,))

    t1.start()
    t2.start()
    t3.start()

如果没有这把业务锁,就可能出现多个线程同时读取到相同票号,从而产生重复出售的逻辑错误。


十五、多进程 vs 多线程:如何选择

这是并发编程中的实际落地问题。没有绝对更优的方案,只有更匹配场景的方案。

1. CPU 密集型任务:优先多进程

CPU 密集型任务的特点是计算量大,例如图像处理、数值计算、机器学习预处理、大规模数据变换等。这类任务的瓶颈在 CPU 运算本身。

在 CPython 中,由于 GIL 的存在,多线程对纯计算任务通常无法显著提升性能,因此多进程更合适。多个进程可以真正利用多核 CPU。

import time
from concurrent.futures import ProcessPoolExecutor

def cpu_task(n):
    print(f'CPU任务 {n} 开始')
    total = 0
    for i in range(10_000_000):
        total += i * i
    return total

if __name__ == '__main__':
    start = time.time()

    with ProcessPoolExecutor(4) as executor:
        list(executor.map(cpu_task, [1, 2, 3, 4]))

    end = time.time() - start
    print(f'多进程总耗时: {end:.2f} 秒')

2. I/O 密集型任务:优先多线程

I/O 密集型任务的瓶颈不在 CPU,而在等待外部资源,例如文件读写、网络请求、数据库访问、接口调用等。线程在等待 I/O 时可以让出执行机会,因此多线程能够有效提升吞吐量。

 import time
 from concurrent.futures import ThreadPoolExecutor
 ​
 def copy_file(index):
     with open('a.zip', 'rb') as src, open(f'a_copy_{index}.zip', 'wb') as dst:
         while True:
             data = src.read(1024 * 1024)
             if not data:
                 break
             dst.write(data)
 ​
 if __name__ == '__main__':
     start = time.time()
 ​
     with ThreadPoolExecutor(4) as executor:
         for i in range(4):
             executor.submit(copy_file, i)
 ​
     end = time.time() - start
     print(f'多线程耗时: {end:.2f} 秒')

3. 一张决策表快速总结

场景

更推荐方案

原因

大量数值计算

多进程

可绕过 GIL,利用多核

文件读写

多线程

I/O 等待期间线程可切换

网络请求

多线程

高并发 I/O 场景更适配

强隔离需求

多进程

内存独立,稳定性更高

共享数据频繁

多线程

同进程共享内存更直接

任务非常轻量

多线程

创建成本更低

任务非常重且独立

多进程

并行能力更强

Mermaid:多进程与多线程选型流程图

flowchart TD A[并发任务类型判断] --> B{任务瓶颈是什么?} B -->|CPU计算| C[优先选择多进程] B -->|I/O等待| D[优先选择多线程] C --> C1[利用多核] C --> C2[绕过GIL限制] C --> C3[适合计算密集型] D --> D1[等待I/O时线程可切换] D --> D2[创建成本较低] D --> D3[适合网络/文件/数据库操作]


十六、面试与实战中的高频结论

进程与线程相关知识经常出现在技术面试和日常开发中,以下结论具有较高实用价值:

并发不等于并行。并发强调交替推进,并行强调同时执行。单核 CPU 可以实现并发,但无法实现真正的并行。

同步与异步描述的是任务之间的等待关系,而不是 CPU 是否多核。异步并不意味着任务已经完成,只意味着调用方不必阻塞等待。

进程是资源分配的基本单位,线程是 CPU 调度的基本单位。进程更重、线程更轻;进程隔离更强、线程共享更方便。

Python 中使用 multiprocessing 时,尤其在 Windows 下,必须写 if __name__ == '__main__':,否则极易出现递归创建子进程的问题。

多进程之间默认不共享普通变量。若要通信,应使用 QueuePipe 等进程间通信机制。

Lock 用于互斥访问共享资源,RLock 用于同一执行流下的可重入加锁场景。

join() 只是等待目标进程结束,不会终止进程。terminate() 才是强制终止,但可能跳过清理逻辑。

守护进程适合做陪跑型任务,例如监控、日志统计等,但会随着主进程退出而自动结束。

线程池和进程池适合处理批量任务,可以显著降低频繁创建/销毁执行单元的成本。

CPython 的 GIL 决定了多线程在 CPU 密集型任务中通常难以真正并行,因此 CPU 密集型优先考虑多进程,I/O 密集型优先考虑多线程。


十七、总结

进程与线程是操作系统调度模型与并发程序设计的基础。对 Python 而言,理解 multiprocessingthreading 的差异,不仅是语法层面的掌握,更关系到性能、稳定性、可维护性与正确性。

从概念层面看,并发、并行、同步、异步构成了并发编程的认知基础;从实践层面看,ProcessThreadLockRLockQueuePipe、进程池与线程池构成了 Python 并发编程的核心工具箱;从性能层面看,GIL 决定了多线程的适用边界,也直接影响多进程与多线程的选型策略。

实际工程中,选择多进程还是多线程,不应只看“能不能跑”,而应结合任务类型、资源共享方式、系统开销、稳定性要求和部署环境综合判断。能够清晰地区分这些概念,并根据业务特征选择合适模型,才是并发编程真正的核心能力。

如果需要,还可以继续扩展到以下方向:

  • 基于 multiprocessing.Manager 的共享对象

  • 基于 asyncio 的异步 I/O 编程

  • 线程安全队列与生产者-消费者模型进阶

  • 死锁形成条件与排查思路

  • Python 并发模型在 Web 服务、爬虫、数据处理中的落地实践

14协程 2026-04-26
12文件操作 2026-04-26

评论区