Python 多线程与多进程详解
线程与进程的基本概念
进程(Process)
进程是操作系统分配资源的基本单位,拥有独立的内存空间、文件句柄等系统资源。每个进程都有自己独立的地址空间,进程间通信需要特殊的机制(IPC)。
线程(Thread)
线程是 CPU 调度的基本单位,同一进程内的线程共享进程的内存空间和资源。线程间的通信相对简单,但需要处理同步问题。
Python 中的多线程
threading 模块
Python 的 threading 模块提供了线程相关的操作。
import threading
import time


def worker(name, delay):
    """Simulate a unit of work: announce start, sleep `delay` seconds, announce end."""
    print(f"线程 {name} 开始")
    time.sleep(delay)
    print(f"线程 {name} 结束")


# Create two threads with different simulated workloads
thread_a = threading.Thread(target=worker, args=("Thread-1", 2))
thread_b = threading.Thread(target=worker, args=("Thread-2", 1))

# Start both threads
thread_a.start()
thread_b.start()

# Block until both threads have finished
thread_a.join()
thread_b.join()
print("主线程结束")
线程同步
由于多个线程共享内存,需要使用同步机制来避免竞态条件。
1. Lock(锁)
import threading

counter = 0
lock = threading.Lock()


def increment():
    """Add 1 to the shared counter 100,000 times, each update guarded by the lock."""
    global counter
    for _ in range(100000):
        with lock:  # acquires and releases the lock automatically
            counter += 1


# Ten threads hammer the same counter; the lock makes the result deterministic
threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"计数器值: {counter}")  # expected: 1000000
2. RLock(可重入锁)
import threading

lock = threading.RLock()


def recursive_function(n):
    """Recurse n levels deep, re-acquiring the same RLock at every level.

    A plain Lock would deadlock here; RLock allows nested acquisition
    by the owning thread.
    """
    with lock:
        if n > 0:
            print(f"递归深度: {n}")
            recursive_function(n - 1)


recursive_function(5)
3. Semaphore(信号量)
import threading
import time

# At most 3 threads may hold the semaphore (i.e. run the work) at once
semaphore = threading.Semaphore(3)


def worker(worker_id):
    """Occupy a semaphore slot for 2 seconds of simulated work."""
    with semaphore:
        print(f"Worker {worker_id} 开始工作")
        time.sleep(2)
        print(f"Worker {worker_id} 完成工作")


threads = [threading.Thread(target=worker, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
4. Event(事件)
import threading
import time

event = threading.Event()


def waiter():
    """Block until the event is set, then continue."""
    print("等待事件...")
    event.wait()  # blocks until event.set() is called
    print("事件已触发,继续执行")


def setter():
    """Sleep briefly, then fire the event to release the waiter."""
    time.sleep(2)
    print("设置事件")
    event.set()  # wakes up every thread waiting on the event


waiting_thread = threading.Thread(target=waiter)
setting_thread = threading.Thread(target=setter)
waiting_thread.start()
setting_thread.start()
waiting_thread.join()
setting_thread.join()
5. Condition(条件变量)
import threading
import time
import random

condition = threading.Condition()
buffer = []


def producer():
    """Produce 5 items at random intervals, notifying the consumer each time."""
    for i in range(5):
        time.sleep(random.random())
        with condition:
            item = f"Item-{i}"
            buffer.append(item)
            print(f"生产: {item}")
            condition.notify()  # wake one waiting consumer


def consumer():
    """Consume 5 items, waiting on the condition whenever the buffer is empty."""
    for _ in range(5):
        with condition:
            # re-check after every wakeup: wait() can return spuriously
            while not buffer:
                print("等待产品...")
                condition.wait()
            item = buffer.pop(0)
            print(f"消费: {item}")


producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()
Python 中的多进程
multiprocessing 模块
Python 的 multiprocessing 模块提供了多进程支持,每个进程有独立的 Python 解释器和 GIL。
import multiprocessing
import time


def worker(name, delay):
    """Simulate a unit of work in a child process: print, sleep, print."""
    print(f"进程 {name} 开始")
    time.sleep(delay)
    print(f"进程 {name} 结束")


if __name__ == '__main__':
    # Create two child processes (guard required on spawn-based platforms)
    proc_a = multiprocessing.Process(target=worker, args=("Process-1", 2))
    proc_b = multiprocessing.Process(target=worker, args=("Process-2", 1))

    # Start both processes
    proc_a.start()
    proc_b.start()

    # Wait for both to finish
    proc_a.join()
    proc_b.join()
    print("主进程结束")
进程池(Process Pool)
import multiprocessing


def square(x):
    """Return x squared."""
    return x * x


if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as pool:
        numbers = list(range(10))

        # Blocking parallel map
        results = pool.map(square, numbers)
        print(f"结果: {results}")

        # Fire-and-collect: submit asynchronously, then gather results
        pending = [pool.apply_async(square, (x,)) for x in numbers]
        results = [task.get() for task in pending]
        print(f"异步结果: {results}")
进程间通信(IPC)
1. Queue(队列)
import multiprocessing


def producer(queue):
    """Push 5 messages onto the queue, then a STOP sentinel."""
    for i in range(5):
        queue.put(f"Message-{i}")
    queue.put("STOP")


def consumer(queue):
    """Drain the queue until the STOP sentinel arrives."""
    while True:
        message = queue.get()
        if message == "STOP":
            break
        print(f"收到: {message}")


if __name__ == '__main__':
    channel = multiprocessing.Queue()
    producing = multiprocessing.Process(target=producer, args=(channel,))
    consuming = multiprocessing.Process(target=consumer, args=(channel,))
    producing.start()
    consuming.start()
    producing.join()
    consuming.join()
2. Pipe(管道)
import multiprocessing


def sender(conn):
    """Send one greeting through the pipe and close our end."""
    conn.send("Hello from sender")
    conn.close()


def receiver(conn):
    """Receive and print one message from the pipe."""
    message = conn.recv()
    print(f"收到消息: {message}")


if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()
    sending = multiprocessing.Process(target=sender, args=(child_conn,))
    receiving = multiprocessing.Process(target=receiver, args=(parent_conn,))
    sending.start()
    receiving.start()
    sending.join()
    receiving.join()
3. Value 和 Array(共享内存)
import multiprocessing


def increment(counter):
    """Atomically add 1 to a shared Value using its built-in lock."""
    with counter.get_lock():
        counter.value += 1


if __name__ == '__main__':
    # 'i' = C signed int, initialized to 0, shared across processes
    counter = multiprocessing.Value('i', 0)
    workers = [
        multiprocessing.Process(target=increment, args=(counter,))
        for _ in range(100)
    ]
    for proc in workers:
        proc.start()
    for proc in workers:
        proc.join()
    print(f"计数器值: {counter.value}")
多线程 vs 多进程
性能对比
import threading
import multiprocessing
import time


def cpu_bound_task(n):
    """Sum of squares 0²..(n-1)² — pure CPU work with no I/O."""
    total = 0
    for i in range(n):
        total += i ** 2
    return total


def test_multithreading():
    """Run 4 CPU-bound tasks in threads; the GIL serializes them."""
    started = time.time()
    pool = [
        threading.Thread(target=cpu_bound_task, args=(1000000,))
        for _ in range(4)
    ]
    for t in pool:
        t.start()
    for t in pool:
        t.join()
    print(f"多线程耗时: {time.time() - started:.4f} 秒")


def test_multiprocessing():
    """Run the same 4 tasks in processes; each has its own GIL."""
    started = time.time()
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(cpu_bound_task, [1000000] * 4)
    print(f"多进程耗时: {time.time() - started:.4f} 秒")


if __name__ == '__main__':
    test_multithreading()
    test_multiprocessing()
对比总结
| 特性 | 多线程 | 多进程 |
|---|---|---|
| 内存使用 | 共享内存,占用少 | 独立内存,占用多 |
| 通信方式 | 共享变量、队列等 | Queue、Pipe、共享内存 |
| 创建开销 | 小 | 大 |
| 数据共享 | 容易,但需要同步 | 困难,需要 IPC |
| GIL 影响 | 受 GIL 限制 | 不受 GIL 限制 |
| 适用场景 | I/O 密集型 | CPU 密集型 |
| 稳定性 | 一个线程崩溃可能影响整个进程 | 进程间隔离,更稳定 |
实际应用场景
1. I/O 密集型任务 - 使用多线程
import threading
import requests
import time


def download_url(url):
    """Fetch one URL and report its size; print the error on failure."""
    try:
        response = requests.get(url)
        print(f"下载完成: {url}, 大小: {len(response.content)} 字节")
    except Exception as e:
        print(f"下载失败: {url}, 错误: {e}")


urls = [
    "https://www.example.com",
    "https://www.google.com",
    "https://www.github.com",
    "https://www.python.org",
]

started = time.time()
# One thread per URL: the GIL is released during network I/O,
# so the downloads overlap
downloads = [threading.Thread(target=download_url, args=(url,)) for url in urls]
for t in downloads:
    t.start()
for t in downloads:
    t.join()
print(f"总耗时: {time.time() - started:.4f} 秒")
2. CPU 密集型任务 - 使用多进程
import multiprocessing
import time


def process_image(image_data):
    """Simulate image processing with a CPU-bound sum of squares."""
    return sum(i ** 2 for i in range(len(image_data)))


def process_images_concurrently(images):
    """Process all images in parallel across a 4-worker process pool."""
    with multiprocessing.Pool(processes=4) as pool:
        return pool.map(process_image, images)


if __name__ == '__main__':
    images = [list(range(10000)) for _ in range(10)]
    started = time.time()
    results = process_images_concurrently(images)
    print(f"多进程处理耗时: {time.time() - started:.4f} 秒")
3. 混合使用 - 多进程 + 多线程
import multiprocessing
import threading
import time


def io_task(url):
    """Simulate a 0.1-second I/O operation and return a status string."""
    time.sleep(0.1)
    return f"Processed {url}"


def worker(urls):
    """Inside one process, fan the URLs out to threads (I/O overlaps)."""
    pool = [threading.Thread(target=io_task, args=(url,)) for url in urls]
    for t in pool:
        t.start()
    for t in pool:
        t.join()


if __name__ == '__main__':
    all_urls = [f"url-{i}" for i in range(20)]
    # Split the URLs into groups of 5, one group per process
    url_groups = [all_urls[i:i + 5] for i in range(0, len(all_urls), 5)]

    started = time.time()
    processes = [
        multiprocessing.Process(target=worker, args=(group,))
        for group in url_groups
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(f"混合模式耗时: {time.time() - started:.4f} 秒")
最佳实践
1. 选择合适的并发模型
# I/O-bound - use multiple threads
import threading
import requests


def fetch_url(url):
    """Fetch one URL and return its body text."""
    response = requests.get(url)
    return response.text


urls = ["url1", "url2", "url3"]  # placeholder URLs for illustration
threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]
for t in threads:
    t.start()
for t in threads:
    t.join()

# CPU-bound - use multiple processes
import multiprocessing


# FIX: the original `compute-intensive` is a syntax error — hyphens are
# not allowed in Python identifiers; use snake_case instead.
def compute_intensive(data):
    """Return the sum of squares of the chunk's elements (pure CPU work)."""
    return sum(x ** 2 for x in data)


data_chunks = [chunk1, chunk2, chunk3, chunk4]  # placeholder chunks for illustration
with multiprocessing.Pool(4) as pool:
    results = pool.map(compute_intensive, data_chunks)
2. 避免过度并发
# Bad: spawning an unbounded number of threads
threads = [threading.Thread(target=task) for _ in range(1000)]

# Good: bound the concurrency with a thread pool
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=10) as executor:
    # 1000 tasks share 10 worker threads; gather results in submit order
    futures = [executor.submit(task) for _ in range(1000)]
    results = [f.result() for f in futures]
3. 正确处理异常
import threading
import traceback


def worker():
    """Run the task, catching and logging any exception.

    An uncaught exception would kill only this thread silently, so the
    try/except keeps failures visible.
    """
    try:
        result = do_something()
    except Exception as e:
        print(f"线程异常: {e}")
        traceback.print_exc()


threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
4. 使用上下文管理器
# Good practice: the with-block shuts the pool down automatically
from concurrent.futures import ThreadPoolExecutor


def process_item(item):
    """Double the item."""
    return item * 2


with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(process_item, range(100)))
# pool is closed here — no manual shutdown() needed
总结
多线程
- 适合 I/O 密集型任务
- 内存占用少,创建开销小
- 受 GIL 限制,无法充分利用多核
- 需要处理线程同步问题
多进程
- 适合 CPU 密集型任务
- 可以充分利用多核 CPU
- 内存占用大,创建开销大
- 进程间隔离,更稳定
选择建议
- I/O 密集型:优先使用多线程或异步编程
- CPU 密集型:使用多进程
- 混合型:结合多进程和多线程
- 简单任务:考虑使用 `concurrent.futures` 模块
理解多线程和多进程的特点,根据实际需求选择合适的并发模型,才能编写出高效、稳定的并发程序。