
What is the difference between multithreading and multiprocessing in Python? When should each be used?

February 21, 17:10

Python Multithreading and Multiprocessing Explained

Basic Concepts: Threads and Processes

Process

A process is the operating system's basic unit of resource allocation. It owns independent system resources such as its own memory space and file handles. Because each process has its own address space, communication between processes requires special mechanisms (IPC).

Thread

A thread is the basic unit of CPU scheduling. Threads within the same process share that process's memory space and resources, so communication between threads is relatively simple, but synchronization issues must be handled.
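The difference in memory visibility can be shown in a few lines: a thread mutates the parent's objects in place, while a child process only changes its own copy. A minimal sketch (the names `shared` and `bump` are illustrative, not from any library):

```python
import multiprocessing
import threading

# Illustrative names: `shared` and `bump` are placeholders for this sketch.
shared = {"count": 0}

def bump():
    shared["count"] += 1

if __name__ == "__main__":
    # A thread runs in the same address space, so the change is visible here.
    t = threading.Thread(target=bump)
    t.start()
    t.join()
    print(shared["count"])  # 1: the thread mutated the parent's dict in place

    # A process gets its own copy of the interpreter state; the parent's
    # dict is untouched regardless of the start method (fork or spawn).
    p = multiprocessing.Process(target=bump)
    p.start()
    p.join()
    print(shared["count"])  # still 1: the process incremented its own copy
```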

Multithreading in Python

The threading Module

Python's threading module provides thread-related operations.

```python
import threading
import time

def worker(name, delay):
    print(f"Thread {name} starting")
    time.sleep(delay)
    print(f"Thread {name} finished")

# Create threads
t1 = threading.Thread(target=worker, args=("Thread-1", 2))
t2 = threading.Thread(target=worker, args=("Thread-2", 1))

# Start threads
t1.start()
t2.start()

# Wait for threads to finish
t1.join()
t2.join()

print("Main thread finished")
```

Thread Synchronization

Because multiple threads share memory, synchronization mechanisms are needed to avoid race conditions.

1. Lock

```python
import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    for _ in range(100000):
        with lock:  # acquires and releases the lock automatically
            counter += 1

threads = []
for _ in range(10):
    t = threading.Thread(target=increment)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"Counter value: {counter}")  # should be 1000000
```

2. RLock (Reentrant Lock)

```python
import threading

lock = threading.RLock()

def recursive_function(n):
    with lock:  # the same thread can re-acquire an RLock without deadlocking
        if n > 0:
            print(f"Recursion depth: {n}")
            recursive_function(n - 1)

recursive_function(5)
```

3. Semaphore

```python
import threading
import time

semaphore = threading.Semaphore(3)  # at most 3 threads may run concurrently

def worker(worker_id):
    with semaphore:
        print(f"Worker {worker_id} starting")
        time.sleep(2)
        print(f"Worker {worker_id} done")

threads = [threading.Thread(target=worker, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

4. Event

```python
import threading
import time

event = threading.Event()

def waiter():
    print("Waiting for event...")
    event.wait()  # block until the event is set
    print("Event triggered, continuing")

def setter():
    time.sleep(2)
    print("Setting event")
    event.set()  # trigger the event

t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=setter)
t1.start()
t2.start()
t1.join()
t2.join()
```

5. Condition

```python
import threading
import time
import random

condition = threading.Condition()
queue = []

def producer():
    for i in range(5):
        time.sleep(random.random())
        with condition:
            item = f"Item-{i}"
            queue.append(item)
            print(f"Produced: {item}")
            condition.notify()  # wake a waiting consumer

def consumer():
    for _ in range(5):
        with condition:
            while not queue:
                print("Waiting for items...")
                condition.wait()  # release the lock and wait to be notified
            item = queue.pop(0)
            print(f"Consumed: {item}")

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()
```
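In practice, this producer-consumer pattern is usually written with the thread-safe queue.Queue from the standard library, which performs the Condition-style locking internally. A minimal equivalent sketch (the sentinel value None and the `results` list are illustrative choices):

```python
import queue
import threading

q = queue.Queue()  # thread-safe FIFO; locking is handled internally
results = []

def producer():
    for i in range(5):
        q.put(f"Item-{i}")
    q.put(None)  # sentinel: tells the consumer to stop

def consumer():
    while True:
        item = q.get()  # blocks until an item is available
        if item is None:
            break
        results.append(item)

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()
print(results)  # ['Item-0', 'Item-1', 'Item-2', 'Item-3', 'Item-4']
```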

Multiprocessing in Python

The multiprocessing Module

Python's multiprocessing module provides multiprocess support; each process has its own Python interpreter and its own GIL.

```python
import multiprocessing
import time

def worker(name, delay):
    print(f"Process {name} starting")
    time.sleep(delay)
    print(f"Process {name} finished")

if __name__ == '__main__':
    # Create processes
    p1 = multiprocessing.Process(target=worker, args=("Process-1", 2))
    p2 = multiprocessing.Process(target=worker, args=("Process-2", 1))

    # Start processes
    p1.start()
    p2.start()

    # Wait for processes to finish
    p1.join()
    p2.join()

    print("Main process finished")
```

Process Pool

```python
import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    # Create a process pool
    with multiprocessing.Pool(processes=4) as pool:
        # Parallel map
        numbers = list(range(10))
        results = pool.map(square, numbers)
        print(f"Results: {results}")

        # Asynchronous calls
        async_results = [pool.apply_async(square, (x,)) for x in numbers]
        results = [r.get() for r in async_results]
        print(f"Async results: {results}")
```

Inter-Process Communication (IPC)

1. Queue

```python
import multiprocessing

def producer(queue):
    for i in range(5):
        queue.put(f"Message-{i}")
    queue.put("STOP")

def consumer(queue):
    while True:
        message = queue.get()
        if message == "STOP":
            break
        print(f"Received: {message}")

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=producer, args=(queue,))
    p2 = multiprocessing.Process(target=consumer, args=(queue,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
```

2. Pipe

```python
import multiprocessing

def sender(conn):
    conn.send("Hello from sender")
    conn.close()

def receiver(conn):
    message = conn.recv()
    print(f"Received message: {message}")

if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=sender, args=(child_conn,))
    p2 = multiprocessing.Process(target=receiver, args=(parent_conn,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
```

3. Value and Array (Shared Memory)

```python
import multiprocessing

def increment(counter):
    with counter.get_lock():  # a Value carries its own lock
        counter.value += 1

if __name__ == '__main__':
    counter = multiprocessing.Value('i', 0)
    processes = [multiprocessing.Process(target=increment, args=(counter,))
                 for _ in range(100)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(f"Counter value: {counter.value}")  # 100
```
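Value and Array only cover simple C-typed data. For shared lists or dicts, the standard library's multiprocessing.Manager provides proxy objects that live in a separate server process. A minimal sketch (the helper name `record` is illustrative):

```python
import multiprocessing

def record(shared_list, value):
    # The proxy forwards this call to the manager's server process.
    shared_list.append(value)

if __name__ == '__main__':
    with multiprocessing.Manager() as manager:
        shared_list = manager.list()
        processes = [multiprocessing.Process(target=record, args=(shared_list, i))
                     for i in range(4)]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        # The order of appends is nondeterministic, so sort before printing.
        print(sorted(shared_list))  # [0, 1, 2, 3]
```

Manager proxies are slower than Value/Array (every access is a round trip to the server process), but they support arbitrary picklable contents.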

Multithreading vs. Multiprocessing

Performance Comparison

```python
import threading
import multiprocessing
import time

def cpu_bound_task(n):
    result = 0
    for i in range(n):
        result += i ** 2
    return result

def test_multithreading():
    start = time.time()
    threads = [threading.Thread(target=cpu_bound_task, args=(1000000,))
               for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(f"Multithreading took: {time.time() - start:.4f} s")

def test_multiprocessing():
    start = time.time()
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(cpu_bound_task, [1000000] * 4)
    print(f"Multiprocessing took: {time.time() - start:.4f} s")

if __name__ == '__main__':
    test_multithreading()
    test_multiprocessing()
```

Comparison Summary

| Feature | Multithreading | Multiprocessing |
| --- | --- | --- |
| Memory usage | Shared memory, small footprint | Independent memory, larger footprint |
| Communication | Shared variables, queues, etc. | Queue, Pipe, shared memory |
| Creation overhead | Small | Large |
| Data sharing | Easy, but needs synchronization | Harder, requires IPC |
| GIL impact | Limited by the GIL | Not limited by the GIL |
| Typical workload | I/O-bound | CPU-bound |
| Stability | One crashing thread can take down the whole process | Processes are isolated, more stable |

Practical Use Cases

1. I/O-Bound Tasks - Use Multithreading

```python
import threading
import requests
import time

def download_url(url):
    try:
        response = requests.get(url)
        print(f"Downloaded: {url}, size: {len(response.content)} bytes")
    except Exception as e:
        print(f"Download failed: {url}, error: {e}")

urls = [
    "https://www.example.com",
    "https://www.google.com",
    "https://www.github.com",
    "https://www.python.org",
]

start = time.time()
threads = [threading.Thread(target=download_url, args=(url,)) for url in urls]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Total time: {time.time() - start:.4f} s")
```

2. CPU-Bound Tasks - Use Multiprocessing

```python
import multiprocessing
import time

def process_image(image_data):
    # Simulate image processing
    result = sum(i ** 2 for i in range(len(image_data)))
    return result

def process_images_concurrently(images):
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(process_image, images)
    return results

if __name__ == '__main__':
    images = [list(range(10000)) for _ in range(10)]
    start = time.time()
    results = process_images_concurrently(images)
    print(f"Multiprocessing took: {time.time() - start:.4f} s")
```

3. Hybrid - Multiprocessing Plus Multithreading

```python
import multiprocessing
import threading
import time

def io_task(url):
    # Simulate an I/O operation
    time.sleep(0.1)
    return f"Processed {url}"

def worker(urls):
    # Each process uses multiple threads internally to handle I/O
    threads = [threading.Thread(target=io_task, args=(url,)) for url in urls]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

if __name__ == '__main__':
    all_urls = [f"url-{i}" for i in range(20)]
    # Split the URLs into groups, one per process
    url_groups = [all_urls[i:i + 5] for i in range(0, len(all_urls), 5)]
    start = time.time()
    processes = [multiprocessing.Process(target=worker, args=(group,))
                 for group in url_groups]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(f"Hybrid mode took: {time.time() - start:.4f} s")
```

Best Practices

1. Choose the Right Concurrency Model

```python
# I/O-bound: use multithreading
import threading
import requests

def fetch_url(url):
    response = requests.get(url)
    return response.text

urls = ["url1", "url2", "url3"]  # placeholder URLs
threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]
for t in threads:
    t.start()
for t in threads:
    t.join()

# CPU-bound: use multiprocessing
import multiprocessing

def compute_intensive(data):
    return sum(x ** 2 for x in data)

if __name__ == '__main__':
    data_chunks = [list(range(1000)) for _ in range(4)]  # example input
    with multiprocessing.Pool(4) as pool:
        results = pool.map(compute_intensive, data_chunks)
```

2. Avoid Excessive Concurrency

```python
import threading
import time

def task():
    time.sleep(0.01)  # placeholder workload

# Bad: creating too many threads at once
threads = [threading.Thread(target=task) for _ in range(1000)]

# Good: bound concurrency with a thread pool
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(task) for _ in range(1000)]
    results = [f.result() for f in futures]
```

3. Handle Exceptions Properly

```python
import threading
import traceback

def worker():
    try:
        # Do the work (do_something stands for the real task)
        result = do_something()
    except Exception as e:
        print(f"Thread exception: {e}")
        traceback.print_exc()

threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

4. Use Context Managers

```python
# Good: resources are cleaned up automatically
from concurrent.futures import ThreadPoolExecutor

def process_item(item):
    return item * 2

with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(process_item, range(100)))
# The thread pool is shut down automatically on exit
```

Summary

Multithreading

  • Suited to I/O-bound tasks
  • Small memory footprint, cheap to create
  • Limited by the GIL, so it cannot fully use multiple cores
  • Requires handling thread synchronization

Multiprocessing

  • Suited to CPU-bound tasks
  • Can fully utilize multiple CPU cores
  • Larger memory footprint, expensive to create
  • Processes are isolated, so it is more stable

Recommendations

  1. I/O-bound: prefer multithreading or asynchronous programming
  2. CPU-bound: use multiprocessing
  3. Mixed workloads: combine multiprocessing and multithreading
  4. Simple tasks: consider the concurrent.futures module
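For the last point, concurrent.futures exposes the same Executor API for both models, so switching between threads and processes is a one-class change. A minimal sketch:

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def square(x):
    return x * x

if __name__ == '__main__':
    # Same map-based API for both executors; only the class differs.
    with ThreadPoolExecutor(max_workers=4) as executor:
        print(list(executor.map(square, range(5))))  # [0, 1, 4, 9, 16]

    with ProcessPoolExecutor(max_workers=4) as executor:
        print(list(executor.map(square, range(5))))  # [0, 1, 4, 9, 16]
```

Because the interface is identical, you can prototype with ThreadPoolExecutor and swap in ProcessPoolExecutor once profiling shows the work is CPU-bound.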

Understand the characteristics of multithreading and multiprocessing, and choose the concurrency model that fits your actual workload; that is how you write efficient, stable concurrent programs.

Tags: Python