乐闻世界logo
搜索文章和话题

C++ 并发编程与多线程如何实现

2月18日 17:34

C++ 并发编程与多线程

C++11 引入了标准化的多线程支持,提供了丰富的并发编程工具,包括线程、互斥锁、条件变量、原子操作等。

线程基础

创建线程:

cpp
#include <iostream>
#include <thread>

// Thread entry point: writes one greeting line to stdout.
void hello()
{
    std::cout << "Hello from thread!" << std::endl;
}

int main()
{
    // Spawn a worker thread that runs hello().
    std::thread worker{hello};

    // Block until the worker has finished.
    worker.join();

    return 0;
}

带参数的线程:

cpp
// Prints `message` on its own line, `count` times (nothing if count <= 0).
void printMessage(const std::string& message, int count)
{
    for (int remaining = count; remaining > 0; --remaining) {
        std::cout << message << std::endl;
    }
}

int main()
{
    std::string msg = "Hello";

    // Extra constructor arguments are forwarded to printMessage on the new thread.
    std::thread t(printMessage, msg, 3);
    t.join();

    return 0;
}

使用 lambda 表达式:

cpp
int main()
{
    const int value = 42;

    // The closure captures `value` by copy, so the thread works on its own copy.
    auto printValue = [value]() {
        std::cout << "Value: " << value << std::endl;
    };

    std::thread t(printValue);
    t.join();

    return 0;
}

互斥锁(Mutex)

基本使用:

cpp
#include <iostream>
#include <mutex>
#include <thread>

std::mutex mtx;   // guards counter
int counter = 0;

// Increments the shared counter 10000 times, taking the mutex for each step.
void increment()
{
    for (int remaining = 10000; remaining > 0; --remaining) {
        std::lock_guard<std::mutex> guard(mtx);
        ++counter;
    }
}

int main()
{
    std::thread first(increment);
    std::thread second(increment);

    first.join();
    second.join();

    // Both threads have been joined, so this prints 20000.
    std::cout << "Counter: " << counter << std::endl;
    return 0;
}

unique_lock:

cpp
// Shared state for both examples below. The mutex is declared once;
// declaring it a second time in the same scope is a redefinition error.
std::mutex mtx;
std::condition_variable cv;
bool ready = false;   // read by worker() while holding mtx

// Shows that a std::unique_lock can be released and re-acquired by hand.
void process()
{
    std::unique_lock<std::mutex> lock(mtx);

    // Manually release the mutex.
    lock.unlock();

    // ... work that does not need the lock goes here ...

    // Take the mutex again.
    lock.lock();
}   // the unique_lock destructor releases the mutex

// Shows std::unique_lock used together with a condition variable.
// Blocks until `ready` is true; the predicate overload of wait() re-checks
// the flag after every wake-up.
void worker()
{
    std::unique_lock<std::mutex> lock(mtx);
    cv.wait(lock, [] { return ready; });

    // ... do the work ...
}

try_lock:

cpp
std::mutex mtx1;
std::mutex mtx2;

// Attempts to take both mutexes without blocking.
void process()
{
    // Deferred locks: constructed without acquiring the mutexes.
    std::unique_lock<std::mutex> first(mtx1, std::defer_lock);
    std::unique_lock<std::mutex> second(mtx2, std::defer_lock);

    // std::try_lock returns -1 when every lock was acquired, otherwise the
    // index of the lock that could not be taken.
    const int failedIndex = std::try_lock(first, second);
    if (failedIndex != -1) {
        // Could not acquire all locks.
        return;
    }

    // Both locks are held here.
    // ... do the work ...
}

条件变量(Condition Variable)

基本使用:

cpp
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

std::mutex mtx;                 // guards dataQueue and finished
std::condition_variable cv;
std::queue<int> dataQueue;
bool finished = false;

// Pushes the values 0..9 into the queue, one every 100 ms, then marks the
// stream as finished and wakes every waiter.
void producer()
{
    for (int i = 0; i < 10; ++i) {
        {
            std::lock_guard<std::mutex> lock(mtx);
            dataQueue.push(i);
            std::cout << "Produced: " << i << std::endl;
        }
        // Notify after the lock has been released.
        cv.notify_one();
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }

    {
        std::lock_guard<std::mutex> lock(mtx);
        finished = true;
    }
    cv.notify_all();
}

// Pops and prints values until the queue is empty and the producer is done.
void consumer()
{
    while (true) {
        std::unique_lock<std::mutex> lock(mtx);

        // The predicate overload re-checks the condition after every wake-up.
        cv.wait(lock, [] { return !dataQueue.empty() || finished; });

        if (dataQueue.empty() && finished) {
            break;
        }

        int value = dataQueue.front();
        dataQueue.pop();

        // Release the mutex before printing.
        lock.unlock();
        std::cout << "Consumed: " << value << std::endl;
    }
}

int main()
{
    std::thread p(producer);
    std::thread c(consumer);

    p.join();
    c.join();

    return 0;
}

原子操作(Atomic)

基本类型:

cpp
#include <atomic>
#include <iostream>
#include <thread>

std::atomic<int> atomicCounter(0);

// Adds 1 to the shared atomic counter 10000 times with relaxed ordering.
void increment()
{
    for (int remaining = 10000; remaining > 0; --remaining) {
        atomicCounter.fetch_add(1, std::memory_order_relaxed);
    }
}

int main()
{
    std::thread first(increment);
    std::thread second(increment);

    first.join();
    second.join();

    std::cout << "Atomic counter: " << atomicCounter.load() << std::endl;
    return 0;
}

内存顺序:

cpp
std::atomic<bool> flag(false);
std::atomic<int> data(0);

// Stores 42 into `data`, then raises `flag`; both stores use release ordering.
void writer()
{
    data.store(42, std::memory_order_release);
    flag.store(true, std::memory_order_release);
}

// Spins until `flag` is observed as true (acquire load), then reads and
// prints `data`.
void reader()
{
    for (;;) {
        if (flag.load(std::memory_order_acquire)) {
            break;
        }
        // busy-wait
    }

    const int observed = data.load(std::memory_order_acquire);
    std::cout << "Data: " << observed << std::endl;
}

CAS(Compare-And-Swap):

cpp
std::atomic<int> value(0);

// Atomically replaces `value` with `desired` if it currently equals
// `expected`. Returns true when the swap happened, false otherwise.
bool updateIfEqual(int expected, int desired)
{
    // A single compare-and-swap outside a retry loop must use the strong
    // form: compare_exchange_weak is allowed to fail spuriously even when
    // the stored value equals `expected`.
    return value.compare_exchange_strong(expected, desired);
}

// Usage
void reportUpdate()
{
    if (updateIfEqual(0, 1)) {
        std::cout << "Updated successfully" << std::endl;
    } else {
        std::cout << "Update failed" << std::endl;
    }
}

线程局部存储

thread_local:

cpp
#include <iostream>
#include <thread>

// Each thread gets its own copy of this variable.
thread_local int threadLocalVar = 0;

// Increments the calling thread's copy of threadLocalVar and prints it
// together with the thread id.
void printThreadId()
{
    threadLocalVar += 1;

    const std::thread::id self = std::this_thread::get_id();
    std::cout << "Thread ID: " << self
              << ", Value: " << threadLocalVar << std::endl;
}

int main()
{
    std::thread first(printThreadId);
    std::thread second(printThreadId);
    std::thread third(printThreadId);

    first.join();
    second.join();
    third.join();

    return 0;
}

异步操作(Future 和 Promise)

基本使用:

cpp
#include <chrono>
#include <future>
#include <iostream>
#include <thread>

// Simulates a slow computation: sleeps for two seconds, then returns 42.
int calculate()
{
    std::this_thread::sleep_for(std::chrono::seconds(2));
    return 42;
}

int main()
{
    // std::launch::async requests that calculate() run asynchronously.
    std::future<int> result = std::async(std::launch::async, calculate);

    std::cout << "Doing other work..." << std::endl;

    // get() blocks until the result is available.
    int value = result.get();
    std::cout << "Result: " << value << std::endl;

    return 0;
}

Promise:

cpp
#include <chrono>
#include <future>
#include <iostream>
#include <thread>
#include <utility>

// Waits one second, then fulfils the promise with the value 100.
void setValue(std::promise<int> prom)
{
    std::this_thread::sleep_for(std::chrono::seconds(1));
    prom.set_value(100);
}

int main()
{
    std::promise<int> prom;
    std::future<int> fut = prom.get_future();

    // The promise is handed over to the worker thread with std::move.
    std::thread t(setValue, std::move(prom));

    // Blocks until the worker calls set_value().
    int value = fut.get();
    std::cout << "Value: " << value << std::endl;

    t.join();
    return 0;
}

Packaged Task:

cpp
#include <functional>
#include <future>
#include <iostream>
#include <thread>
#include <utility>

// Returns the sum of its two arguments.
int add(int a, int b)
{
    return a + b;
}

int main()
{
    std::packaged_task<int(int, int)> task(add);

    // Obtain the future before the task is moved into the thread.
    std::future<int> result = task.get_future();

    // The task is handed over to the thread with std::move; 10 and 20 are
    // passed to it as arguments.
    std::thread t(std::move(task), 10, 20);
    t.join();

    std::cout << "Result: " << result.get() << std::endl;
    return 0;
}

线程池实现

简单线程池:

cpp
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>

// A fixed-size pool of worker threads that run queued tasks.
//
// Tasks are submitted with enqueue(), which returns a std::future for the
// task's result. The destructor sets the stop flag, wakes all workers and
// joins them; workers exit only once the queue is empty, so tasks that are
// already queued still run.
class ThreadPool {
private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
    std::mutex queueMutex;              // guards tasks and stop
    std::condition_variable condition;  // signalled on new task or stop
    bool stop;

public:
    // Starts `threads` worker threads.
    ThreadPool(size_t threads) : stop(false)
    {
        workers.reserve(threads);
        for (size_t i = 0; i < threads; ++i) {
            workers.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(this->queueMutex);
                        this->condition.wait(lock, [this] {
                            return this->stop || !this->tasks.empty();
                        });

                        // Exit only when stopping and nothing is left to run.
                        if (this->stop && this->tasks.empty()) {
                            return;
                        }

                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }
                    // Run the task after the queue lock has been released.
                    task();
                }
            });
        }
    }

    // The worker lambdas capture `this`, and the mutex member is not
    // copyable; make the non-copyability explicit.
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    // Queues f(args...) for execution and returns a future for its result.
    // Throws std::runtime_error if the pool has already been stopped.
    //
    // NOTE: std::result_of is deprecated since C++17; it is kept here so the
    // example stays valid C++11.
    template <class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<typename std::result_of<F(Args...)>::type>
    {
        using return_type = typename std::result_of<F(Args...)>::type;

        // std::function requires a copyable callable, so the move-only
        // packaged_task is held through a shared_ptr.
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );

        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            if (stop) {
                throw std::runtime_error("enqueue on stopped ThreadPool");
            }
            tasks.emplace([task]() { (*task)(); });
        }
        condition.notify_one();
        return res;
    }

    // Signals all workers to stop and waits for them to finish.
    ~ThreadPool()
    {
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread& worker : workers) {
            worker.join();
        }
    }
};

// Usage
int main()
{
    ThreadPool pool(4);

    auto result1 = pool.enqueue([](int a, int b) { return a + b; }, 10, 20);
    auto result2 = pool.enqueue([](int a, int b) { return a * b; }, 5, 6);

    std::cout << "Result 1: " << result1.get() << std::endl;
    std::cout << "Result 2: " << result2.get() << std::endl;

    return 0;
}

最佳实践

1. 避免数据竞争

cpp
// Wrong: both threads modify sharedData without any synchronization.
// This is a data race; the result is undefined.
int incrementWithDataRace()
{
    int sharedData = 0;

    std::thread t1([&]() { ++sharedData; });
    std::thread t2([&]() { ++sharedData; });

    // A std::thread must be joined (or detached) before it is destroyed.
    t1.join();
    t2.join();
    return sharedData;
}

// Right: every access to sharedData happens while holding the mutex.
int incrementWithMutex()
{
    std::mutex mtx;
    int sharedData = 0;

    std::thread t1([&]() {
        std::lock_guard<std::mutex> lock(mtx);
        ++sharedData;
    });
    std::thread t2([&]() {
        std::lock_guard<std::mutex> lock(mtx);
        ++sharedData;
    });

    // Join before the captured locals go out of scope.
    t1.join();
    t2.join();
    return sharedData;
}

2. 避免死锁

cpp
std::mutex mtx1;
std::mutex mtx2;

// Wrong: thread1 and thread2 take the two mutexes in opposite order, so
// running them concurrently can deadlock.
void thread1()
{
    std::lock_guard<std::mutex> holdFirst(mtx1);
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    std::lock_guard<std::mutex> holdSecond(mtx2);
}

void thread2()
{
    std::lock_guard<std::mutex> holdSecond(mtx2);
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    std::lock_guard<std::mutex> holdFirst(mtx1);
}

// Right: std::lock acquires both mutexes together without deadlocking.
void thread1Safe()
{
    // Deferred locks own nothing until std::lock acquires both at once.
    std::unique_lock<std::mutex> lock1(mtx1, std::defer_lock);
    std::unique_lock<std::mutex> lock2(mtx2, std::defer_lock);
    std::lock(lock1, lock2);
}   // both mutexes are released here

void thread2Safe()
{
    std::unique_lock<std::mutex> lock1(mtx1, std::defer_lock);
    std::unique_lock<std::mutex> lock2(mtx2, std::defer_lock);
    std::lock(lock1, lock2);
}   // both mutexes are released here

3. 使用 RAII 管理锁

cpp
// 推荐 void safeFunction() { std::lock_guard<std::mutex> lock(mtx); // 临界区代码 } // 自动释放锁 // 不推荐 void unsafeFunction() { mtx.lock(); // 临界区代码 mtx.unlock(); // 可能忘记或异常导致未释放 }

4. 优先使用原子操作

cpp
// Recommended for a simple counter: an atomic variable.
std::atomic<int> atomicCounter(0);

void bumpAtomic()
{
    ++atomicCounter;
}

// Not recommended for such a simple operation: a mutex-protected counter.
std::mutex mtx;
int lockedCounter = 0;   // guarded by mtx

void bumpLocked()
{
    std::lock_guard<std::mutex> lock(mtx);
    ++lockedCounter;
}

注意事项

  • 始终确保线程被正确 join 或 detach
  • 在析构函数中使用互斥锁时要格外小心:析构函数不应抛出异常,并且要确保对象销毁后不再有其他线程访问它(例如上文线程池的析构函数先加锁设置停止标志,再 join 所有工作线程)
  • 注意条件变量的虚假唤醒
  • 合理选择内存顺序:默认的 memory_order_seq_cst 最安全,只有在确有性能需求并且能够论证正确性时,才改用更弱的内存顺序
  • 避免过度细粒度的锁,可能导致性能下降
  • 使用线程池管理大量短期任务
  • 注意异常安全,确保资源正确释放
  • 避免在多线程环境中使用未加同步保护的可变全局变量
标签:C++