无锁编程与原子操作深度实践

深入探讨无锁编程原理、原子操作技术,以及lock-free数据结构的实现与性能优化

无锁编程是并发编程中的一项高级技术:它借助原子操作和内存序保证,在不使用锁的情况下实现线程安全的数据访问。与传统的锁机制相比,无锁算法在高并发、高竞争的场景下往往能够提供更好的性能和可扩展性,但实现难度更高,还必须正确处理内存回收和ABA等问题。本文将深入探讨无锁编程的原理、原子操作技术,以及实际应用中的最佳实践。

原子操作基础

C++11原子类型

#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Demonstrates the basic std::atomic operations on a single thread and prints
// the result of each step: load/store, exchange, compare-and-swap, the
// fetch-and-modify family, and an atomic pointer.
void basic_atomic_operations() {
    std::cout << "基础原子操作演示" << std::endl;
    std::cout << "=============================" << std::endl << std::endl;

    // Atomic integral and boolean objects.
    std::atomic<int> atomic_int(0);
    std::atomic<long> atomic_long(0);
    std::atomic<bool> atomic_bool(false);

    std::cout << "原子类型特性:" << std::endl;
    std::cout << "  atomic_int is_lock_free: " << atomic_int.is_lock_free() << std::endl;
    std::cout << "  atomic_long is_lock_free: " << atomic_long.is_lock_free() << std::endl;
    std::cout << "  atomic_bool is_lock_free: " << atomic_bool.is_lock_free() << std::endl;

    // Atomic store followed by an atomic load.
    atomic_int.store(42);
    const int value = atomic_int.load();
    std::cout << "  读写操作: atomic_int = " << value << std::endl;

    // exchange() writes the new value and returns the previous one.
    const int old_value = atomic_int.exchange(100);
    std::cout << "  交换操作: old=" << old_value << ",new=100" << std::endl;

    // Single-shot compare-and-swap. The strong form is required here:
    // compare_exchange_weak may fail spuriously even when the stored value
    // equals `expected`, which is only acceptable inside a retry loop.
    int expected = 100;
    const bool success = atomic_int.compare_exchange_strong(expected,200);
    std::cout << "  比较交换: success=" << success << ",value=" << atomic_int.load() << std::endl;

    // Fetch-and-modify operations (each returns the old value, ignored here).
    atomic_int.fetch_add(10);
    std::cout << "  加法操作: " << atomic_int.load() << std::endl;

    atomic_int.fetch_sub(5);
    std::cout << "  减法操作: " << atomic_int.load() << std::endl;

    atomic_int.fetch_and(0xFF);
    std::cout << "  位与操作: " << atomic_int.load() << std::endl;

    atomic_int.fetch_or(0x0F);
    std::cout << "  位或操作: " << atomic_int.load() << std::endl;

    // Atomic pointer: only the pointer value is atomic, not the pointee.
    int data = 42;
    std::atomic<int*> atomic_ptr(&data);

    int* const ptr = atomic_ptr.load();
    std::cout << "  指针操作: *ptr = " << *ptr << std::endl;
}

// Shows the syntax of the three commonly used memory orderings and prints a
// short summary of their relative cost.
//
// Everything here runs on one thread, so the calls only illustrate how an
// ordering is passed. A release store synchronises with an acquire load only
// when the load, executed by another thread, reads the value written by that
// store to the SAME atomic object; the x/y pairs below do not do that.
void memory_order_demonstration() {
    std::cout << "\n内存序类型演示" << std::endl;
    std::cout << "=============================" << std::endl << std::endl;

    std::atomic<int> x(0);
    std::atomic<int> y(0);

    // Relaxed: atomicity only, no ordering relative to other memory accesses.
    // The loaded values are not needed, so they are discarded explicitly
    // instead of being kept in unused locals.
    x.store(1,std::memory_order_relaxed);
    static_cast<void>(y.load(std::memory_order_relaxed));

    // Release store / acquire load: the building blocks of publish/consume
    // style synchronisation between threads.
    x.store(1,std::memory_order_release);
    static_cast<void>(y.load(std::memory_order_acquire));

    // Sequentially consistent: additionally part of one global total order.
    x.store(1,std::memory_order_seq_cst);
    static_cast<void>(y.load(std::memory_order_seq_cst));

    std::cout << "内存序类型性能影响:" << std::endl;
    std::cout << "  memory_order_relaxed: 最快,保证最少" << std::endl;
    std::cout << "  memory_order_acquire/release: 中等,保证同步" << std::endl;
    std::cout << "  memory_order_seq_cst: 最慢,保证全局顺序" << std::endl;
}

// Single-threaded micro-benchmark of three atomic patterns: fetch_add, a CAS
// increment loop, and CAS-based insertion at the head of a linked list.
// Prints the elapsed time and throughput of each phase.
void atomic_performance_test() {
    std::cout << "\n原子操作性能测试" << std::endl;
    std::cout << "=============================" << std::endl << std::endl;

    typedef std::chrono::high_resolution_clock Clock;
    const int num_iterations = 10000000;

    // Prints one result section. Operations per microsecond is numerically
    // equal to millions of operations per second, so no extra factor is
    // applied. A zero reading is clamped to 1 us to avoid dividing by zero.
    const auto report = [num_iterations](const char* title,std::chrono::microseconds elapsed) {
        const double micros = elapsed.count() > 0 ? static_cast<double>(elapsed.count()) : 1.0;
        std::cout << title << std::endl;
        std::cout << "  耗时: " << elapsed.count() << " μs" << std::endl;
        std::cout << "  吞吐量: " << (num_iterations / micros) << " M ops/sec" << std::endl;
    };

    // Test 1: atomic increment.
    std::atomic<int> counter(0);

    Clock::time_point start = Clock::now();
    for (int i = 0; i < num_iterations; i++) {
        counter.fetch_add(1,std::memory_order_relaxed);
    }
    Clock::time_point end = Clock::now();
    report("原子自增操作:",std::chrono::duration_cast<std::chrono::microseconds>(end - start));

    // Test 2: increment implemented as a compare-and-swap loop.
    std::atomic<int> compare_exchange_counter(0);

    start = Clock::now();
    for (int i = 0; i < num_iterations; i++) {
        int expected = compare_exchange_counter.load(std::memory_order_relaxed);
        while (!compare_exchange_counter.compare_exchange_weak(
                   expected,expected + 1,
                   std::memory_order_release,
                   std::memory_order_relaxed)) {
            // A failed CAS already stored the current value in `expected`,
            // so no separate reload is needed before retrying.
        }
    }
    end = Clock::now();
    report("\n比较交换操作:",std::chrono::duration_cast<std::chrono::microseconds>(end - start));

    // Test 3: CAS on an atomic pointer (push at the head of a linked list).
    struct Node {
        int value;
        Node* next;
    };

    std::atomic<Node*> atomic_head(nullptr);

    start = Clock::now();
    for (int i = 0; i < num_iterations; i++) {
        Node* const new_node = new Node{i,nullptr};
        Node* old_head = atomic_head.load(std::memory_order_relaxed);
        do {
            new_node->next = old_head;
        } while (!atomic_head.compare_exchange_weak(
                     old_head,new_node,
                     std::memory_order_release,
                     std::memory_order_relaxed));
    }
    end = Clock::now();
    report("\n原子指针操作 (无锁链表插入):",std::chrono::duration_cast<std::chrono::microseconds>(end - start));

    // Free the list: detach it in one step, then walk it as plain pointers.
    Node* node = atomic_head.exchange(nullptr);
    while (node != nullptr) {
        Node* const following = node->next;
        delete node;
        node = following;
    }
}

无锁数据结构实现

无锁队列

#include <iostream>
#include <atomic>
#include <memory>

// Unbounded multi-producer / multi-consumer FIFO queue in the style of the
// Michael-Scott algorithm.
//
// Layout: `head` always points at a dummy node; the first real element is
// head->next. `tail` points at the last node, or at its predecessor while an
// enqueue is in flight (other threads then help to advance it).
//
// Memory reclamation: a node that has been unlinked may still be dereferenced
// by threads that loaded it earlier. Every operation therefore registers in
// `active_ops`; an unlinked node is deleted immediately only when the calling
// thread is the sole active operation, otherwise it is parked on the `retired`
// list and freed later by an operation that finds itself alone. Because a node
// is never freed while a concurrent operation is active, addresses cannot be
// reused under such an operation, which also rules out ABA on head/tail.
//
// All atomics use the default memory_order_seq_cst: the reclamation argument
// relies on a single total order between the counter and the pointer updates.
//
// Destroying the queue while other threads still use it is not supported.
template<typename T>
class LockFreeQueue {
private:
    struct Node {
        std::shared_ptr<T> data;   // empty in the dummy node
        std::atomic<Node*> next;   // successor in the queue
        Node* retired_next;        // link used only while parked on `retired`

        Node() : next(nullptr),retired_next(nullptr) {}
        explicit Node(T const& value)
            : data(std::make_shared<T>(value)),next(nullptr),retired_next(nullptr) {}
    };

    std::atomic<Node*> head;
    std::atomic<Node*> tail;
    // Mutable so that the const observers can take part in the protocol.
    mutable std::atomic<Node*> retired;
    mutable std::atomic<unsigned> active_ops;

    // Registers the calling thread as being inside a queue operation.
    void enter() const {
        active_ops.fetch_add(1);
    }

    // Pushes the chain first..last (linked through retired_next) onto `retired`.
    void chain_retired(Node* first,Node* last) const {
        last->retired_next = retired.load();
        while (!retired.compare_exchange_weak(last->retired_next,first)) {
            // the failed CAS refreshed last->retired_next; retry
        }
    }

    // Deletes a chain linked through retired_next.
    static void delete_retired(Node* node) {
        while (node != nullptr) {
            Node* const following = node->retired_next;
            delete node;
            node = following;
        }
    }

    // Ends the current operation. `unlinked` is the node this operation removed
    // from the queue (or nullptr); it is freed now if that is safe, otherwise
    // parked for later.
    void leave(Node* unlinked) const {
        if (active_ops.load() == 1) {
            // Nobody else is active: no other thread can still hold `unlinked`,
            // and later arrivals cannot reach it any more.
            Node* const claimed = retired.exchange(nullptr);
            if (active_ops.fetch_sub(1) == 1) {
                delete_retired(claimed);
            } else if (claimed != nullptr) {
                // Another operation started in the meantime and may reference
                // a parked node; put the claimed chain back.
                Node* last = claimed;
                while (last->retired_next != nullptr) {
                    last = last->retired_next;
                }
                chain_retired(claimed,last);
            }
            delete unlinked;
        } else {
            if (unlinked != nullptr) {
                chain_retired(unlinked,unlinked);
            }
            active_ops.fetch_sub(1);
        }
    }

public:
    LockFreeQueue() : head(new Node),tail(head.load()),retired(nullptr),active_ops(0) {}

    // Frees the dummy node, all remaining elements and all parked nodes.
    ~LockFreeQueue() {
        Node* node = head.load();
        while (node != nullptr) {
            Node* const following = node->next.load();
            delete node;
            node = following;
        }
        delete_retired(retired.load());
    }

    // Appends a copy of `value` to the back of the queue.
    void enqueue(T const& value) {
        // Allocate before registering so an exception leaves no state behind.
        Node* const new_node = new Node(value);

        enter();
        while (true) {
            Node* old_tail = tail.load();
            Node* const next = old_tail->next.load();

            if (next != nullptr) {
                // tail is lagging behind an unfinished enqueue: help it along.
                tail.compare_exchange_strong(old_tail,next);
                continue;
            }

            Node* expected = nullptr;
            if (old_tail->next.compare_exchange_weak(expected,new_node)) {
                // Linked. Swing tail; failure means another thread helped.
                tail.compare_exchange_strong(old_tail,new_node);
                break;
            }
        }
        leave(nullptr);
    }

    // Removes the front element and returns it, or returns nullptr when the
    // queue is empty.
    std::shared_ptr<T> dequeue() {
        enter();

        Node* old_head = nullptr;
        Node* next = nullptr;
        while (true) {
            old_head = head.load();
            Node* old_tail = tail.load();
            next = old_head->next.load();

            if (next == nullptr) {
                leave(nullptr);
                return nullptr;  // only the dummy node is left
            }
            if (old_head == old_tail) {
                // An element is linked but tail has not caught up yet.
                tail.compare_exchange_strong(old_tail,next);
                continue;
            }
            if (head.compare_exchange_weak(old_head,next)) {
                break;  // `next` is the new dummy; its payload is ours
            }
        }

        // The payload lives in the node AFTER the old dummy. Only the thread
        // that won the CAS above touches it, so it can be moved out.
        std::shared_ptr<T> res = std::move(next->data);
        leave(old_head);
        return res;
    }

    // True when no element is linked behind the dummy node. Under concurrent
    // use the answer may be outdated by the time it is returned.
    bool empty() const {
        enter();
        const bool result = head.load()->next.load() == nullptr;
        leave(nullptr);
        return result;
    }

    // Counts the elements by walking the list (O(n)). Under concurrent use the
    // result is only a snapshot-like estimate.
    size_t size() const {
        enter();
        size_t count = 0;
        for (Node* node = head.load()->next.load(); node != nullptr; node = node->next.load()) {
            count++;
        }
        leave(nullptr);
        return count;
    }
};

// Single-threaded benchmark of LockFreeQueue: enqueues num_operations
// integers, then drains the queue, printing time and throughput for each
// phase.
void lockfree_queue_test() {
    std::cout << "\n无锁队列性能测试" << std::endl;
    std::cout << "=============================" << std::endl << std::endl;

    typedef std::chrono::high_resolution_clock Clock;

    // Operations per microsecond equals millions of operations per second.
    // A zero reading is clamped to 1 us to avoid dividing by zero.
    const auto mops = [](long long ops,std::chrono::microseconds elapsed) {
        const long long micros = elapsed.count() > 0 ? elapsed.count() : 1;
        return static_cast<double>(ops) / static_cast<double>(micros);
    };

    LockFreeQueue<int> queue;

    const int num_operations = 1000000;

    // Enqueue phase.
    Clock::time_point start = Clock::now();
    for (int i = 0; i < num_operations; i++) {
        queue.enqueue(i);
    }
    Clock::time_point end = Clock::now();

    std::chrono::microseconds duration =
        std::chrono::duration_cast<std::chrono::microseconds>(end - start);
    std::cout << "入队操作:" << std::endl;
    std::cout << "  操作数: " << num_operations << std::endl;
    std::cout << "  耗时: " << duration.count() << " μs" << std::endl;
    std::cout << "  吞吐量: " << mops(num_operations,duration) << " M ops/sec" << std::endl;

    // Dequeue phase: drain until dequeue() reports an empty queue.
    start = Clock::now();
    int count = 0;
    while (queue.dequeue()) {
        count++;
    }
    end = Clock::now();

    duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
    std::cout << "\n出队操作:" << std::endl;
    std::cout << "  操作数: " << count << std::endl;
    std::cout << "  耗时: " << duration.count() << " μs" << std::endl;
    std::cout << "  吞吐量: " << mops(count,duration) << " M ops/sec" << std::endl;
}

// Unbounded multi-producer / multi-consumer LIFO stack (Treiber stack).
//
// Memory reclamation: a popped node may still be dereferenced by another
// thread that loaded it as `head` earlier. pop() therefore counts the threads
// inside it; a popped node is deleted immediately only when the caller is the
// only thread in pop(), otherwise it is parked on the `pending` list and freed
// by a later pop() that finds itself alone. Since a node is never freed while
// another pop() is running, its address cannot be reused under that pop(),
// which also prevents ABA on `head`.
//
// All atomics use the default memory_order_seq_cst: the reclamation argument
// relies on a single total order between the counter and the head updates.
//
// Destroying the stack while other threads still use it is not supported.
template<typename T>
class LockFreeStack {
private:
    struct Node {
        std::shared_ptr<T> data;  // allocated in push() so pop() cannot throw
        Node* next;               // written before publication, read-only after
        Node* pending_next;       // link used only while parked on `pending`

        explicit Node(T const& value)
            : data(std::make_shared<T>(value)),next(nullptr),pending_next(nullptr) {}
    };

    std::atomic<Node*> head;
    std::atomic<Node*> pending;
    std::atomic<unsigned> threads_in_pop;

    // Pushes the chain first..last (linked through pending_next) onto `pending`.
    void chain_pending(Node* first,Node* last) {
        last->pending_next = pending.load();
        while (!pending.compare_exchange_weak(last->pending_next,first)) {
            // the failed CAS refreshed last->pending_next; retry
        }
    }

    // Deletes a chain linked through pending_next.
    static void delete_pending(Node* node) {
        while (node != nullptr) {
            Node* const following = node->pending_next;
            delete node;
            node = following;
        }
    }

    // Leaves pop(). `old_head` is the node this call removed (or nullptr).
    void try_reclaim(Node* old_head) {
        if (threads_in_pop.load() == 1) {
            // No other thread is in pop(): nobody else can hold `old_head`,
            // and later arrivals cannot reach it any more.
            Node* const claimed = pending.exchange(nullptr);
            if (threads_in_pop.fetch_sub(1) == 1) {
                delete_pending(claimed);
            } else if (claimed != nullptr) {
                // Another pop() started meanwhile and may reference a parked
                // node; put the claimed chain back.
                Node* last = claimed;
                while (last->pending_next != nullptr) {
                    last = last->pending_next;
                }
                chain_pending(claimed,last);
            }
            delete old_head;
        } else {
            if (old_head != nullptr) {
                chain_pending(old_head,old_head);
            }
            threads_in_pop.fetch_sub(1);
        }
    }

public:
    LockFreeStack() : head(nullptr),pending(nullptr),threads_in_pop(0) {}

    // Frees all remaining elements and all parked nodes.
    ~LockFreeStack() {
        Node* node = head.load();
        while (node != nullptr) {
            Node* const following = node->next;
            delete node;
            node = following;
        }
        delete_pending(pending.load());
    }

    // Pushes a copy of `value` onto the stack.
    void push(T const& value) {
        Node* const new_node = new Node(value);
        new_node->next = head.load();

        while (!head.compare_exchange_weak(new_node->next,new_node)) {
            // the failed CAS stored the current head in new_node->next; retry
        }
    }

    // Pops the top element and returns it, or returns nullptr when the stack
    // is empty.
    std::shared_ptr<T> pop() {
        threads_in_pop.fetch_add(1);

        Node* old_head = head.load();
        while (old_head && !head.compare_exchange_weak(old_head,old_head->next)) {
            // the failed CAS refreshed old_head; retry
        }

        std::shared_ptr<T> res;
        if (old_head) {
            res.swap(old_head->data);  // take the payload before the node is released
        }
        try_reclaim(old_head);
        return res;
    }

    // True when the stack holds no element at the moment of the call.
    bool empty() const {
        return head.load() == nullptr;
    }
};

// Single-threaded benchmark of LockFreeStack: pushes num_operations integers,
// then pops until the stack is empty, printing time and throughput for each
// phase.
void lockfree_stack_test() {
    std::cout << "\n无锁栈性能测试" << std::endl;
    std::cout << "=============================" << std::endl << std::endl;

    typedef std::chrono::high_resolution_clock Clock;

    // Operations per microsecond equals millions of operations per second.
    // A zero reading is clamped to 1 us to avoid dividing by zero.
    const auto mops = [](long long ops,std::chrono::microseconds elapsed) {
        const long long micros = elapsed.count() > 0 ? elapsed.count() : 1;
        return static_cast<double>(ops) / static_cast<double>(micros);
    };

    LockFreeStack<int> stack;

    const int num_operations = 1000000;

    // Push phase.
    Clock::time_point start = Clock::now();
    for (int i = 0; i < num_operations; i++) {
        stack.push(i);
    }
    Clock::time_point end = Clock::now();

    std::chrono::microseconds duration =
        std::chrono::duration_cast<std::chrono::microseconds>(end - start);
    std::cout << "入栈操作:" << std::endl;
    std::cout << "  操作数: " << num_operations << std::endl;
    std::cout << "  耗时: " << duration.count() << " μs" << std::endl;
    std::cout << "  吞吐量: " << mops(num_operations,duration) << " M ops/sec" << std::endl;

    // Pop phase: drain until pop() reports an empty stack.
    start = Clock::now();
    int count = 0;
    while (stack.pop()) {
        count++;
    }
    end = Clock::now();

    duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
    std::cout << "\n出栈操作:" << std::endl;
    std::cout << "  操作数: " << count << std::endl;
    std::cout << "  耗时: " << duration.count() << " μs" << std::endl;
    std::cout << "  吞吐量: " << mops(count,duration) << " M ops/sec" << std::endl;
}

无锁哈希表

#include <iostream>
#include <atomic>
#include <vector>
#include <functional>

// Simplified lock-free hash table with a fixed number of buckets.
//
// Each bucket is a singly linked list that only ever grows at its head:
//   * insert() publishes a new node with a CAS on the bucket head;
//   * a node's key, value and next pointer never change after publication;
//   * remove() does not unlink anything, it only sets the node's `removed`
//     flag (a tombstone). Readers skip tombstoned nodes.
// Because nodes are never unlinked or freed before the destructor runs,
// readers can traverse a chain without any reclamation protocol and there is
// no ABA hazard. The price is that removed entries keep occupying memory (and
// lengthen their chain) until the table is destroyed, so this design suits
// workloads with few removals.
//
// At most one non-removed node exists per key. size() is maintained with a
// relaxed counter and may be momentarily inaccurate under concurrent updates.
template<typename Key,typename Value,typename Hash = std::hash<Key>>
class LockFreeHashTable {
private:
    struct Node {
        Key key;
        Value value;
        Node* next;                 // fixed once the node is published
        std::atomic<bool> removed;  // tombstone flag set by remove()

        Node(Key const& k,Value const& v) : key(k),value(v),next(nullptr),removed(false) {}
    };

    std::vector<std::atomic<Node*>> buckets;
    Hash hash_function;
    std::atomic<size_t> size_;

    // Returns the bucket head responsible for `key`.
    std::atomic<Node*>& bucket_for(Key const& key) {
        return buckets[hash_function(key) % buckets.size()];
    }

    // Searches the half-open chain range [first, stop) for a node that holds
    // `key` and is not tombstoned. Returns nullptr if there is none.
    static Node* find_live(Node* first,Node* stop,Key const& key) {
        for (Node* node = first; node != stop; node = node->next) {
            if (node->key == key && !node->removed.load(std::memory_order_acquire)) {
                return node;
            }
        }
        return nullptr;
    }

public:
    // Creates a table with `bucket_count` buckets; 0 is treated as 1 so that
    // the bucket index computation never divides by zero.
    LockFreeHashTable(size_t bucket_count = 16)
        : buckets(bucket_count == 0 ? 1 : bucket_count),size_(0) {
        for (auto& bucket : buckets) {
            bucket.store(nullptr,std::memory_order_relaxed);
        }
    }

    // Frees every node, including tombstoned ones. Must not run concurrently
    // with any other member function.
    ~LockFreeHashTable() {
        for (auto& bucket : buckets) {
            Node* node = bucket.load(std::memory_order_relaxed);
            while (node) {
                Node* next = node->next;
                delete node;
                node = next;
            }
        }
    }

    // Inserts (key, value). Returns false, leaving the table unchanged, when
    // the key is already present.
    bool insert(Key const& key,Value const& value) {
        std::atomic<Node*>& bucket = bucket_for(key);

        // Check the whole chain first so duplicates are rejected even when
        // there is no contention at all.
        Node* expected = bucket.load(std::memory_order_acquire);
        if (find_live(expected,nullptr,key) != nullptr) {
            return false;
        }

        Node* new_node = new Node(key,value);
        new_node->next = expected;

        while (!bucket.compare_exchange_weak(
                   expected,new_node,
                   std::memory_order_acq_rel,
                   std::memory_order_acquire)) {
            // Other inserts won the race. Only the nodes they added, i.e. the
            // range [expected, new_node->next), have not been checked yet.
            if (find_live(expected,new_node->next,key) != nullptr) {
                delete new_node;
                return false;  // key was inserted concurrently
            }
            new_node->next = expected;
        }

        size_.fetch_add(1,std::memory_order_relaxed);
        return true;
    }

    // Looks up `key`. On success copies the stored value into `result` and
    // returns true; otherwise returns false and leaves `result` untouched.
    bool find(Key const& key,Value& result) {
        Node* const node =
            find_live(bucket_for(key).load(std::memory_order_acquire),nullptr,key);
        if (node == nullptr) {
            return false;
        }
        result = node->value;
        return true;
    }

    // Removes `key`. Returns true if this call removed it, false if the key
    // was absent (or another thread removed it first).
    bool remove(Key const& key) {
        Node* const node =
            find_live(bucket_for(key).load(std::memory_order_acquire),nullptr,key);
        if (node == nullptr) {
            return false;  // key not present
        }
        // exchange() makes sure exactly one remover wins for a given node.
        if (node->removed.exchange(true,std::memory_order_acq_rel)) {
            return false;
        }
        size_.fetch_sub(1,std::memory_order_relaxed);
        return true;
    }

    // Number of entries currently stored (excluding removed ones).
    size_t size() const {
        return size_.load(std::memory_order_relaxed);
    }

    // Average number of stored entries per bucket.
    double load_factor() const {
        return static_cast<double>(size()) / buckets.size();
    }
};

// Single-threaded benchmark of LockFreeHashTable: inserts num_operations
// int -> string entries into a 1024-bucket table, then looks every key up
// again, printing time and throughput (in thousands of operations per
// second) for both phases.
void lockfree_hashtable_test() {
    typedef std::chrono::high_resolution_clock Clock;
    typedef std::chrono::microseconds Micros;

    std::cout << "\n无锁哈希表性能测试" << std::endl;
    std::cout << "=============================" << std::endl << std::endl;

    const int num_operations = 100000;
    LockFreeHashTable<int,std::string> hashtable(1024);

    // Phase 1: insert keys 0 .. num_operations-1.
    const Clock::time_point insert_begin = Clock::now();
    int key = 0;
    while (key < num_operations) {
        const std::string payload = "value_" + std::to_string(key);
        hashtable.insert(key,payload);
        ++key;
    }
    const Micros insert_time =
        std::chrono::duration_cast<Micros>(Clock::now() - insert_begin);

    // ops * 1000 / microseconds == thousands of operations per second.
    const double insert_kops = num_operations * 1000.0 / insert_time.count();
    std::cout << "插入操作:" << std::endl;
    std::cout << "  操作数: " << num_operations << std::endl;
    std::cout << "  耗时: " << insert_time.count() << " μs" << std::endl;
    std::cout << "  吞吐量: " << insert_kops << " K ops/sec" << std::endl;
    std::cout << "  实际大小: " << hashtable.size() << std::endl;
    std::cout << "  负载因子: " << hashtable.load_factor() << std::endl;

    // Phase 2: look up every key that was inserted and count the hits.
    const Clock::time_point find_begin = Clock::now();
    int found_count = 0;
    for (int probe = 0; probe != num_operations; ++probe) {
        std::string fetched;
        const bool hit = hashtable.find(probe,fetched);
        if (hit) {
            ++found_count;
        }
    }
    const Micros find_time =
        std::chrono::duration_cast<Micros>(Clock::now() - find_begin);

    const double find_kops = num_operations * 1000.0 / find_time.count();
    std::cout << "\n查找操作:" << std::endl;
    std::cout << "  操作数: " << num_operations << std::endl;
    std::cout << "  找到: " << found_count << std::endl;
    std::cout << "  耗时: " << find_time.count() << " μs" << std::endl;
    std::cout << "  吞吐量: " << find_kops << " K ops/sec" << std::endl;
}

高级无锁算法

ABA问题与解决方案

#include <iostream>
#include <atomic>
#include <memory>

// Node type introduced in the ABA section: an int payload plus shared
// ownership of the following node.
struct Node {
    int value;
    std::shared_ptr<Node> next;  // default-constructed, i.e. empty

    // Stores `v`; the node starts without a successor.
    Node(int v) : value(v) {}
};

// Prints a textual walk-through of the ABA problem followed by a list of
// common countermeasures. Purely informational: no concurrent code runs here.
void aba_problem_demonstration() {
    // One entry per output line; an empty entry produces a blank line.
    static const char* const kLines[] = {
        "ABA问题演示",
        "=============================",
        "",
        "ABA问题描述:",
        "1. 线程1读取节点A",
        "2. 线程2将A从链表中移除",
        "3. 线程2分配新节点B,插入到链表",
        "4. 线程2再次将A插入到链表",
        "5. 线程1执行CAS操作,认为A没有变化",
        "6. 实际上A已经被修改,导致数据不一致",
        "",
        "解决方案:",
        "1. 使用版本号或计数器",
        "2. 使用双重CAS",
        "3. 使用垃圾回收机制",
        "4. 使用Hazard Pointers",
    };

    for (const char* const line : kLines) {
        std::cout << line << std::endl;
    }
}

// 带版本号的原子指针
template<typename T>
class VersionedPointer {
private:
    struct PackedPointer {
     uintptr_t ptr;
     uint64_t version;
    };
    
    std::atomic<PackedPointer> atomic_ptr;
    
public:
    VersionedPointer(T* ptr = nullptr) {
     PackedPointer packed;
     packed.ptr = reinterpret_cast<uintptr_t>(ptr);
     packed.version = 0;
     atomic_ptr.store(packed,std::memory_order_relaxed);
    }
    
    // 带版本号的比较交换
    bool compare_exchange_weak(T*& expected_ptr,T* desired_ptr) {
     PackedPointer expected,desired;
     expected.ptr = reinterpret_cast<uintptr_t>(expected_ptr);
     expected.version = atomic_ptr.load().version;
     
     desired.ptr = reinterpret_cast<uintptr_t>(desired_ptr);
     desired.version = expected.version + 1;
     
     if (atomic_ptr.compare_exchange_weak(
             expected,desired,
             std::memory_order_acq_rel,
             std::memory_order_relaxed)) {
         return true;
     }
     
     expected_ptr = reinterpret_cast<T*>(expected.ptr);
     return false;
    }
    
    T* load() const {
     return reinterpret_cast<T*>(atomic_ptr.load().ptr);
    }
};

// 使用版本号指针的无锁栈
template<typename T>
class ABASafeLockFreeStack {
private:
    struct Node {
     T data;
     Node* next;
     
     Node(T const& value) : data(value),next(nullptr) {}
    };
    
    VersionedPointer<Node> head;
    
public:
    void push(T const& value) {
     Node* new_node = new Node(value);
     Node* expected_head = head.load();
     
     do {
         new_node->next = expected_head;
     } while (!head.compare_exchange_weak(expected_head,new_node));
    }
    
    std::shared_ptr<T> pop() {
     Node* expected_head = head.load();
     Node* desired_head;
     
     do {
         if (!expected_head) {
             return nullptr;  // 栈为空
         }
         desired_head = expected_head->next;
     } while (!head.compare_exchange_weak(expected_head,desired_head));
     
     std::shared_ptr<T> result(std::make_shared<T>(expected_head->data));
     delete expected_head;
     return result;
    }
};

// Single-threaded benchmark of ABASafeLockFreeStack: pushes num_operations
// integers, then pops until the stack is empty, printing time and throughput
// for each phase.
void aba_safe_stack_test() {
    std::cout << "\nABA安全无锁栈测试" << std::endl;
    std::cout << "=============================" << std::endl << std::endl;

    typedef std::chrono::high_resolution_clock Clock;

    // Operations per microsecond equals millions of operations per second.
    // A zero reading is clamped to 1 us to avoid dividing by zero.
    const auto mops = [](long long ops,std::chrono::microseconds elapsed) {
        const long long micros = elapsed.count() > 0 ? elapsed.count() : 1;
        return static_cast<double>(ops) / static_cast<double>(micros);
    };

    ABASafeLockFreeStack<int> stack;

    const int num_operations = 100000;

    // Push phase.
    Clock::time_point start = Clock::now();
    for (int i = 0; i < num_operations; i++) {
        stack.push(i);
    }
    Clock::time_point end = Clock::now();

    std::chrono::microseconds duration =
        std::chrono::duration_cast<std::chrono::microseconds>(end - start);
    std::cout << "ABA安全栈性能:" << std::endl;
    std::cout << "  入栈操作: " << num_operations << std::endl;
    std::cout << "  耗时: " << duration.count() << " μs" << std::endl;
    std::cout << "  吞吐量: " << mops(num_operations,duration) << " M ops/sec" << std::endl;

    // Pop phase: drain until pop() reports an empty stack.
    start = Clock::now();
    int count = 0;
    while (stack.pop()) {
        count++;
    }
    end = Clock::now();

    duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
    std::cout << "  出栈操作: " << count << std::endl;
    std::cout << "  耗时: " << duration.count() << " μs" << std::endl;
    std::cout << "  吞吐量: " << mops(count,duration) << " M ops/sec" << std::endl;
}

无锁编程最佳实践

性能对比分析

#include <iostream>
#include <atomic>
#include <mutex>
#include <vector>
#include <thread>

// Mutex-protected FIFO queue used as the baseline in the lock vs. lock-free
// comparison. Every member function takes the same mutex, so the queue can be
// shared between threads.
//
// The elements live in a std::deque so that removing the front element is
// O(1); erasing the first element of a std::vector shifts all remaining
// elements and makes each dequeue O(n).
template<typename T>
class LockQueue {
private:
    std::mutex mtx;
    std::deque<T> data;

public:
    // Appends a copy of `value` to the back of the queue.
    void enqueue(T const& value) {
        std::lock_guard<std::mutex> lock(mtx);
        data.push_back(value);
    }

    // Moves the front element into `value` and returns true, or returns false
    // (leaving `value` untouched) when the queue is empty.
    bool dequeue(T& value) {
        std::lock_guard<std::mutex> lock(mtx);
        if (data.empty()) {
            return false;
        }
        value = std::move(data.front());
        data.pop_front();
        return true;
    }

    // Number of elements currently queued.
    size_t size() {
        std::lock_guard<std::mutex> lock(mtx);
        return data.size();
    }
};

// Runs `num_threads` producers and `num_threads` consumers against a queue
// and returns the elapsed wall-clock time in microseconds (at least 1).
//
// `enqueue(int)` adds one item; `try_dequeue()` removes one item and returns
// true, or returns false when the queue is currently empty. Both callables
// are invoked concurrently from several threads.
//
// Consumers keep polling until every produced item has been consumed. An
// empty queue is not a stop condition, because consumers may well start
// before the producers have enqueued anything.
template<typename Enqueue,typename TryDequeue>
static long long run_producer_consumer_benchmark(int num_operations,int num_threads,
                                                 Enqueue enqueue,TryDequeue try_dequeue) {
    const int per_producer = num_operations / num_threads;
    const int total_items = per_producer * num_threads;
    std::atomic<int> consumed(0);

    const std::chrono::high_resolution_clock::time_point start =
        std::chrono::high_resolution_clock::now();

    std::vector<std::thread> workers;
    workers.reserve(static_cast<size_t>(num_threads) * 2);

    for (int i = 0; i < num_threads; i++) {
        workers.emplace_back([&enqueue,per_producer,i]() {
            for (int j = 0; j < per_producer; j++) {
                enqueue(i * 1000 + j);
            }
        });
    }

    for (int i = 0; i < num_threads; i++) {
        workers.emplace_back([&try_dequeue,&consumed,total_items]() {
            while (consumed.load(std::memory_order_relaxed) < total_items) {
                if (try_dequeue()) {
                    consumed.fetch_add(1,std::memory_order_relaxed);
                } else {
                    std::this_thread::yield();  // nothing available yet
                }
            }
        });
    }

    for (auto& worker : workers) {
        worker.join();
    }

    const std::chrono::high_resolution_clock::time_point end =
        std::chrono::high_resolution_clock::now();
    const long long micros =
        std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
    return micros > 0 ? micros : 1;  // keep later divisions well-defined
}

// Compares the mutex-based LockQueue with LockFreeQueue under the same
// multi-producer / multi-consumer workload and prints time, throughput and
// the resulting speed-up.
void lock_vs_lockfree_comparison() {
    std::cout << "\n锁队列 vs 无锁队列性能对比" << std::endl;
    std::cout << "=============================" << std::endl << std::endl;

    const int num_operations = 1000000;
    const int num_threads = 4;

    // Mutex-based queue.
    LockQueue<int> lock_queue;
    const long long lock_micros = run_producer_consumer_benchmark(
        num_operations,num_threads,
        [&lock_queue](int value) { lock_queue.enqueue(value); },
        [&lock_queue]() {
            int value;
            return lock_queue.dequeue(value);
        });

    // Items per microsecond equals millions of items per second.
    std::cout << "锁队列性能:" << std::endl;
    std::cout << "  耗时: " << lock_micros << " μs" << std::endl;
    std::cout << "  吞吐量: " << (static_cast<double>(num_operations) / lock_micros) << " M ops/sec" << std::endl;

    // Lock-free queue.
    LockFreeQueue<int> lockfree_queue;
    const long long lockfree_micros = run_producer_consumer_benchmark(
        num_operations,num_threads,
        [&lockfree_queue](int value) { lockfree_queue.enqueue(value); },
        [&lockfree_queue]() { return lockfree_queue.dequeue() != nullptr; });

    std::cout << "\n无锁队列性能:" << std::endl;
    std::cout << "  耗时: " << lockfree_micros << " μs" << std::endl;
    std::cout << "  吞吐量: " << (static_cast<double>(num_operations) / lockfree_micros) << " M ops/sec" << std::endl;

    // Relative result.
    std::cout << "\n性能对比:" << std::endl;
    const double speedup = static_cast<double>(lock_micros) / lockfree_micros;
    std::cout << "  无锁队列加速比: " << speedup << "x" << std::endl;
    std::cout << "  性能提升: " << (speedup - 1) * 100 << "%" << std::endl;
}

// Entry point: prints the title, runs every demo and benchmark in the order
// the sections appear, then prints the best-practice summary.
int main() {
    std::cout << "无锁编程与原子操作深度实践" << std::endl;
    std::cout << "=============================" << std::endl << std::endl;

    // Demos and benchmarks, executed in order.
    typedef void (*Demo)();
    const Demo demos[] = {
        basic_atomic_operations,
        memory_order_demonstration,
        atomic_performance_test,
        lockfree_queue_test,
        lockfree_stack_test,
        lockfree_hashtable_test,
        aba_problem_demonstration,
        aba_safe_stack_test,
        lock_vs_lockfree_comparison,
    };
    for (const Demo demo : demos) {
        demo();
    }

    // Summary text, one entry per output line; an empty entry is a blank line.
    static const char* const kSummary[] = {
        "\n无锁编程最佳实践总结",
        "=============================",
        "",
        "1. 适用场景:",
        "   - 高并发环境",
        "   - 对延迟敏感的应用",
        "   - 需要高可扩展性的系统",
        "   - 细粒度并发控制",
        "\n2. 注意事项:",
        "   - 确保算法的正确性和线程安全性",
        "   - 处理ABA问题",
        "   - 避免内存泄漏",
        "   - 考虑CPU缓存一致性",
        "\n3. 性能优化:",
        "   - 选择合适的内存序",
        "   - 减少CAS重试次数",
        "   - 优化数据结构布局",
        "   - 考虑NUMA架构影响",
    };
    for (const char* const line : kSummary) {
        std::cout << line << std::endl;
    }

    return 0;
}

通过深入理解原子操作和无锁编程技术,并结合具体的数据结构实现与性能优化,可以构建高性能的并发系统。无锁编程虽然复杂,尤其是内存回收和ABA问题必须谨慎处理,但在合适的场景下能够带来显著的性能优势,是现代高并发系统的重要技术手段。