Python Multithreading and Multiprocessing Explained
Basic Concepts of Threads and Processes
Process
A process is the basic unit of operating system resource allocation, having independent memory space, file handles, and other system resources. Each process has its own independent address space, and inter-process communication requires special mechanisms (IPC).
Thread
A thread is the basic unit of CPU scheduling. Threads within the same process share the process's memory space and resources. Inter-thread communication is relatively simple, but synchronization issues need to be handled.
Multithreading in Python
threading Module
Python's threading module provides thread-related operations.
import threading
import time


def worker(name, delay):
    """Simulate *delay* seconds of work on a thread, logging start and finish."""
    print(f"Thread {name} started")
    time.sleep(delay)
    print(f"Thread {name} finished")


if __name__ == '__main__':  # Guard so importing this module does not spawn threads
    # Create threads
    t1 = threading.Thread(target=worker, args=("Thread-1", 2))
    t2 = threading.Thread(target=worker, args=("Thread-2", 1))
    # Start threads
    t1.start()
    t2.start()
    # Wait for threads to finish
    t1.join()
    t2.join()
    print("Main thread finished")
Thread Synchronization
Since multiple threads share memory, synchronization mechanisms are needed to avoid race conditions.
1. Lock
import threading

# Shared state: every read-modify-write of `counter` must happen under the
# lock, otherwise concurrent threads lose updates.
counter = 0
lock = threading.Lock()


def increment(iterations=100000):
    """Add 1 to the shared counter *iterations* times, one locked update each."""
    global counter
    for _ in range(iterations):
        with lock:  # Automatically acquire and release lock
            counter += 1


def run_demo(num_threads=10, iterations=100000):
    """Run *num_threads* incrementing threads to completion; return the counter."""
    workers = [threading.Thread(target=increment, args=(iterations,))
               for _ in range(num_threads)]
    for t in workers:
        t.start()
    for t in workers:
        t.join()
    return counter


if __name__ == '__main__':  # Guard: importing this module should not burn CPU
    print(f"Counter value: {run_demo()}")  # Should be 1000000
2. RLock (Reentrant Lock)
import threading

# RLock (reentrant lock): the same thread may acquire it multiple times,
# which a plain Lock would deadlock on.
lock = threading.RLock()


def recursive_function(n):
    """Count down from *n*, re-acquiring the reentrant lock at every level."""
    with lock:
        if n <= 0:
            return
        print(f"Recursion depth: {n}")
        recursive_function(n - 1)


recursive_function(5)
3. Semaphore
import threading
import time

semaphore = threading.Semaphore(3)  # Allow max 3 threads to execute simultaneously


def worker(worker_id, delay=2):
    """Do *delay* seconds of simulated work while holding one semaphore slot."""
    with semaphore:
        print(f"Worker {worker_id} started")
        time.sleep(delay)
        print(f"Worker {worker_id} finished")


if __name__ == '__main__':  # Guard: importing should not run ~8 s of sleeps
    threads = [threading.Thread(target=worker, args=(i,)) for i in range(10)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
4. Event
import threading
import time

event = threading.Event()


def waiter():
    """Block until the shared event is set, then continue."""
    print("Waiting for event...")
    event.wait()  # Wait for event to be set
    print("Event triggered, continuing execution")


def setter(delay=2):
    """Sleep *delay* seconds, then trigger the shared event."""
    time.sleep(delay)
    print("Setting event")
    event.set()  # Trigger event


if __name__ == '__main__':  # Guard: importing should not start threads/sleep
    t1 = threading.Thread(target=waiter)
    t2 = threading.Thread(target=setter)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
5. Condition
import threading
import time
import random

# The condition guards the shared item list and lets the consumer sleep
# until the producer signals that something was appended.
condition = threading.Condition()
items = []  # renamed from `queue` to avoid shadowing the stdlib module name


def producer(count=5, max_delay=1.0):
    """Produce *count* items, sleeping up to *max_delay* seconds between them."""
    for i in range(count):
        time.sleep(random.random() * max_delay)
        with condition:
            item = f"Item-{i}"
            items.append(item)
            print(f"Produced: {item}")
            condition.notify()  # Notify waiting consumers


def consumer(count=5):
    """Consume *count* items, waiting on the condition while the list is empty."""
    for _ in range(count):
        with condition:
            while not items:
                print("Waiting for items...")
                condition.wait()  # Wait for items
            item = items.pop(0)
            print(f"Consumed: {item}")


if __name__ == '__main__':  # Guard: importing should not run the demo threads
    t1 = threading.Thread(target=producer)
    t2 = threading.Thread(target=consumer)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
Multiprocessing in Python
multiprocessing Module
Python's multiprocessing module provides multiprocessing support, with each process having an independent Python interpreter and GIL.
import multiprocessing
import time


def worker(name, delay):
    """Simulate *delay* seconds of work in a child process, logging start/finish."""
    print(f"Process {name} started")
    time.sleep(delay)
    print(f"Process {name} finished")


if __name__ == '__main__':
    # Create both processes, then start and join them in order.
    procs = [
        multiprocessing.Process(target=worker, args=("Process-1", 2)),
        multiprocessing.Process(target=worker, args=("Process-2", 1)),
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print("Main process finished")
Process Pool
import multiprocessing


def square(x):
    """Return x squared."""
    return x * x


if __name__ == '__main__':
    # Create process pool (closed automatically by the with-block)
    with multiprocessing.Pool(processes=4) as pool:
        numbers = list(range(10))
        # Blocking parallel map
        results = pool.map(square, numbers)
        print(f"Results: {results}")
        # Fire off one async task per number, then collect the futures
        pending = [pool.apply_async(square, (n,)) for n in numbers]
        results = [p.get() for p in pending]
        print(f"Async results: {results}")
Inter-Process Communication (IPC)
1. Queue
import multiprocessing


def producer(queue):
    """Put five messages on the queue, followed by a STOP sentinel."""
    for i in range(5):
        queue.put(f"Message-{i}")
    queue.put("STOP")


def consumer(queue):
    """Drain the queue, printing each message until the STOP sentinel arrives."""
    while True:
        message = queue.get()
        if message == "STOP":
            break
        print(f"Received: {message}")


if __name__ == '__main__':
    channel = multiprocessing.Queue()
    workers = [
        multiprocessing.Process(target=producer, args=(channel,)),
        multiprocessing.Process(target=consumer, args=(channel,)),
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
2. Pipe
import multiprocessing


def sender(conn):
    """Send one greeting down the pipe, then close our end."""
    conn.send("Hello from sender")
    conn.close()


def receiver(conn):
    """Receive one message from the pipe and print it."""
    message = conn.recv()
    print(f"Received message: {message}")


if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()
    procs = [
        multiprocessing.Process(target=sender, args=(child_conn,)),
        multiprocessing.Process(target=receiver, args=(parent_conn,)),
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
3. Value and Array (Shared Memory)
import multiprocessing


def increment(counter):
    """Atomically add one to the shared counter (Value carries its own lock)."""
    with counter.get_lock():
        counter.value += 1


if __name__ == '__main__':
    counter = multiprocessing.Value('i', 0)  # 'i' = C signed int in shared memory
    procs = [multiprocessing.Process(target=increment, args=(counter,))
             for _ in range(100)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(f"Counter value: {counter.value}")
Multithreading vs Multiprocessing
Performance Comparison
import threading
import multiprocessing
import time


def cpu_bound_task(n):
    """Return the sum of i**2 for i in range(n) — pure CPU work."""
    result = 0
    for i in range(n):
        result += i ** 2
    return result


# Renamed from test_* so pytest/unittest auto-discovery does not pick these
# benchmark helpers up as test cases.
def benchmark_multithreading():
    """Time 4 threads running the CPU-bound task (GIL serializes them)."""
    start = time.time()
    threads = [threading.Thread(target=cpu_bound_task, args=(1000000,))
               for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(f"Multithreading time: {time.time() - start:.4f} seconds")


def benchmark_multiprocessing():
    """Time 4 processes running the CPU-bound task (true parallelism)."""
    start = time.time()
    with multiprocessing.Pool(processes=4) as pool:
        pool.map(cpu_bound_task, [1000000] * 4)
    print(f"Multiprocessing time: {time.time() - start:.4f} seconds")


if __name__ == '__main__':
    benchmark_multithreading()
    benchmark_multiprocessing()
Comparison Summary
| Feature | Multithreading | Multiprocessing |
|---|---|---|
| Memory Usage | Shared memory, less usage | Independent memory, more usage |
| Communication | Shared variables, queues, etc. | Queue, Pipe, shared memory |
| Creation Overhead | Small | Large |
| Data Sharing | Easy, but needs synchronization | Difficult, needs IPC |
| GIL Impact | Limited by GIL | Not limited by GIL |
| Use Case | I/O intensive | CPU intensive |
| Stability | Thread crash may affect entire process | Process isolation, more stable |
Practical Application Scenarios
1. I/O Intensive Tasks - Use Multithreading
import threading
import requests
import time


def download_url(url):
    """Fetch *url* and report its size; log (don't re-raise) any failure."""
    try:
        response = requests.get(url)
        print(f"Downloaded: {url}, size: {len(response.content)} bytes")
    except Exception as e:
        # Best-effort demo: a failed download is reported, not fatal.
        print(f"Failed: {url}, error: {e}")


if __name__ == '__main__':  # Guard: importing this module must not hit the network
    urls = [
        "https://www.example.com",
        "https://www.google.com",
        "https://www.github.com",
        "https://www.python.org",
    ]
    start = time.time()
    threads = [threading.Thread(target=download_url, args=(url,)) for url in urls]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(f"Total time: {time.time() - start:.4f} seconds")
2. CPU Intensive Tasks - Use Multiprocessing
import multiprocessing
import time


def process_image(image_data):
    """Simulate image processing: sum of squared indices over the data length."""
    return sum(i ** 2 for i in range(len(image_data)))


def process_images_concurrently(images):
    """Fan the images out across a 4-worker process pool; return the results."""
    with multiprocessing.Pool(processes=4) as pool:
        return pool.map(process_image, images)


if __name__ == '__main__':
    images = [list(range(10000)) for _ in range(10)]
    start = time.time()
    results = process_images_concurrently(images)
    print(f"Multiprocessing time: {time.time() - start:.4f} seconds")
3. Mixed Usage - Multiprocessing + Multithreading
import multiprocessing
import threading
import time


def io_task(url):
    """Simulate a 0.1-second I/O operation; return a status string."""
    time.sleep(0.1)
    return f"Processed {url}"


def worker(urls):
    """Handle one URL group with a thread per URL, so I/O waits overlap in-process."""
    pool = [threading.Thread(target=io_task, args=(u,)) for u in urls]
    for t in pool:
        t.start()
    for t in pool:
        t.join()


if __name__ == '__main__':
    all_urls = [f"url-{i}" for i in range(20)]
    # Group URLs: five per group, one process per group
    url_groups = [all_urls[i:i+5] for i in range(0, len(all_urls), 5)]
    start = time.time()
    procs = [multiprocessing.Process(target=worker, args=(g,)) for g in url_groups]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(f"Mixed mode time: {time.time() - start:.4f} seconds")
Best Practices
1. Choose the Right Concurrency Model
# I/O intensive - Use multithreading
import threading
import multiprocessing
import requests


def fetch_url(url):
    """Fetch a page and return its body (I/O-bound: threads overlap the waits)."""
    response = requests.get(url)
    return response.text


# CPU intensive - Use multiprocessing
def compute_intensive(data):
    """Sum of squares over *data* (CPU-bound: processes sidestep the GIL)."""
    return sum(x ** 2 for x in data)


if __name__ == '__main__':  # Guard is required: Pool re-imports this module under spawn
    urls = ["url1", "url2", "url3"]
    threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    # Fix: the original referenced undefined names chunk1..chunk4
    data_chunks = [list(range(i, i + 1000)) for i in range(0, 4000, 1000)]
    with multiprocessing.Pool(4) as pool:
        results = pool.map(compute_intensive, data_chunks)
2. Avoid Excessive Concurrency
import threading
from concurrent.futures import ThreadPoolExecutor


def task():
    """Placeholder unit of work (the original snippet left this undefined)."""
    return 1


# Bad practice - Create too many threads: one OS thread per work item
threads = [threading.Thread(target=task) for _ in range(1000)]

# Good practice - Use a thread pool: a bounded set of workers is reused
# across all 1000 submissions
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(task) for _ in range(1000)]
    results = [f.result() for f in futures]
3. Handle Exceptions Properly
import threading
import traceback


def do_something():
    """Simulated task that fails, to exercise the exception handler below.

    The original snippet called an undefined name, which only "worked" by
    raising an accidental NameError.
    """
    raise RuntimeError("simulated task failure")


def worker():
    """Run the task, catching and logging any exception.

    An uncaught exception kills only its own thread and is easy to miss;
    catching it here makes the failure visible.
    """
    try:
        do_something()
    except Exception as e:
        print(f"Thread exception: {e}")
        traceback.print_exc()


threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
4. Use Context Managers
# Good practice - Automatic resource cleanup
from concurrent.futures import ThreadPoolExecutor


def process_item(item):
    """Return the item doubled."""
    doubled = item * 2
    return doubled


with ThreadPoolExecutor(max_workers=4) as executor:
    # map preserves input order; list() drains the lazy iterator
    results = list(executor.map(process_item, range(100)))
# Leaving the with-block shuts the thread pool down automatically
Summary
Multithreading
- Suitable for I/O intensive tasks
- Less memory usage, small creation overhead
- Limited by GIL, cannot fully utilize multi-core
- Need to handle thread synchronization
Multiprocessing
- Suitable for CPU intensive tasks
- Can fully utilize multi-core CPUs
- More memory usage, large creation overhead
- Process isolation, more stable
Selection Recommendations
- I/O intensive: Prioritize multithreading or async programming
- CPU intensive: Use multiprocessing
- Mixed: Combine multiprocessing and multithreading
- Simple tasks: Consider using the `concurrent.futures` module
Understanding the characteristics of multithreading and multiprocessing, and choosing the appropriate concurrency model based on actual needs, is essential for writing efficient and stable concurrent programs.