高性能并发框架与库实践

探索Intel TBB、OpenMP任务并行、线程池等高性能并发框架的设计原理与实战应用

在现代C++并发编程中,直接管理线程和锁虽然能提供最大的控制力,但也容易出错且难以扩展。高性能并发框架(如Intel TBB、OpenMP任务模式)和成熟的库(如Folly、Abseil)封装了复杂的底层细节,提供了更安全、更高效的并行编程模型。本文将深入探讨这些框架的内部机制,并通过实战案例展示如何利用它们构建高性能并发应用。

Intel TBB:任务并行与流式处理

TBB核心概念

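(此处原为TBB核心概念示意图,未能正常渲染。本节涉及的核心组件包括:并行算法(parallel_for、parallel_reduce、parallel_sort)、并发容器(concurrent_bounded_queue)以及流图(flow_graph)。)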

TBB并行算法实战

#include <iostream>
#include <vector>
#include <algorithm>
#include <numeric>
#include <chrono>
#include <tbb/tbb.h>
#include <tbb/parallel_for.h>
#include <tbb/blocked_range.h>
#include <tbb/concurrent_queue.h>
#include <tbb/flow_graph.h>

// TBB并行for示例
void tbb_parallel_for_example() {
    std::cout << "TBB并行算法示例" << std::endl;
    std::cout << "=============================" << std::endl << std::endl;
    
    const int size = 10000000;
    std::vector<int> data(size);
    
    // 1. 初始化数据
    auto start = std::chrono::high_resolution_clock::now();
    tbb::parallel_for(tbb::blocked_range<int>(0,size),
     [&](const tbb::blocked_range<int>& r) {
         for (int i = r.begin(); i < r.end(); ++i) {
             data[i] = i * 2;
         }
     });
    auto end = std::chrono::high_resolution_clock::now();
    
    auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
    std::cout << "TBB parallel_for 初始化:" << std::endl;
    std::cout << "  耗时: " << duration.count() << " μs" << std::endl;
    
    // 2. 并行归约求和
    start = std::chrono::high_resolution_clock::now();
    long sum = tbb::parallel_reduce(
     tbb::blocked_range<int>(0,size),
     0L,
     [&](const tbb::blocked_range<int>& r,long init) -> long {
         for (int i = r.begin(); i < r.end(); ++i) {
             init += data[i];
         }
         return init;
     },
     [](long x,long y) -> long {
         return x + y;
     }
    );
    end = std::chrono::high_resolution_clock::now();
    
    duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
    std::cout << "\nTBB parallel_reduce 求和:" << std::endl;
    std::cout << "  结果: " << sum << std::endl;
    std::cout << "  耗时: " << duration.count() << " μs" << std::endl;
    
    // 3. 并行排序
    std::vector<int> unsorted_data(size);
    std::generate(unsorted_data.begin(),unsorted_data.end(),[](){ return rand(); });
    
    start = std::chrono::high_resolution_clock::now();
    tbb::parallel_sort(unsorted_data.begin(),unsorted_data.end());
    end = std::chrono::high_resolution_clock::now();
    
    duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
    std::cout << "\nTBB parallel_sort:" << std::endl;
    std::cout << "  耗时: " << duration.count() << " μs" << std::endl;
    
    // 验证排序正确性
    bool is_sorted = std::is_sorted(unsorted_data.begin(),unsorted_data.end());
    std::cout << "  排序正确: " << (is_sorted ? "是" : "否") << std::endl;
}

/// TBB bounded-queue producer/consumer demo.
///
/// Two producers each push the integers [0, 100000) into a queue limited to
/// 1000 entries; two consumers pop and accumulate them. The accumulated total
/// is printed so the run can be checked (expected: 2 * sum(0..99999)).
///
/// Shutdown protocol: once both producers have been joined, `producers_done`
/// is set. A consumer exits only after it has seen that flag AND a subsequent
/// pop still finds the queue empty, so no item can be left behind.
void tbb_concurrent_queue_example() {
    std::cout << "\nTBB并发队列示例" << std::endl;
    std::cout << "=============================" << std::endl << std::endl;

    constexpr int kItemsPerProducer = 100000;

    tbb::concurrent_bounded_queue<int> queue;
    queue.set_capacity(1000); // bound the queue so producers get back-pressure

    std::atomic<bool> producers_done(false);

    // Producer: push() blocks while the queue is at capacity. try_push() would
    // return false and silently drop the item instead.
    auto producer = [&]() {
        for (int i = 0; i < kItemsPerProducer; ++i) {
            queue.push(i);
        }
    };

    // Consumer: the total is about 1e10, which does not fit in a 32-bit int.
    std::atomic<long long> sum(0);
    auto consumer = [&]() {
        int value;
        while (true) {
            if (queue.try_pop(value)) {
                sum += value;
                continue;
            }
            if (producers_done.load(std::memory_order_acquire)) {
                // The flag is set after the last push, so one more failed pop
                // proves the queue is drained.
                if (!queue.try_pop(value)) {
                    break;
                }
                sum += value;
            } else {
                std::this_thread::yield();
            }
        }
    };

    std::thread p1(producer), p2(producer);
    std::thread c1(consumer), c2(consumer);

    p1.join(); p2.join();
    // Every item has been pushed; let the consumers drain the queue and exit.
    producers_done.store(true, std::memory_order_release);

    c1.join(); c2.join();

    std::cout << "生产者-消费者完成" << std::endl;
    std::cout << "  队列已空,消费者应退出" << std::endl;
    std::cout << "  消费总和: " << sum.load() << std::endl;
}

// TBB流图示例
void tbb_flow_graph_example() {
    std::cout << "\nTBB流图示例" << std::endl;
    std::cout << "=============================" << std::endl << std::endl;
    
    tbb::flow::graph g;
    
    // 节点1: 生成数字
    tbb::flow::source_node<int> source(g,[&](int& v) -> bool {
     static int counter = 0;
     if (counter < 10) {
         v = counter++;
         std::cout << "Source: " << v << std::endl;
         return true;
     }
     return false;
    });
    
    // 节点2: 平方
    tbb::flow::function_node<int,int> square(g,tbb::flow::unlimited,
     [](int v) -> int {
         int res = v * v;
         std::cout << "Square: " << v << " -> " << res << std::endl;
         return res;
     });
    
    // 节点3: 打印结果
    tbb::flow::function_node<int> sink(g,1,
     [](int v) {
         std::cout << "Sink received: " << v << std::endl;
     });
    
    // 构建图: source -> square -> sink
    tbb::flow::make_edge(source,square);
    tbb::flow::make_edge(square,sink);
    
    // 执行图
    source.activate();
    g.wait_for_all();
}

高级线程池设计

工作窃取线程池

#include <iostream>
#include <vector>
#include <thread>
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <future>
#include <atomic>
#include <memory>

/// Abstract unit of work. Concrete tasks override execute(); the virtual
/// destructor allows deletion through a Task pointer.
struct Task {
    /// Runs the task.
    virtual void execute() = 0;

    virtual ~Task() = default;
};

/// Adapts any nullary callable to the Task interface by storing it by value.
///
/// @tparam Func Callable type invoked as `func()`; any result is discarded.
template<typename Func>
class GenericTask : public Task {
public:
    /// Moves `f` into the task. Explicit, so that a callable is never
    /// converted into a task implicitly.
    explicit GenericTask(Func&& f) : func(std::move(f)) {}

    /// Invokes the stored callable.
    void execute() override {
        func();
    }

private:
    Func func;
};

// 工作窃取线程池
class ThreadPool {
private:
    std::vector<std::thread> workers;
    std::vector<std::queue<std::unique_ptr<Task>>> task_queues;
    std::vector<std::mutex> queue_mutexes;
    std::atomic<bool> stop;
    std::atomic<size_t> next_worker_index;
    
    // 窃取任务:尝试从其他线程队列中获取任务
    bool try_steal_task(int current_worker,std::unique_ptr<Task>& task) {
     size_t start = next_worker_index.fetch_add(1) % workers.size();
     
     for (size_t i = 0; i < workers.size(); ++i) {
         size_t target = (start + i) % workers.size();
         if (target == current_worker) continue;
         
         std::lock_guard<std::mutex> lock(queue_mutexes[target]);
         if (!task_queues[target].empty()) {
             task = std::move(task_queues[target].front());
             task_queues[target].pop();
             return true;
         }
     }
     return false;
    }
    
    void worker_thread(int worker_id) {
     while (true) {
         std::unique_ptr<Task> task;
         
         // 1. 尝试从本地队列获取任务
         {
             std::lock_guard<std::mutex> lock(queue_mutexes[worker_id]);
             if (!task_queues[worker_id].empty()) {
                 task = std::move(task_queues[worker_id].front());
                 task_queues[worker_id].pop();
             }
         }
         
         // 2. 如果本地队列为空,尝试窃取任务
         if (!task && !stop) {
             if (try_steal_task(worker_id,task)) {
                 // 窃取成功
             } else {
                 // 没有任务可做,短暂休眠或yield
                 std::this_thread::yield();
                 continue;
             }
         }
         
         if (task) {
             task->execute();
         } else if (stop) {
             break;
         }
     }
    }
    
public:
    ThreadPool(size_t num_threads = std::thread::hardware_concurrency()) 
     : stop(false),next_worker_index(0) {
     workers.reserve(num_threads);
     task_queues.resize(num_threads);
     queue_mutexes.resize(num_threads);
     
     for (size_t i = 0; i < num_threads; ++i) {
         workers.emplace_back(&ThreadPool::worker_thread,this,i);
     }
     
     std::cout << "线程池初始化完成,工作线程数: " << num_threads << std::endl;
    }
    
    ~ThreadPool() {
     stop = true;
     for (auto& worker : workers) {
         if (worker.joinable()) {
             worker.join();
         }
     }
    }
    
    // 提交任务 (轮询分发)
    template<typename F>
    auto enqueue(F&& f) -> std::future<decltype(f())> {
     using ReturnType = decltype(f());
     auto task_promise = std::make_shared<std::promise<ReturnType>>();
     std::future<ReturnType> result = task_promise->get_future();
     
     auto task_wrapper = [func = std::forward<F>(f),promise = task_promise]() {
         try {
             if constexpr (std::is_void_v<ReturnType>) {
                 func();
                 promise->set_value();
             } else {
                 promise->set_value(func());
             }
         } catch (...) {
             promise->set_exception(std::current_exception());
         }
     };
     
     // 轮询选择目标线程
     size_t target = next_worker_index.fetch_add(1) % workers.size();
     
     std::lock_guard<std::mutex> lock(queue_mutexes[target]);
     task_queues[target].emplace(std::make_unique<GenericTask<decltype(task_wrapper)>>(std::move(task_wrapper)));
     
     return result;
    }
};

/// Throughput benchmark for ThreadPool: submits 100,000 small arithmetic
/// tasks, waits for all of them and prints elapsed time, tasks per second and
/// the sum of all task results.
void thread_pool_benchmark() {
    using Clock = std::chrono::high_resolution_clock;

    std::cout << "\n工作窃取线程池基准测试" << std::endl;
    std::cout << "=============================" << std::endl << std::endl;

    ThreadPool pool(std::thread::hardware_concurrency());
    const int task_count = 100000;
    std::vector<std::future<int>> futures;

    const auto t_begin = Clock::now();

    // Submit the work: task `id` computes sum_{k=0}^{99} id * k.
    for (int id = 0; id < task_count; ++id) {
        futures.emplace_back(pool.enqueue([id]() {
            int acc = 0;
            for (int k = 0; k < 100; ++k) {
                acc += id * k;
            }
            return acc;
        }));
    }

    // Collect every result; get() blocks until the task has run.
    long total_sum = 0;
    for (auto& pending : futures) {
        total_sum += pending.get();
    }

    const auto t_end = Clock::now();
    const auto elapsed =
        std::chrono::duration_cast<std::chrono::milliseconds>(t_end - t_begin);

    std::cout << "总任务数: " << task_count << std::endl;
    std::cout << "总耗时: " << elapsed.count() << " ms" << std::endl;
    std::cout << "吞吐量: " << (task_count * 1000.0 / elapsed.count()) << " tasks/sec" << std::endl;
    std::cout << "验证和: " << total_sum << std::endl;
}

现代C++并发库:Folly与Abseil

Folly的高性能并发容器

Folly(Facebook Open Source Library)提供了一系列针对高并发场景优化的数据结构。虽然我们无法直接编译依赖Folly的代码,但我们可以学习其设计思想并用标准C++实现类似的功能。

核心设计思想:

  1. ProducerConsumerQueue: 基于环形数组的无锁队列,仅适用于单生产者单消费者(SPSC)场景;多生产者或多消费者场景需要改用其他队列(如MPMCQueue)。
  2. AtomicHashMap / AtomicUnorderedInsertMap: 基于原子操作的并发哈希表,通过限制功能(例如容量需预先确定、AtomicUnorderedInsertMap不支持删除)来换取高效的并发查找与插入。
#include <iostream>
#include <atomic>
#include <array>
#include <memory>

/// Single-producer / single-consumer lock-free ring buffer, modelled on
/// folly::ProducerConsumerQueue.
///
/// Stores raw `T*` values and never dereferences or frees them; ownership of
/// the pointees stays with the caller. One slot is always kept empty to tell
/// "full" from "empty", so at most Capacity - 1 items are queued at a time.
///
/// Exactly one thread may call write() and exactly one thread may call read().
///
/// @tparam T        Pointee type.
/// @tparam Capacity Number of ring slots; must be at least 2.
template<typename T,size_t Capacity>
class SPSCQueue {
    // Capacity 0 would make `% Capacity` divide by zero, and Capacity 1 would
    // yield a queue that always reports "full".
    static_assert(Capacity >= 2, "SPSCQueue requires Capacity >= 2");

private:
    std::array<std::atomic<T*>,Capacity> buffer_;
    std::atomic<size_t> write_index_;  // written only by the producer
    std::atomic<size_t> read_index_;   // written only by the consumer

public:
    SPSCQueue() : write_index_(0),read_index_(0) {
        for (auto& slot : buffer_) {
            slot.store(nullptr,std::memory_order_relaxed);
        }
    }

    /// Producer side: enqueues `item`.
    /// @return false if the queue is full (nothing is stored).
    bool write(T* item) {
        const size_t current_write = write_index_.load(std::memory_order_relaxed);
        const size_t next_write = (current_write + 1) % Capacity;

        // Acquire pairs with the consumer's release store of read_index_.
        if (next_write == read_index_.load(std::memory_order_acquire)) {
            return false; // full
        }

        buffer_[current_write].store(item,std::memory_order_release);
        // Publishing the new write index makes the slot visible to the consumer.
        write_index_.store(next_write,std::memory_order_release);
        return true;
    }

    /// Consumer side: dequeues the oldest item into `item`.
    /// @return false if the queue is empty (`item` is left untouched).
    bool read(T*& item) {
        const size_t current_read = read_index_.load(std::memory_order_relaxed);

        // Acquire pairs with the producer's release store of write_index_.
        if (current_read == write_index_.load(std::memory_order_acquire)) {
            return false; // empty
        }

        item = buffer_[current_read].load(std::memory_order_acquire);
        buffer_[current_read].store(nullptr,std::memory_order_relaxed);

        const size_t next_read = (current_read + 1) % Capacity;
        // Publishing the new read index hands the slot back to the producer.
        read_index_.store(next_read,std::memory_order_release);
        return true;
    }
};

/// Benchmark for SPSCQueue: one producer thread sends the integers
/// [0, 1000000) as heap-allocated ints, one consumer thread receives and frees
/// them. Prints item count, elapsed time, throughput and the sum of all
/// received values (expected N * (N - 1) / 2).
void folly_style_queue_test() {
    std::cout << "\nFolly风格高性能队列模拟" << std::endl;
    std::cout << "=============================" << std::endl << std::endl;

    SPSCQueue<int,1024> queue;
    const int N = 1000000;

    auto producer = [&]() {
        for (int i = 0; i < N; ++i) {
            auto item = std::make_unique<int>(i);
            while (!queue.write(item.get())) {
                // Queue full: let the consumer make progress.
                std::this_thread::yield();
            }
            // The pointer is in the queue now; the consumer owns and frees it.
            item.release();
        }
    };

    // Written only by the consumer thread and read after it has been joined.
    // The total is about 5e11, so a 32-bit int would overflow.
    long long checksum = 0;
    auto consumer = [&]() {
        int received = 0;
        while (received < N) {
            int* raw = nullptr;
            if (queue.read(raw)) {
                std::unique_ptr<int> item(raw); // take ownership back
                checksum += *item;
                ++received;
            } else {
                std::this_thread::yield();
            }
        }
    };

    auto start = std::chrono::high_resolution_clock::now();

    std::thread p(producer);
    std::thread c(consumer);
    p.join();
    c.join();

    auto end = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);

    std::cout << "SPSC无锁队列测试:" << std::endl;
    std::cout << "  数据量: " << N << std::endl;
    std::cout << "  耗时: " << duration.count() << " μs" << std::endl;
    std::cout << "  吞吐量: " << (N * 1000000.0 / duration.count()) << " items/sec" << std::endl;
    std::cout << "  校验和: " << checksum << std::endl;
}

/// Spin-then-block mutex in the spirit of Abseil's Mutex.
///
/// lock() first makes up to kSpinThreshold non-blocking attempts, yielding
/// between them, and only then blocks on the underlying std::mutex. All
/// ownership is carried by that single std::mutex, so mutual exclusion holds
/// on both the fast and the slow path and unlock() is always performed by the
/// thread that acquired the lock.
///
/// Provides lock / try_lock / unlock and therefore works with
/// std::lock_guard and std::unique_lock.
class HybridMutex {
private:
    // Number of acquisitions that had to fall back to a blocking wait.
    std::atomic<int> contention_count{0};
    std::mutex sys_mutex;

    // Non-blocking attempts made before blocking.
    static constexpr int kSpinThreshold = 1000;

public:
    /// Acquires the mutex, spinning briefly before blocking.
    void lock() {
        // Phase 1: optimistic spinning.
        for (int i = 0; i < kSpinThreshold; ++i) {
            if (sys_mutex.try_lock()) {
                return;
            }
            // _mm_pause() would be the x86 alternative to yielding.
            std::this_thread::yield();
        }

        // Phase 2: heavy contention, block until the mutex is available.
        contention_count.fetch_add(1,std::memory_order_relaxed);
        sys_mutex.lock();
    }

    /// Attempts to acquire the mutex without blocking.
    /// @return true if the mutex was acquired.
    bool try_lock() {
        return sys_mutex.try_lock();
    }

    /// Releases the mutex. Must be called by the thread that holds it.
    void unlock() {
        sys_mutex.unlock();
    }
};

/// Benchmark for HybridMutex: four threads each take the lock 1,000,000 times
/// and bump a shared counter while holding it. Prints the final counter value
/// and the elapsed time in milliseconds.
void abseil_mutex_benchmark() {
    using Clock = std::chrono::high_resolution_clock;

    std::cout << "\nAbseil风格混合锁基准测试" << std::endl;
    std::cout << "=============================" << std::endl << std::endl;

    HybridMutex mutex;
    std::atomic<long> counter{0};
    const int iterations = 1000000;
    const int thread_count = 4;

    // Per-thread workload: lock, increment, unlock.
    auto task = [&]() {
        for (int round = 0; round < iterations; ++round) {
            std::lock_guard<HybridMutex> guard(mutex);
            counter.fetch_add(1,std::memory_order_relaxed);
        }
    };

    std::vector<std::thread> threads;
    const auto t_begin = Clock::now();

    for (int n = 0; n < thread_count; ++n) {
        threads.emplace_back(task);
    }
    for (auto& worker : threads) {
        worker.join();
    }

    const auto t_end = Clock::now();
    const auto elapsed =
        std::chrono::duration_cast<std::chrono::milliseconds>(t_end - t_begin);

    std::cout << "结果: " << counter.load() << std::endl;
    std::cout << "耗时: " << elapsed.count() << " ms" << std::endl;
}

Executor模式与未来C++标准

通用执行器接口

C++26预计将引入std::execution,定义了统一的任务执行接口。我们可以预先设计一个类似的接口。

#include <memory>
#include <vector>
#include <type_traits>

/// Minimal executor abstraction: accepts nullary callables and hands them, as
/// type-erased Task objects, to an implementation-defined execution context.
///
/// Relies on the Task base class and the GenericTask adapter being visible at
/// this point in the translation unit.
class Executor {
public:
    virtual ~Executor() = default;

    /// Submits a fire-and-forget task.
    ///
    /// The callable is decayed and stored by value, so lvalue, const and
    /// rvalue callables are all accepted; lvalues are copied.
    ///
    /// @param func Nullary callable; its result, if any, is discarded.
    template<typename Func>
    void execute(Func&& func) {
        using Callable = std::decay_t<Func>;
        // Materialise an owned copy first: GenericTask's constructor takes an
        // rvalue of the stored type.
        Callable callable(std::forward<Func>(func));
        execute_impl(std::make_unique<GenericTask<Callable>>(std::move(callable)));
    }

    /// Submits a task and returns a future for its outcome.
    ///
    /// @param func Nullary callable.
    /// @return Future holding func's return value, or the exception it threw.
    template<typename Func>
    auto submit(Func&& func) -> std::future<decltype(func())> {
        using ReturnType = decltype(func());
        // The promise is shared so that the wrapper stays copyable/movable
        // independently of the future returned to the caller.
        auto promise = std::make_shared<std::promise<ReturnType>>();
        auto future = promise->get_future();

        execute([p = promise,f = std::forward<Func>(func)]() mutable {
            try {
                if constexpr (std::is_void_v<ReturnType>) {
                    f();
                    p->set_value();
                } else {
                    p->set_value(f());
                }
            } catch (...) {
                p->set_exception(std::current_exception());
            }
        });

        return future;
    }

protected:
    /// Schedules `task` for execution; implemented by concrete executors.
    virtual void execute_impl(std::unique_ptr<Task> task) = 0;
};

/// Executor that runs its tasks on an internally owned ThreadPool.
class ThreadPoolExecutor : public Executor {
public:
    /// @param threads Number of worker threads for the underlying pool.
    ThreadPoolExecutor(size_t threads = std::thread::hardware_concurrency())
        : pool(threads) {}

protected:
    /// Wraps the task in a void() callable that owns it and queues that
    /// callable on the pool.
    void execute_impl(std::unique_ptr<Task> task) override {
        auto run_owned_task = [owned = std::move(task)]() { owned->execute(); };
        pool.enqueue(std::move(run_owned_task));
    }

private:
    ThreadPool pool;
};

/// Exercises the Executor interface on a four-thread ThreadPoolExecutor: one
/// fire-and-forget task plus two tasks whose results are read via futures.
void executor_pattern_test() {
    using std::chrono::milliseconds;

    std::cout << "\nExecutor模式测试" << std::endl;
    std::cout << "=============================" << std::endl << std::endl;

    ThreadPoolExecutor executor(4);

    // Fire-and-forget execution.
    executor.execute([]() {
        std::cout << "Task 1 executed" << std::endl;
    });

    // Tasks with a return value.
    auto int_result = executor.submit([]() {
        std::this_thread::sleep_for(milliseconds(100));
        return 42;
    });

    auto text_result = executor.submit([]() {
        std::this_thread::sleep_for(milliseconds(50));
        return std::string("Hello Executor");
    });

    std::cout << "Waiting for results..." << std::endl;
    std::cout << "Result 1: " << int_result.get() << std::endl;
    std::cout << "Result 2: " << text_result.get() << std::endl;
}

OpenMP Task并行(GPU/异构计算的前身)

OpenMP 3.0+引入了#pragma omp task,允许将任意代码块作为任务调度,非常适合递归算法(如斐波那契、快速排序)。

#include <iostream>
#include <vector>
#include <algorithm>
#include <omp.h>

/// Sorts [begin, end) in ascending order using OpenMP tasks.
///
/// Ranges of at most 10000 elements are sorted directly with std::sort.
/// Larger ranges are split around their median with std::nth_element, which
/// places the median at its final position, and the two halves on either side
/// are sorted as independent tasks.
///
/// For the tasks to run in parallel the call must come from inside an OpenMP
/// parallel region; when the pragmas are ignored the recursion is sequential.
void parallel_quicksort(std::vector<int>::iterator begin,std::vector<int>::iterator end) {
    auto count = std::distance(begin,end);

    // Small range: task overhead would outweigh the gain.
    if (count <= 10000) {
        std::sort(begin,end);
        return;
    }

    auto median = std::next(begin,count / 2);
    std::nth_element(begin,median,end);

    // The median is already in place, so the right half starts just after it.
    auto right_begin = std::next(median);

    // Sort both halves as separate tasks.
    #pragma omp task
    parallel_quicksort(begin,median);

    #pragma omp task
    parallel_quicksort(right_begin,end);

    // Wait for both child tasks before returning.
    #pragma omp taskwait
}

/// Compares std::sort with the OpenMP task-based parallel_quicksort on the
/// same 10,000,000 rand() values; prints both timings in milliseconds and
/// whether the parallel result is sorted.
void openmp_task_benchmark() {
    using Clock = std::chrono::high_resolution_clock;

    // Milliseconds elapsed between two time points.
    const auto elapsed_ms = [](Clock::time_point from, Clock::time_point to) {
        return std::chrono::duration_cast<std::chrono::milliseconds>(to - from).count();
    };

    std::cout << "\nOpenMP任务并行示例" << std::endl;
    std::cout << "=============================" << std::endl << std::endl;

    const int N = 10000000;
    std::vector<int> data(N);
    std::generate(data.begin(),data.end(),[](){ return rand(); });

    // Both sorts work on identical input.
    auto data_copy = data;

    // Serial baseline.
    auto t_begin = Clock::now();
    std::sort(data_copy.begin(),data_copy.end());
    auto t_end = Clock::now();
    std::cout << "串行排序耗时: "
              << elapsed_ms(t_begin,t_end) << " ms" << std::endl;

    // Task-parallel sort: one thread seeds the recursion, the rest of the
    // team picks up the tasks it spawns.
    t_begin = Clock::now();
    #pragma omp parallel
    {
        #pragma omp single
        parallel_quicksort(data.begin(),data.end());
    }
    t_end = Clock::now();

    std::cout << "OpenMP任务并行耗时: "
              << elapsed_ms(t_begin,t_end) << " ms" << std::endl;

    const bool ordered = std::is_sorted(data.begin(),data.end());
    std::cout << "排序验证: " << (ordered ? "通过" : "失败") << std::endl;
}

综合实战:构建高性能日志系统

异步日志系统设计

基于生产者-消费者模式构建一个异步日志系统:业务线程只负责把日志消息放入由互斥锁和条件变量保护的队列,格式化与文件I/O全部由一个后台线程完成。(若需进一步降低锁竞争,可将该队列替换为前文介绍的无锁队列。)

#include <iostream>
#include <string>
#include <memory>
#include <sstream>
#include <fstream>
#include <chrono>
#include <iomanip>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <atomic>

/// Severity of a log message, in declaration order DEBUG, INFO, WARN, ERROR.
enum class LogLevel {
    DEBUG,
    INFO,
    WARN,
    ERROR
};

/// One log record: severity, text and the time at which it was created.
struct LogMessage {
    LogLevel level;
    std::string content;
    std::chrono::system_clock::time_point timestamp;

    /// @param l Severity of the message.
    /// @param c Message text; taken by value and moved into the record so
    ///          that no second copy of the string is made.
    ///
    /// The timestamp is taken from the system clock at construction.
    LogMessage(LogLevel l,std::string c)
        : level(l),content(std::move(c)),timestamp(std::chrono::system_clock::now()) {}
};

// 高性能异步Logger
class AsyncLogger {
private:
    std::queue<LogMessage> message_queue_;
    std::mutex queue_mutex_;
    std::condition_variable cv_;
    std::thread worker_;
    std::atomic<bool> running_{false};
    std::ofstream log_file_;
    
    // 格式化日志消息
    std::string format_message(const LogMessage& msg) {
     auto time_t = std::chrono::system_clock::to_time_t(msg.timestamp);
     auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(
         msg.timestamp.time_since_epoch()) % 1000;
     
     std::stringstream ss;
     ss << "[" << std::put_time(std::localtime(&time_t),"%Y-%m-%d %H:%M:%S");
     ss << "." << std::setfill('0') << std::setw(3) << ms.count() << "]";
     
     switch (msg.level) {
         case LogLevel::DEBUG: ss << "[DEBUG] "; break;
         case LogLevel::INFO:  ss << "[INFO] ";  break;
         case LogLevel::WARN:  ss << "[WARN] ";  break;
         case LogLevel::ERROR: ss << "[ERROR] "; break;
     }
     ss << msg.content << "\n";
     return ss.str();
    }
    
    void worker_loop() {
     while (running_) {
         LogMessage msg;
         {
             std::unique_lock<std::mutex> lock(queue_mutex_);
             cv_.wait(lock,[this](){ 
                 return !message_queue_.empty() || !running_; 
             });
             
             if (!running_ && message_queue_.empty()) break;
             if (!message_queue_.empty()) {
                 msg = std::move(message_queue_.front());
                 message_queue_.pop();
             } else {
                 continue;
             }
         }
         
         // 写入文件 (I/O操作在工作线程中完成)
         log_file_ << format_message(msg);
         log_file_.flush();
     }
    }
    
public:
    AsyncLogger(const std::string& filename) {
     log_file_.open(filename,std::ios::app);
     if (!log_file_.is_open()) {
         throw std::runtime_error("Failed to open log file");
     }
     
     running_ = true;
     worker_ = std::thread(&AsyncLogger::worker_loop,this);
    }
    
    ~AsyncLogger() {
     running_ = false;
     cv_.notify_all();
     if (worker_.joinable()) {
         worker_.join();
     }
     log_file_.close();
    }
    
    void log(LogLevel level,const std::string& message) {
     {
         std::lock_guard<std::mutex> lock(queue_mutex_);
         message_queue_.emplace(level,message);
     }
     cv_.notify_one();
    }
};

/// AsyncLogger demo: eight loop iterations (run in parallel when OpenMP is
/// enabled) each submit 10,000 INFO messages to "app.log". Prints the message
/// count, the time spent submitting them and the resulting throughput.
void async_logger_demo() {
    using Clock = std::chrono::high_resolution_clock;

    std::cout << "\n高性能异步日志系统演示" << std::endl;
    std::cout << "=============================" << std::endl << std::endl;

    AsyncLogger logger("app.log");

    // Simulated multi-threaded logging workload.
    const int num_threads = 8;
    const int logs_per_thread = 10000;

    const auto t_begin = Clock::now();

    #pragma omp parallel for num_threads(num_threads)
    for (int tid = 0; tid < num_threads; ++tid) {
        for (int seq = 0; seq < logs_per_thread; ++seq) {
            std::string text = "Thread ";
            text += std::to_string(tid);
            text += " log ";
            text += std::to_string(seq);
            logger.log(LogLevel::INFO,text);
        }
    }

    const auto t_end = Clock::now();
    const auto elapsed =
        std::chrono::duration_cast<std::chrono::milliseconds>(t_end - t_begin);

    std::cout << "总日志数: " << num_threads * logs_per_thread << std::endl;
    std::cout << "总耗时: " << elapsed.count() << " ms" << std::endl;
    std::cout << "吞吐量: " << (num_threads * logs_per_thread * 1000.0 / elapsed.count()) << " logs/sec" << std::endl;
}

/// Entry point: runs every demo in order and then prints a short guide on
/// choosing a concurrency framework.
int main() {
    const char* const separator = "=============================";

    std::cout << "高性能并发框架与库实践" << std::endl;
    std::cout << separator << std::endl << std::endl;

    tbb_parallel_for_example();
    // tbb_concurrent_queue_example(); // may hang, depending on the environment
    // tbb_flow_graph_example();

    thread_pool_benchmark();

    folly_style_queue_test();
    abseil_mutex_benchmark();

    executor_pattern_test();
    openmp_task_benchmark();

    async_logger_demo();

    std::cout << "\n并发框架选择指南" << std::endl;
    std::cout << separator << std::endl << std::endl;

    // One guide line per framework family.
    const char* const guide[] = {
        "1. Intel TBB: 适合任务并行和数据并行,尤其是数据量大的计算任务。",
        "2. OpenMP Tasks: 适合递归算法和不规则并行,易于集成到现有代码。",
        "3. 自定义线程池: 适合I/O密集型任务,需要细粒度控制的场景。",
        "4. Folly/Abseil: 适合追求极致性能的大规模服务端应用。",
    };
    for (const char* line : guide) {
        std::cout << line << std::endl;
    }

    return 0;
}

通过合理选择和应用这些并发框架与库,开发者可以显著提升并发程序的开发效率和运行性能,从繁琐的线程管理中解放出来,专注于业务逻辑的实现。