注:学习资料来源<尚硅谷>
视频:点击跳转
代码仓库:pythonStudy
并发编程是现代软件开发中的基础能力之一。无论是后台服务的高吞吐处理、爬虫任务的批量调度、数据计算任务的并行执行,还是日志采集、文件复制、消息传递等场景,都离不开进程与线程相关知识。理解并发、并行、同步、异步、进程、线程、锁、通信机制以及进程池和线程池,不仅有助于编写性能更高的程序,也能帮助规避死锁、竞争条件、资源浪费等常见问题。
本文围绕 Python 中的进程与线程展开,系统梳理基础概念、创建方式、控制手段、通信模型、池化执行机制、GIL 的本质以及多进程与多线程的选型策略。内容以 multiprocessing、threading 和 concurrent.futures 为主线,同时辅以 Mermaid 图示帮助理解执行模型与数据流转过程。
一、并发编程中的几个核心概念
1. 并发 vs 并行
并发(Concurrency)描述的是多个任务在一段时间内交替推进。其核心并不在于“同一时刻同时执行”,而在于“多个任务都在前进”。单核 CPU 在面对多个任务时,通常会通过时间片轮转、上下文切换等机制,让不同任务轮流获得执行机会,因此从宏观上看像是在同时运行。
并行(Parallelism)则强调真正意义上的“同一时刻多个任务一起执行”。这通常依赖多核 CPU 或多处理器系统。每个核心可以独立执行一个任务,因此多个任务在物理层面确实是同时进行的。
现代操作系统中,并发与并行往往同时存在。操作系统既会调度多个任务,也会借助多核硬件让部分任务真正并行运行。
Mermaid:并发与并行示意图
简要理解
并发强调“调度和组织”
并行强调“物理同时执行”
并发不一定并行
并行通常包含并发
2. 同步 vs 异步
同步(Synchronous)描述的是任务之间的等待关系。发起一个任务后,调用方必须等待该任务完成,才能继续执行后续逻辑。同步的典型表现是当前执行流被阻塞。
异步(Asynchronous)则表示发起任务后,不必等待其完成,就可以继续做其他事情。等任务完成后,再通过回调、Future、事件通知等方式获取执行结果。异步的本质是“调用与完成解耦”。
需要特别区分的是:
并发 / 并行:关注多个任务“怎么执行”
同步 / 异步:关注多个任务“怎么等待”
也就是说,二者不是一组互斥概念,而是两个维度。一个程序可以是“并发 + 同步”,也可以是“并发 + 异步”,还可以是“并行 + 同步”等多种组合。
Mermaid:同步与异步的执行关系
一个容易混淆的关键点
CPU 核心数的多少,不会改变任务之间的逻辑依赖。若任务 2 必须依赖任务 1 的输出,那么即便拥有再多核心,任务 2 也不能越过任务 1 提前执行。这说明同步 / 异步属于程序设计层面的关系,而不是硬件速度可以直接改变的关系。
3. 进程 vs 线程
进程(Process)是操作系统进行资源分配的基本单位。一个运行中的程序,背后至少对应一个进程。每个进程通常拥有独立的虚拟地址空间、文件描述符、内核资源等。
线程(Thread)是进程内部的执行单元,也是操作系统进行 CPU 调度的基本单位。一个进程可以包含多个线程,这些线程共享进程的大部分资源,例如代码段、堆内存、全局变量、打开的文件等。
二者的典型差异
Mermaid:进程与线程关系图
二、主进程、子进程与 PID
在操作系统中,每个进程都有唯一的进程标识符,即 PID(Process ID)。主进程与子进程是相对概念:如果进程 A 创建了进程 B,那么 A 是 B 的父进程,B 是 A 的子进程。若 B 再创建 C,则 B 又成为 C 的父进程。
在 Python 中,可以通过 os.getpid() 获取当前进程 ID,通过 os.getppid() 获取父进程 ID。
import os
print("当前进程ID:", os.getpid())
print("父进程ID:", os.getppid())在 Windows 环境下,还可以通过以下命令查看进程名、父进程 ID 与进程 ID:
wmic process get Name,ParentProcessId,ProcessId三、使用 Process 创建进程
Python 标准库 multiprocessing 提供了 Process 类,用于创建独立子进程。通过 target 指定子进程执行的函数,通过 start() 启动进程。
最基础的示例如下:
import os
import time
from multiprocessing import Process
def speak():
for index in range(5):
print(f'说话任务 {index}, pid={os.getpid()}, ppid={os.getppid()}')
time.sleep(1)
def study():
for index in range(5):
print(f'学习任务 {index}, pid={os.getpid()}, ppid={os.getppid()}')
time.sleep(1)
if __name__ == '__main__':
print('主进程开始执行')
p1 = Process(target=speak)
p2 = Process(target=study)
p1.start()
p2.start()
print('主进程继续向下执行')为什么必须写 if __name__ == '__main__':
这是 multiprocessing 初学阶段最容易忽略、但最重要的规则之一,尤其是在 Windows 系统上。
原因在于:创建子进程时,Python 并不是简单地把父进程中的函数对象直接复制过去,而是会启动一个新的 Python 解释器,并重新执行当前模块。如果没有 if __name__ == '__main__': 的保护,模块顶层代码会被无限重复执行,最终导致递归创建子进程。
Mermaid:Windows 下子进程启动逻辑
四、Process 的常用参数
Process 在实例化时支持多个参数:
group:通常保持Nonetarget:子进程执行的可调用对象name:进程名称args:位置参数元组kwargs:关键字参数字典daemon:是否为守护进程
下面给出一个更完整的例子:
import os
import time
from multiprocessing import Process, current_process
def speak(a, b, msg):
for index in range(3):
print(
f'{msg} | a={a} | b={b} | 进程名={current_process().name} '
f'| pid={os.getpid()} | ppid={os.getppid()} | index={index}'
)
time.sleep(1)
def study():
for index in range(3):
print(f'学习任务 index={index}, pid={os.getpid()}, ppid={os.getppid()}')
time.sleep(1)
if __name__ == '__main__':
p1 = Process(
target=speak,
name='SpeechProcess',
args=(666, 888),
kwargs={'msg': 'demo'}
)
p2 = Process(target=study)
p1.start()
p2.start()五、进程控制:锁、等待、终止与守护机制
进程不仅可以被创建,还需要被安全地控制。典型控制手段包括进程锁、join()、terminate() 和守护进程。
1. Lock:进程锁
当多个进程竞争同一份共享资源时,若不加控制,极容易发生输出交叉、数据错乱、状态不一致等问题。这类被多个执行单元共享的代码区域称为临界区(Critical Section)。
multiprocessing.Lock 可以保证同一时刻只有一个进程进入临界区。
import time
from multiprocessing import Process, Lock
def task1(lock):
for _ in range(3):
with lock:
print('task1 -> A B C D')
time.sleep(1)
def task2(lock):
for _ in range(3):
with lock:
print('task2 -> 1 2 3 4')
time.sleep(1)
if __name__ == '__main__':
lock = Lock()
p1 = Process(target=task1, args=(lock,))
p2 = Process(target=task2, args=(lock,))
p1.start()
p2.start()推荐使用 with lock
相较于手动调用 acquire() 和 release(),上下文管理器写法更安全。即使临界区内抛出异常,也能自动释放锁,从而减少死锁风险。
2. RLock:可重入锁
普通 Lock 不支持同一个执行流重复加锁。如果在同一进程、同一线程里对同一把锁连续 acquire() 两次,而只持有一把普通锁,就可能导致自我阻塞。
这时应使用 RLock(Reentrant Lock,可重入锁)。
import time
from multiprocessing import Process, RLock
def task(lock):
for _ in range(3):
lock.acquire()
lock.acquire()
try:
print('可重入锁示例:连续加锁两次仍可正常执行')
finally:
lock.release()
lock.release()
time.sleep(1)
if __name__ == '__main__':
lock = RLock()
p = Process(target=task, args=(lock,))
p.start()
p.join()3. join():等待子进程结束
join() 的作用是阻塞当前进程,直到目标进程执行结束。
import time
from multiprocessing import Process
def work():
for i in range(5):
print(f'子进程执行中: {i}')
time.sleep(1)
if __name__ == '__main__':
p = Process(target=work)
p.start()
p.join(2)
print('主进程等待2秒后继续执行')注意事项
join()必须在start()之后调用p.join()的含义不是“让 p 等待”,而是“让当前执行join()的进程等待 p”join(timeout)超时后不会终止子进程,只是当前进程不再继续等待
4. terminate():强制终止进程
当需要立即停止某个子进程时,可以调用 terminate()。这是一种强制终止机制,通常不会保证子进程执行清理逻辑,因此要谨慎使用。
import time
from multiprocessing import Process
def work():
try:
for i in range(10):
print(f'工作中: {i}')
time.sleep(1)
finally:
print('finally 清理逻辑')
if __name__ == '__main__':
p = Process(target=work)
p.start()
time.sleep(3)
p.terminate()
p.join()
print('进程是否存活:', p.is_alive())这里需要特别注意:被 terminate() 强制结束后,finally 中的清理逻辑通常不会执行。
5. 守护进程(Daemon)
守护进程是一种依附于主进程存在的子进程。一旦主进程退出,守护进程会被自动结束。守护进程适合处理监控、采样、日志汇总、状态巡检等辅助型任务。
import os
import time
from multiprocessing import Process
def monitor():
while True:
print(f'监控进程运行中, pid={os.getpid()}')
time.sleep(1)
def business():
for i in range(5):
print(f'业务进程执行: {i}')
time.sleep(1)
if __name__ == '__main__':
p1 = Process(target=monitor, daemon=True)
p2 = Process(target=business)
p1.start()
p2.start()
p2.join()
print('主进程结束')守护进程的重要约束
守护进程必须是子进程
主进程结束时,守护进程随之结束
守护进程中不允许再创建新的子进程
daemon必须在start()之前设置
六、进程之间为什么不共享变量
进程拥有独立的内存空间,因此普通变量不会在多个进程之间自动共享。这是进程与线程最根本的区别之一。
下面是一个验证示例:
from multiproce
Python
from multiprocessing import Process
num = 100
names = []
def test1():
global num, names
num += 10
names.append('张三')
print('test1:', num, names)
def test2():
global num, names
num -= 10
names.append('李四')
print('test2:', num, names)
if __name__ == '__main__':
p1 = Process(target=test1)
p2 = Process(target=test2)
p1.start()
p2.start()
p1.join()
p2.join()
print('main:', num, names)
即使在子进程中修改了 num 和 names,主进程中的变量也不会随之变化。这是因为子进程操作的是各自独立的内存副本。
Mermaid:进程变量不共享ssing import Process
num = 100
names = []
def test1():
global num, names
num += 10
names.append('张三')
print('test1:', num, names)
def test2():
global num, names
num -= 10
names.append('李四')
print('test2:', num, names)
if __name__ == '__main__':
p1 = Process(target=test1)
p2 = Process(target=test2)
p1.start()
p2.start()
p1.join()
p2.join()
print('main:', num, names)即使在子进程中修改了 num 和 names,主进程中的变量也不会随之变化。这是因为子进程操作的是各自独立的内存副本。
Mermaid:进程变量不共享
这也是为什么多进程程序往往需要专门的通信机制,例如 Queue、Pipe、共享内存、Manager 等。
七、Queue:队列与进程通信
1. 队列的基本特性
multiprocessing.Queue 是跨进程安全的数据结构,遵循先进先出(FIFO)原则。由于它具备进程安全与阻塞语义,因此是最常见的进程间通信方式之一。
常用方法包括:
put():入队get():出队empty():是否为空full():是否已满qsize():元素个数put_nowait():非阻塞入队get_nowait():非阻塞出队
基础示例:
from multiprocessing import Queue
q = Queue(3)
q.put(10)
q.put(20)
q.put(30)
print(q.full()) # True
print(q.get()) # 10
print(q.get()) # 20
print(q.get()) # 30
print(q.empty()) # True2. 队列的等待模式
当队列已满时,继续 put() 会阻塞;当队列为空时,继续 get() 也会阻塞。
import time
from multiprocessing import Process, Queue
def consumer(q):
time.sleep(3)
value = q.get()
print('消费者取出:', value)
if __name__ == '__main__':
q = Queue(2)
q.put('A')
q.put('B')
p = Process(target=consumer, args=(q,))
p.start()
print('队列已满,准备继续放入...')
q.put('C') # 阻塞,直到consumer取走一个元素
print('放入完成')3. 使用 Queue 实现进程通信
一个进程负责生产数据,另一个进程负责消费数据,是最经典的生产者-消费者模型。
import time
from multiprocessing import Process, Queue
def producer(q):
for i in range(5):
print(f'生产者放入: {i}')
q.put(i)
time.sleep(0.5)
def consumer(q):
for _ in range(5):
data = q.get()
print(f'消费者取出: {data}')
time.sleep(1)
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer, args=(q,))
p2 = Process(target=consumer, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()Mermaid:Queue 通信模型
八、Pipe:管道通信
Pipe 提供了另一种轻量级进程间通信方式,可以理解为一条双端连接的“数据管道”。Pipe() 返回两个连接对象,两端分别负责发送和接收。
import time
from multiprocessing import Process, Pipe
def sender(conn):
time.sleep(2)
conn.send(100)
print('发送方已发送 100')
def receiver(conn):
data = conn.recv()
print('接收方收到:', data)
if __name__ == '__main__':
conn1, conn2 = Pipe(duplex=False)
p1 = Process(target=sender, args=(conn1,))
p2 = Process(target=receiver, args=(conn2,))
p1.start()
p2.start()
p1.join()
p2.join()Mermaid:Pipe 通信模型
Pipe 与 Queue 的区别简述
Pipe更偏向点对点通信Queue更适合多生产者、多消费者场景Queue自带 FIFO 语义,更适合任务流转Pipe在简单双向通信中更直接
九、继承 Process 类创建进程
当子进程逻辑较复杂,或者希望将“任务行为 + 参数状态”封装成对象时,继承 Process 是更清晰的方式。关键点在于重写 run() 方法,并通过 start() 启动,而不是直接调用 run()。
import os
import time
from multiprocessing import Process
class SpeakProcess(Process):
def __init__(self, a, b, **kwargs):
super().__init__(**kwargs)
self.a = a
self.b = b
def run(self):
for index in range(3):
print(f'SpeakProcess: a={self.a}, b={self.b}, pid={os.getpid()}, index={index}')
time.sleep(1)
class StudyProcess(Process):
def run(self):
for index in range(3):
print(f'StudyProcess: pid={os.getpid()}, index={index}')
time.sleep(1)
if __name__ == '__main__':
p1 = SpeakProcess(100, 200)
p2 = StudyProcess()
p1.start()
p2.start()
p1.join()
p2.join()这种方式体现了面向对象风格,适用于复杂任务封装、定制化生命周期处理以及可维护性要求较高的代码结构。
十、进程池:高效管理大量进程任务
频繁创建和销毁进程的成本较高。当任务量很大时,逐个创建进程会造成明显的系统资源浪费。进程池的核心思想是:预先创建固定数量的工作进程,后续任务复用这些进程执行。
Python 中通常使用 concurrent.futures.ProcessPoolExecutor。
1. 提交任务与关闭进程池
import os
import time
from concurrent.futures import ProcessPoolExecutor
def work(n):
print(f'任务 {n} 执行中, pid={os.getpid()}')
time.sleep(1)
if __name__ == '__main__':
executor = ProcessPoolExecutor(max_workers=3)
for i in range(1, 8):
executor.submit(work, i)
executor.shutdown(wait=True)
print('所有任务执行完成')2. 获取返回结果
submit() 返回 Future 对象,可通过 result() 获取任务返回值。
import os
import time
from concurrent.futures import ProcessPoolExecutor
def work(n):
print(f'任务 {n} 执行中, pid={os.getpid()}')
time.sleep(1)
return f'结果-{n}'
if __name__ == '__main__':
with ProcessPoolExecutor(3) as executor:
futures = [executor.submit(work, i) for i in range(1, 8)]
for f in futures:
print(f.result())3. as_completed():按完成顺序取结果
import os
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
def work(n):
print(f'任务 {n} 执行中, pid={os.getpid()}')
if n == 1:
time.sleep(5)
elif n == 2:
time.sleep(3)
else:
time.sleep(1)
return f'结果-{n}'
if __name__ == '__main__':
with ProcessPoolExecutor(3) as executor:
futures = [executor.submit(work, i) for i in range(1, 8)]
result_list = [f.result() for f in as_completed(futures)]
print(result_list)这里的结果顺序反映的是“哪个任务先完成”,而不是“哪个任务先提交”。
4. add_done_callback():任务完成回调
from concurrent.futures import ProcessPoolExecutor
import time
import os
result_list = []
def work(n):
print(f'任务 {n} 执行中, pid={os.getpid()}')
time.sleep(1)
return f'结果-{n}'
def done_callback(f):
result_list.append(f.result())
if __name__ == '__main__':
with ProcessPoolExecutor(3) as executor:
for i in range(1, 8):
future = executor.submit(work, i)
future.add_done_callback(done_callback)
print(result_list)5. map():批量提交任务
import os
import time
from concurrent.futures import ProcessPoolExecutor
def work(n):
print(f'任务 {n} 执行中, pid={os.getpid()}')
time.sleep(1)
return f'结果-{n}'
if __name__ == '__main__':
with ProcessPoolExecutor(3) as executor:
results = executor.map(work, [1, 2, 3, 4, 5, 6, 7])
print(list(results))map() 会批量提交任务,并按提交顺序返回结果。需要注意,真正的阻塞发生在迭代结果时,而不是调用 map() 的瞬间。
Mermaid:进程池任务分发示意
十一、线程的本质与创建方式
任何一个正在运行的 Python 程序,至少都有一个线程,即主线程。线程是进程中的执行单元,多个线程共享进程的资源,因此创建成本比进程低很多。
1. 使用 Thread 创建线程
import os
import time
from threading import Thread, RLock, get_native_id
def speak(lock):
for index in range(5):
with lock:
print(f'说话线程 index={index}, pid={os.getpid()}, tid={get_native_id()}')
time.sleep(1)
def study(lock):
for index in range(5):
with lock:
print(f'学习线程 index={index}, pid={os.getpid()}, tid={get_native_id()}')
time.sleep(1)
if __name__ == '__main__':
print(f'主线程启动 pid={os.getpid()}, tid={get_native_id()}')
lock = RLock()
t1 = Thread(target=speak, args=(lock,))
t2 = Thread(target=study, args=(lock,))
t1.start()
t2.start()
t1.join()
t2.join()
print('主线程结束')2. 继承 Thread 创建线程
与继承 Process 类似,线程也可以通过继承 Thread 实现更强的封装性。
import os
import time
from threading import Thread, RLock, get_native_id
class SpeakThread(Thread):
def __init__(self, lock, **kwargs):
super().__init__(**kwargs)
self.lock = lock
def run(self):
for index in range(5):
with self.lock:
print(f'SpeakThread index={index}, pid={os.getpid()}, tid={get_native_id()}')
time.sleep(1)
class StudyThread(Thread):
def __init__(self, lock, **kwargs):
super().__init__(**kwargs)
self.lock = lock
def run(self):
for index in range(5):
with self.lock:
print(f'StudyThread index={index}, pid={os.getpid()}, tid={get_native_id()}')
time.sleep(1)
if __name__ == '__main__':
lock = RLock()
t1 = SpeakThread(lock)
t2 = StudyThread(lock)
t1.start()
t2.start()
t1.join()
t2.join()十二、线程池:轻量级并发任务的高效执行器
线程池与进程池在使用方式上几乎完全一致,只是执行单元从进程变成线程。Python 中通常使用 ThreadPoolExecutor。
1. 提交任务
import time
from threading import RLock, get_native_id
from concurrent.futures import ThreadPoolExecutor
def work(n, lock):
with lock:
print(f'任务 {n} 执行中, tid={get_native_id()}')
time.sleep(1)
if __name__ == '__main__':
lock = RLock()
executor = ThreadPoolExecutor(max_workers=3)
for i in range(1, 8):
executor.submit(work, i, lock)
executor.shutdown(wait=True)2. 获取返回结果
import time
from threading import RLock, get_native_id
from concurrent.futures import ThreadPoolExecutor
def work(n, lock):
with lock:
print(f'任务 {n} 执行中, tid={get_native_id()}')
time.sleep(1)
return f'线程任务结果-{n}'
if __name__ == '__main__':
lock = RLock()
with ThreadPoolExecutor(3) as executor:
futures = [executor.submit(work, i, lock) for i in range(1, 8)]
for f in futures:
print(f.result())3. 按完成顺序收集结果
import time
from threading import RLock, get_native_id
from concurrent.futures import ThreadPoolExecutor, as_completed
def work(n, lock):
with lock:
print(f'任务 {n} 执行中, tid={get_native_id()}')
if n == 1:
time.sleep(5)
elif n == 2:
time.sleep(3)
else:
time.sleep(1)
return f'线程任务结果-{n}'
if __name__ == '__main__':
lock = RLock()
with ThreadPoolExecutor(3) as executor:
futures = [executor.submit(work, i, lock) for i in range(1, 8)]
result_list = [f.result() for f in as_completed(futures)]
print(result_list)4. 使用 map() 批量提交线程任务
import time
from threading import RLock, get_native_id
from concurrent.futures import ThreadPoolExecutor
def work(n, lock):
with lock:
print(f'任务 {n} 执行中, tid={get_native_id()}')
time.sleep(1)
return f'线程任务结果-{n}'
if __name__ == '__main__':
lock = RLock()
with ThreadPoolExecutor(3) as executor:
result = executor.map(work, [1, 2, 3, 4, 5, 6, 7], [lock] * 7)
print(list(result))十三、GIL:Python 多线程为何难以真正并行
GIL,全称 Global Interpreter Lock,即全局解释器锁。它是 CPython 解释器中的一把互斥锁,其核心作用是:在同一个进程中,任意时刻只允许一个线程执行 Python 字节码。
这意味着,在 CPython 下,多线程对于纯 Python 的 CPU 密集型任务,通常无法利用多核实现真正并行,而表现为高频切换意义上的并发。
GIL 的存在原因
GIL 的设计初衷是保证解释器层面的数据安全。CPython 使用引用计数进行内存管理,如果没有一把全局锁,多个线程同时操作对象引用计数,极易导致底层状态不一致,进而触发严重内存问题。
GIL 何时释放
遇到 I/O 操作时,线程可能主动释放 GIL
时间片耗尽时,解释器可能强制切换线程
因此,多线程在 I/O 密集型场景中仍然非常有效,因为线程在等待磁盘、网络、数据库等外部资源时,不会一直占用执行机会。
Mermaid:GIL 对线程执行的约束
十四、GIL 与 Lock / RLock 不是同一种锁
这是并发编程中的高频误区。
GIL 是解释器级别的锁,用于保证 CPython 内部状态安全。业务代码几乎无法直接“管理”它。
Lock 和 RLock 则是业务层面的同步工具,用来保护临界区,确保共享资源访问顺序正确,例如避免输出交错、重复卖票、余额超扣等问题。
下面给出一个典型的线程加锁示例:防止多个窗口卖出同一张票。
import time
from threading import Thread, RLock, current_thread
current = 1
def sale(lock):
global current
while True:
with lock:
if current <= 20:
print(f'{current_thread().name} 售出了第 {current} 张票')
current += 1
else:
print('票已售空')
break
time.sleep(0.2)
if __name__ == '__main__':
lock = RLock()
t1 = Thread(target=sale, name='窗口1', args=(lock,))
t2 = Thread(target=sale, name='窗口2', args=(lock,))
t3 = Thread(target=sale, name='窗口3', args=(lock,))
t1.start()
t2.start()
t3.start()如果没有这把业务锁,就可能出现多个线程同时读取到相同票号,从而产生重复出售的逻辑错误。
十五、多进程 vs 多线程:如何选择
这是并发编程中的实际落地问题。没有绝对更优的方案,只有更匹配场景的方案。
1. CPU 密集型任务:优先多进程
CPU 密集型任务的特点是计算量大,例如图像处理、数值计算、机器学习预处理、大规模数据变换等。这类任务的瓶颈在 CPU 运算本身。
在 CPython 中,由于 GIL 的存在,多线程对纯计算任务通常无法显著提升性能,因此多进程更合适。多个进程可以真正利用多核 CPU。
import time
from concurrent.futures import ProcessPoolExecutor
def cpu_task(n):
print(f'CPU任务 {n} 开始')
total = 0
for i in range(10_000_000):
total += i * i
return total
if __name__ == '__main__':
start = time.time()
with ProcessPoolExecutor(4) as executor:
list(executor.map(cpu_task, [1, 2, 3, 4]))
end = time.time() - start
print(f'多进程总耗时: {end:.2f} 秒')2. I/O 密集型任务:优先多线程
I/O 密集型任务的瓶颈不在 CPU,而在等待外部资源,例如文件读写、网络请求、数据库访问、接口调用等。线程在等待 I/O 时可以让出执行机会,因此多线程能够有效提升吞吐量。
import time
from concurrent.futures import ThreadPoolExecutor
def copy_file(index):
with open('a.zip', 'rb') as src, open(f'a_copy_{index}.zip', 'wb') as dst:
while True:
data = src.read(1024 * 1024)
if not data:
break
dst.write(data)
if __name__ == '__main__':
start = time.time()
with ThreadPoolExecutor(4) as executor:
for i in range(4):
executor.submit(copy_file, i)
end = time.time() - start
print(f'多线程耗时: {end:.2f} 秒')3. 一张决策表快速总结
Mermaid:多进程与多线程选型流程图
十六、面试与实战中的高频结论
进程与线程相关知识经常出现在技术面试和日常开发中,以下结论具有较高实用价值:
并发不等于并行。并发强调交替推进,并行强调同时执行。单核 CPU 可以实现并发,但无法实现真正的并行。
同步与异步描述的是任务之间的等待关系,而不是 CPU 是否多核。异步并不意味着任务已经完成,只意味着调用方不必阻塞等待。
进程是资源分配的基本单位,线程是 CPU 调度的基本单位。进程更重、线程更轻;进程隔离更强、线程共享更方便。
Python 中使用 multiprocessing 时,尤其在 Windows 下,必须写 if __name__ == '__main__':,否则极易出现递归创建子进程的问题。
多进程之间默认不共享普通变量。若要通信,应使用 Queue、Pipe 等进程间通信机制。
Lock 用于互斥访问共享资源,RLock 用于同一执行流下的可重入加锁场景。
join() 只是等待目标进程结束,不会终止进程。terminate() 才是强制终止,但可能跳过清理逻辑。
守护进程适合做陪跑型任务,例如监控、日志统计等,但会随着主进程退出而自动结束。
线程池和进程池适合处理批量任务,可以显著降低频繁创建/销毁执行单元的成本。
CPython 的 GIL 决定了多线程在 CPU 密集型任务中通常难以真正并行,因此 CPU 密集型优先考虑多进程,I/O 密集型优先考虑多线程。
十七、总结
进程与线程是操作系统调度模型与并发程序设计的基础。对 Python 而言,理解 multiprocessing 与 threading 的差异,不仅是语法层面的掌握,更关系到性能、稳定性、可维护性与正确性。
从概念层面看,并发、并行、同步、异步构成了并发编程的认知基础;从实践层面看,Process、Thread、Lock、RLock、Queue、Pipe、进程池与线程池构成了 Python 并发编程的核心工具箱;从性能层面看,GIL 决定了多线程的适用边界,也直接影响多进程与多线程的选型策略。
实际工程中,选择多进程还是多线程,不应只看“能不能跑”,而应结合任务类型、资源共享方式、系统开销、稳定性要求和部署环境综合判断。能够清晰地区分这些概念,并根据业务特征选择合适模型,才是并发编程真正的核心能力。
如果需要,还可以继续扩展到以下方向:
基于
multiprocessing.Manager的共享对象基于
asyncio的异步 I/O 编程线程安全队列与生产者-消费者模型进阶
死锁形成条件与排查思路
Python 并发模型在 Web 服务、爬虫、数据处理中的落地实践