Linux并发模型与锁机制深度解析

深入探讨Linux多线程编程模型、锁机制原理,以及并发控制的最佳实践

在现代多核处理器系统中,并发编程是发挥硬件性能的关键。Linux提供了丰富的并发控制机制,从基础的线程同步原语到高级的无锁算法。本文将深入解析Linux并发模型的工作原理,分析各种锁机制的性能特征,并提供并发优化的最佳实践。

Linux并发编程模型

并发模型对比

Rendering diagram...

线程创建与管理

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/sysinfo.h>

// 线程参数
typedef struct {
    int thread_id;
    long iterations;
    unsigned long counter;
    pthread_mutex_t *mutex;
} thread_param;

// 基础线程函数
void* basic_thread_function(void *arg) {
    thread_param *param = (thread_param *)arg;
    printf("线程 %d 启动\n",param->thread_id);
    
    // 获取线程信息
    pthread_t thread = pthread_self();
    pid_t tid = gettid();
    
    printf("线程 %d: pthread_id=%lu,tid=%d\n",
        param->thread_id,(unsigned long)thread,tid);
    
    // 获取CPU亲和性
    cpu_set_t cpuset;
    pthread_getaffinity_np(thread,sizeof(cpu_set),&cpuset);
    
    printf("线程 %d CPU亲和性: ",param->thread_id);
    for (int cpu = 0; cpu < CPU_SETSIZE; cpu++) {
     if (CPU_ISSET(cpu,&cpuset)) {
         printf("%d ",cpu);
     }
    }
    printf("\n");
    
    // 执行计算任务
    for (long i = 0; i < param->iterations; i++) {
     param->counter += i;
     // 添加工作负载
     volatile int result = 0;
     for (int j = 0; j < 100; j++) {
         result += j;
     }
    }
    
    printf("线程 %d 完成,counter=%lu\n",param->thread_id,param->counter);
    return NULL;
}

// 带互斥锁的线程函数
void* mutex_thread_function(void *arg) {
    thread_param *param = (thread_param *)arg;
    printf("线程 %d (带锁) 启动\n",param->thread_id);
    
    for (long i = 0; i < param->iterations; i++) {
     pthread_mutex_lock(param->mutex);
     
     // 临界区操作
     param->counter += i;
     
     // 模拟工作负载
     volatile int result = 0;
     for (int j = 0; j < 100; j++) {
         result += j;
     }
     
     pthread_mutex_unlock(param->mutex);
    }
    
    printf("线程 %d (带锁) 完成,counter=%lu\n",
        param->thread_id,param->counter);
    return NULL;
}

// 线程池工作函数
void* worker_thread(void *arg) {
    int thread_id = *(int *)arg;
    printf("工作线程 %d 就绪\n",thread_id);
    
    // 设置线程为可取消状态
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,NULL);
    
    while (1) {
     // 检查取消请求
     pthread_testcancel();
     
     // 模拟工作
     usleep(1000);  // 1ms
     
     // 执行任务...
     static long task_counter = 0;
     if (task_counter++ % 1000 == 0) {
         printf("工作线程 %d 已执行 %ld 个任务\n",thread_id,task_counter);
     }
    }
    
    return NULL;
}

// 线程属性配置
void configure_thread_attributes(pthread_attr_t *attr) {
    // 初始化属性
    pthread_attr_init(attr);
    
    // 设置分离状态
    pthread_attr_setdetachstate(attr,PTHREAD_CREATE_JOINABLE);
    
    // 设置调度策略
    pthread_attr_setschedpolicy(attr,SCHED_RR);
    
    // 设置继承调度
    pthread_attr_setinheritsched(attr,PTHREAD_EXPLICIT_SCHED);
    
    // 设置作用域
    pthread_attr_setscope(attr,PTHREAD_SCOPE_SYSTEM);
    
    // 设置栈大小
    size_t stack_size = 1024 * 1024;  // 1MB
    pthread_attr_setstacksize(attr,stack_size);
    
    printf("线程属性配置完成:\n");
    printf("  分离状态: PTHREAD_CREATE_JOINABLE\n");
    printf("  调度策略: SCHED_RR\n");
    printf("  作用域: PTHREAD_SCOPE_SYSTEM\n");
    printf("  栈大小: %zu KB\n",stack_size / 1024);
}

// 线程创建性能测试
void thread_creation_performance_test() {
    printf("\n线程创建性能测试\n");
    printf("=============================\n");
    
    const int num_threads = 1000;
    const int iterations = 100000;
    
    // 获取CPU核心数
    int num_cpus = sysconf(_SC_NPROCESSORS_ONLN);
    printf("系统CPU核心数: %d\n",num_cpus);
    
    // 测试1: 无锁版本
    printf("\n测试1: 无锁多线程\n");
    clock_t start = clock();
    
    pthread_t threads1[num_threads];
    thread_param params1[num_threads];
    
    for (int i = 0; i < num_threads; i++) {
     params1[i].thread_id = i;
     params1[i].iterations = iterations / num_threads;
     params1[i].counter = 0;
     params1[i].mutex = NULL;
     
     pthread_create(&threads1[i],NULL,basic_thread_function,&params1[i]);
     
     // 设置CPU亲和性,均匀分布到各个核心
     cpu_set_t cpuset;
     CPU_ZERO(&cpuset);
     CPU_SET(i % num_cpus,&cpuset);
     pthread_setaffinity_np(threads1[i],sizeof(cpu_set),&cpuset);
    }
    
    unsigned long total_counter1 = 0;
    for (int i = 0; i < num_threads; i++) {
     pthread_join(threads1[i],NULL);
     total_counter1 += params1[i].counter;
    }
    
    clock_t end = clock();
    double time1 = ((double)(end - start)) / CLOCKS_PER_SEC;
    
    printf("无锁版本:\n");
    printf("  执行时间: %.4f 秒\n",time1);
    printf("  总计数: %lu\n",total_counter1);
    printf("  吞吐量: %.2f M ops/sec\n",
        (num_threads * iterations) / time1 / 1e6);
    
    // 测试2: 带锁版本
    printf("\n测试2: 带互斥锁多线程\n");
    start = clock();
    
    pthread_mutex_t mutex;
    pthread_mutex_init(&mutex,NULL);
    
    pthread_t threads2[num_threads];
    thread_param params2[num_threads];
    
    for (int i = 0; i < num_threads; i++) {
     params2[i].thread_id = i;
     params2[i].iterations = iterations / num_threads;
     params2[i].counter = 0;
     params2[i].mutex = &mutex;
     
     pthread_create(&threads2[i],NULL,mutex_thread_function,&params2[i]);
     
     // 设置CPU亲和性
     cpu_set_t cpuset;
     CPU_ZERO(&cpuset);
     CPU_SET(i % num_cpus,&cpuset);
     pthread_setaffinity_np(threads2[i],sizeof(cpu_set),&cpuset);
    }
    
    unsigned long total_counter2 = 0;
    for (int i = 0; i < num_threads; i++) {
     pthread_join(threads2[i],NULL);
     total_counter2 += params2[i].counter;
    }
    
    end = clock();
    double time2 = ((double)(end - start)) / CLOCKS_PER_SEC;
    
    pthread_mutex_destroy(&mutex);
    
    printf("带锁版本:\n");
    printf("  执行时间: %.4f 秒\n",time2);
    printf("  总计数: %lu\n",total_counter2);
    printf("  吞吐量: %.2f M ops/sec\n",
        (num_threads * iterations) / time2 / 1e6);
    printf("  性能损失: %.2f%%\n",(time2 - time1) / time1 * 100);
}

// 线程池实现
typedef struct {
    pthread_t *threads;
    int thread_count;
    int *thread_ids;
    int running;
} thread_pool;

// 创建线程池
thread_pool* create_thread_pool(int thread_count) {
    thread_pool *pool = malloc(sizeof(thread_pool));
    if (!pool) return NULL;
    
    pool->thread_count = thread_count;
    pool->threads = malloc(sizeof(pthread_t) * thread_count);
    pool->thread_ids = malloc(sizeof(int) * thread_count);
    pool->running = 1;
    
    for (int i = 0; i < thread_count; i++) {
     pool->thread_ids[i] = i;
     pthread_create(&pool->threads[i],NULL,worker_thread,&pool->thread_ids[i]);
    }
    
    printf("线程池创建完成: %d 个工作线程\n",thread_count);
    return pool;
}

// 销毁线程池
void destroy_thread_pool(thread_pool *pool) {
    printf("销毁线程池...\n");
    
    pool->running = 0;
    
    // 取消所有线程
    for (int i = 0; i < pool->thread_count; i++) {
     pthread_cancel(pool->threads[i]);
     pthread_join(pool->threads[i],NULL);
    }
    
    free(pool->threads);
    free(pool->thread_ids);
    free(pool);
    
    printf("线程池已销毁\n");
}

int main() {
    printf("Linux并发编程模型分析\n");
    printf("=============================\n\n");
    
    // 获取系统信息
    printf("系统信息:\n");
    printf("  CPU核心数: %d\n",sysconf(_SC_NPROCESSORS_ONLN));
    printf("  页面大小: %ld KB\n",sysconf(_SC_PAGESIZE) / 1024);
    printf("  时钟频率: %ld Hz\n",sysconf(_SC_CLK_TCK));
    
    // 线程创建性能测试
    thread_creation_performance_test();
    
    // 线程属性配置演示
    printf("\n线程属性配置演示\n");
    printf("=============================\n\n");
    
    pthread_attr_t attr;
    configure_thread_attributes(&attr);
    
    pthread_t test_thread;
    thread_param param = {0,1000,0,NULL};
    pthread_create(&test_thread,&attr,basic_thread_function,&param);
    pthread_join(test_thread,NULL);
    
    pthread_attr_destroy(&attr);
    
    // 线程池演示
    printf("\n线程池演示\n");
    printf("=============================\n\n");
    
    thread_pool *pool = create_thread_pool(4);
    sleep(5);  // 运行5秒
    destroy_thread_pool(pool);
    
    return 0;
}

锁机制深度分析

锁类型与性能特征

Rendering diagram...

互斥锁性能分析

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <time.h>
#include <sched.h>

#define NUM_ITERATIONS 10000000
#define NUM_THREADS 8

// 互斥锁性能测试
void mutex_performance_test() {
    printf("互斥锁性能分析\n");
    printf("=============================\n\n");
    
    pthread_mutex_t mutex;
    pthread_mutex_init(&mutex,NULL);
    
    // 测试不同的锁类型
    printf("1. 普通互斥锁\n");
    
    struct timespec start,end;
    clock_gettime(CLOCK_MONOTONIC,&start);
    
    for (int i = 0; i < NUM_ITERATIONS; i++) {
     pthread_mutex_lock(&mutex);
     // 临界区操作
     volatile int value = 42;
     (void)value;
     pthread_mutex_unlock(&mutex);
    }
    
    clock_gettime(CLOCK_MONOTONIC,&end);
    double time1 = (end.tv_sec - start.tv_sec) + 
               (end.tv_nsec - start.tv_nsec) / 1e9;
    
    printf("  执行时间: %.6f 秒\n",time1);
    printf("  平均延迟: %.6f μs\n",(time1 * 1e6) / NUM_ITERATIONS);
    printf("  吞吐量: %.2f M ops/sec\n",NUM_ITERATIONS / time1 / 1e6);
    
    // 测试自适应互斥锁
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_settype(&attr,PTHREAD_MUTEX_ADAPTIVE_NP);
    
    pthread_mutex_t adaptive_mutex;
    pthread_mutex_init(&adaptive_mutex,&attr);
    
    printf("\n2. 自适应互斥锁\n");
    clock_gettime(CLOCK_MONOTONIC,&start);
    
    for (int i = 0; i < NUM_ITERATIONS; i++) {
     pthread_mutex_lock(&adaptive_mutex);
     volatile int value = 42;
     (void)value;
     pthread_mutex_unlock(&adaptive_mutex);
    }
    
    clock_gettime(CLOCK_MONOTONIC,&end);
    double time2 = (end.tv_sec - start.tv_sec) + 
               (end.tv_nsec - start.tv_nsec) / 1e9;
    
    printf("  执行时间: %.6f 秒\n",time2);
    printf("  平均延迟: %.6f μs\n",(time2 * 1e6) / NUM_ITERATIONS);
    printf("  吞吐量: %.2f M ops/sec\n",NUM_ITERATIONS / time2 / 1e6);
    
    pthread_mutexattr_destroy(&attr);
    pthread_mutex_destroy(&mutex);
    pthread_mutex_destroy(&adaptive_mutex);
}

// 读写锁性能测试
void rwlock_performance_test() {
    printf("\n读写锁性能分析\n");
    printf("=============================\n\n");
    
    pthread_rwlock_t rwlock;
    pthread_rwlock_init(&rwlock,NULL);
    
    const int iterations = 10000000;
    
    // 读操作测试
    printf("1. 读操作测试\n");
    struct timespec start,end;
    clock_gettime(CLOCK_MONOTONIC,&start);
    
    for (int i = 0; i < iterations; i++) {
     pthread_rwlock_rdlock(&rwlock);
     volatile int value = 42;
     (void)value;
     pthread_rwlock_unlock(&rwlock);
    }
    
    clock_gettime(CLOCK_MONOTONIC,&end);
    double read_time = (end.tv_sec - start.tv_sec) + 
                   (end.tv_nsec - start.tv_nsec) / 1e9;
    
    printf("  执行时间: %.6f 秒\n",read_time);
    printf("  读吞吐量: %.2f M ops/sec\n",iterations / read_time / 1e6);
    
    // 写操作测试
    printf("\n2. 写操作测试\n");
    clock_gettime(CLOCK_MONOTONIC,&start);
    
    for (int i = 0; i < iterations; i++) {
     pthread_rwlock_wrlock(&rwlock);
     volatile int value = 42;
     (void)value;
     pthread_rwlock_unlock(&rwlock);
    }
    
    clock_gettime(CLOCK_MONOTONIC,&end);
    double write_time = (end.tv_sec - start.tv_sec) + 
                    (end.tv_nsec - start.tv_nsec) / 1e9;
    
    printf("  执行时间: %.6f 秒\n",write_time);
    printf("  写吞吐量: %.2f M ops/sec\n",iterations / write_time / 1e6);
    
    printf("\n性能特征:\n");
    printf("  读操作延迟: %.6f μs\n",(read_time * 1e6) / iterations);
    printf("  写操作延迟: %.6f μs\n",(write_time * 1e6) / iterations);
    printf("  读写性能比: %.2f:1\n",write_time / read_time);
    
    pthread_rwlock_destroy(&rwlock);
}

// 自旋锁性能测试
void spinlock_performance_test() {
    printf("\n自旋锁性能分析\n");
    printf("=============================\n\n");
    
    pthread_spinlock_t spinlock;
    pthread_spin_init(&spinlock,PTHREAD_PROCESS_PRIVATE);
    
    const int iterations = 10000000;
    
    printf("自旋锁测试:\n");
    struct timespec start,end;
    clock_gettime(CLOCK_MONOTONIC,&start);
    
    for (int i = 0; i < iterations; i++) {
     pthread_spin_lock(&spinlock);
     // 临界区操作(短时间)
     volatile int value = 42;
     (void)value;
     pthread_spin_unlock(&spinlock);
    }
    
    clock_gettime(CLOCK_MONOTONIC,&end);
    double time = (end.tv_sec - start.tv_sec) + 
              (end.tv_nsec - start.tv_nsec) / 1e9;
    
    printf("  执行时间: %.6f 秒\n",time);
    printf("  平均延迟: %.6f μs\n",(time * 1e6) / iterations);
    printf("  吞吐量: %.2f M ops/sec\n",iterations / time / 1e6);
    
    pthread_spin_destroy(&spinlock);
    
    printf("\n自旋锁使用建议:\n");
    printf("  - 适合短临界区(< 1μs)\n");
    printf("  - 避免在临界区进行阻塞操作\n");
    printf("  - 适合多核系统\n");
    printf("  - 注意CPU占用率\n");
}

// 锁竞争分析
void lock_contention_analysis() {
    printf("\n锁竞争分析\n");
    printf("=============================\n\n");
    
    const int num_threads = 8;
    const int iterations_per_thread = 100000;
    
    pthread_mutex_t mutex;
    pthread_mutex_init(&mutex,NULL);
    
    typedef struct {
     int thread_id;
     unsigned long counter;
     unsigned long wait_cycles;
    } contention_data;
    
    contention_data data[num_threads];
    pthread_t threads[num_threads];
    
    // 高竞争场景
    printf("高竞争场景测试 (%d 线程)\n",num_threads);
    struct timespec start,end;
    clock_gettime(CLOCK_MONOTONIC,&start);
    
    for (int i = 0; i < num_threads; i++) {
     data[i].thread_id = i;
     data[i].counter = 0;
     data[i].wait_cycles = 0;
     
     pthread_create(&threads[i],NULL,[](void *arg) -> void* {
         contention_data *d = (contention_data *)arg;
         
         for (int j = 0; j < iterations_per_thread; j++) {
             struct timespec wait_start,wait_end;
             clock_gettime(CLOCK_MONOTONIC,&wait_start);
             
             pthread_mutex_lock(&mutex);
             clock_gettime(CLOCK_MONOTONIC,&wait_end);
             
             // 计算等待时间
             unsigned long wait_ns = (wait_end.tv_sec - wait_start.tv_sec) * 1000000000 +
                                    (wait_end.tv_nsec - wait_start.tv_nsec);
             d->wait_cycles += wait_ns;
             
             // 临界区操作
             d->counter++;
             volatile int value = d->counter;
             (void)value;
             
             pthread_mutex_unlock(&mutex);
         }
         
         return NULL;
     },&data[i]);
    }
    
    for (int i = 0; i < num_threads; i++) {
     pthread_join(threads[i],NULL);
    }
    
    clock_gettime(CLOCK_MONOTONIC,&end);
    double total_time = (end.tv_sec - start.tv_sec) + 
                    (end.tv_nsec - start.tv_nsec) / 1e9;
    
    // 统计分析
    unsigned long total_counter = 0;
    unsigned long total_wait_cycles = 0;
    
    for (int i = 0; i < num_threads; i++) {
     total_counter += data[i].counter;
     total_wait_cycles += data[i].wait_cycles;
    }
    
    printf("  总执行时间: %.4f 秒\n",total_time);
    printf("  总计数: %lu\n",total_counter);
    printf("  总吞吐量: %.2f M ops/sec\n",total_counter / total_time / 1e6);
    printf("  平均等待时间: %.6f μs\n",(total_wait_cycles / 1e3) / total_counter);
    printf("  等待时间占比: %.2f%%\n",(total_wait_cycles / 1e9) / total_time * 100);
    
    pthread_mutex_destroy(&mutex);
    
    printf("\n优化建议:\n");
    printf("  - 减少锁的持有时间\n");
    printf("  - 使用细粒度锁\n");
    printf("  - 考虑无锁算法\n");
    printf("  - 使用读写锁区分读写操作\n");
}

int main() {
    printf("锁机制深度分析\n");
    printf("=============================\n\n");
    
    mutex_performance_test();
    rwlock_performance_test();
    spinlock_performance_test();
    lock_contention_analysis();
    
    return 0;
}

并发编程最佳实践

并发设计原则

Rendering diagram...

线程安全数据结构

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <stdbool.h>

// 线程安全的队列
typedef struct queue_node {
    void *data;
    struct queue_node *next;
} queue_node;

typedef struct {
    queue_node *head;
    queue_node *tail;
    size_t size;
    size_t max_size;
    pthread_mutex_t mutex;
    pthread_cond_t not_empty;
    pthread_cond_t not_full;
} thread_safe_queue;

// 初始化线程安全队列
thread_safe_queue* queue_init(size_t max_size) {
    thread_safe_queue *queue = malloc(sizeof(thread_safe_queue));
    if (!queue) return NULL;
    
    queue->head = NULL;
    queue->tail = NULL;
    queue->size = 0;
    queue->max_size = max_size;
    
    pthread_mutex_init(&queue->mutex,NULL);
    pthread_cond_init(&queue->not_empty,NULL);
    pthread_cond_init(&queue->not_full,NULL);
    
    printf("线程安全队列初始化完成 (最大大小: %zu)\n",max_size);
    return queue;
}

// 入队操作
bool queue_push(thread_safe_queue *queue,void *data,bool block) {
    pthread_mutex_lock(&queue->mutex);
    
    // 等待队列不满
    while (queue->size >= queue->max_size && block) {
     pthread_cond_wait(&queue->not_full,&queue->mutex);
    }
    
    if (queue->size >= queue->max_size) {
     pthread_mutex_unlock(&queue->mutex);
     return false;  // 队列已满
    }
    
    // 创建新节点
    queue_node *node = malloc(sizeof(queue_node));
    if (!node) {
     pthread_mutex_unlock(&queue->mutex);
     return false;
    }
    
    node->data = data;
    node->next = NULL;
    
    // 添加到队列尾部
    if (queue->tail) {
     queue->tail->next = node;
    } else {
     queue->head = node;
    }
    queue->tail = node;
    queue->size++;
    
    // 通知等待的消费者
    pthread_cond_signal(&queue->not_empty);
    
    pthread_mutex_unlock(&queue->mutex);
    return true;
}

// 出队操作
void* queue_pop(thread_safe_queue *queue,bool block) {
    pthread_mutex_lock(&queue->mutex);
    
    // 等待队列不空
    while (queue->size == 0 && block) {
     pthread_cond_wait(&queue->not_empty,&queue->mutex);
    }
    
    if (queue->size == 0) {
     pthread_mutex_unlock(&queue->mutex);
     return NULL;  // 队列为空
    }
    
    // 移除队列头部节点
    queue_node *node = queue->head;
    void *data = node->data;
    
    queue->head = node->next;
    if (!queue->head) {
     queue->tail = NULL;
    }
    queue->size--;
    
    free(node);
    
    // 通知等待的生产者
    pthread_cond_signal(&queue->not_full);
    
    pthread_mutex_unlock(&queue->mutex);
    return data;
}

// 获取队列大小
size_t queue_size(thread_safe_queue *queue) {
    pthread_mutex_lock(&queue->mutex);
    size_t size = queue->size;
    pthread_mutex_unlock(&queue->mutex);
    return size;
}

// 销毁队列
void queue_destroy(thread_safe_queue *queue) {
    pthread_mutex_lock(&queue->mutex);
    
    // 释放所有节点
    queue_node *node = queue->head;
    while (node) {
     queue_node *next = node->next;
     free(node);
     node = next;
    }
    
    pthread_mutex_unlock(&queue->mutex);
    
    pthread_mutex_destroy(&queue->mutex);
    pthread_cond_destroy(&queue->not_empty);
    pthread_cond_destroy(&queue->not_full);
    
    free(queue);
    printf("线程安全队列已销毁\n");
}

// 生产者-消费者模式
typedef struct {
    thread_safe_queue *queue;
    int producer_id;
    int num_items;
} producer_param;

typedef struct {
    thread_safe_queue *queue;
    int consumer_id;
} consumer_param;

// 生产者线程
void* producer_thread(void *arg) {
    producer_param *param = (producer_param *)arg;
    
    for (int i = 0; i < param->num_items; i++) {
     int *data = malloc(sizeof(int));
     *data = param->producer_id * 1000 + i;
     
     if (queue_push(param->queue,data,true)) {
         printf("生产者 %d 生产了: %d\n",param->producer_id,*data);
     } else {
         free(data);
         printf("生产者 %d 队列已满\n",param->producer_id);
     }
     
     usleep(1000);  // 1ms
    }
    
    return NULL;
}

// 消费者线程
void* consumer_thread(void *arg) {
    consumer_param *param = (consumer_param *)arg;
    
    while (true) {
     int *data = queue_pop(param->queue,true);
     if (data) {
         printf("消费者 %d 消费了: %d\n",param->consumer_id,*data);
         free(data);
     } else {
         printf("消费者 %d 队列为空\n",param->consumer_id);
         break;
     }
     
     usleep(2000);  // 2ms
    }
    
    return NULL;
}

// 生产者-消费者测试
void producer_consumer_test() {
    printf("\n生产者-消费者模式测试\n");
    printf("=============================\n\n");
    
    // 创建线程安全队列
    thread_safe_queue *queue = queue_init(100);
    
    // 创建生产者线程
    const int num_producers = 3;
    const int num_consumers = 2;
    const int items_per_producer = 10;
    
    pthread_t producer_threads[num_producers];
    pthread_t consumer_threads[num_consumers];
    
    producer_param producer_params[num_producers];
    consumer_param consumer_params[num_consumers];
    
    for (int i = 0; i < num_producers; i++) {
     producer_params[i].queue = queue;
     producer_params[i].producer_id = i;
     producer_params[i].num_items = items_per_producer;
     
     pthread_create(&producer_threads[i],NULL,producer_thread,&producer_params[i]);
    }
    
    for (int i = 0; i < num_consumers; i++) {
     consumer_params[i].queue = queue;
     consumer_params[i].consumer_id = i;
     
     pthread_create(&consumer_threads[i],NULL,consumer_thread,&consumer_params[i]);
    }
    
    // 等待生产者完成
    for (int i = 0; i < num_producers; i++) {
     pthread_join(producer_threads[i],NULL);
    }
    
    printf("所有生产者完成\n");
    
    // 等待消费者完成(简化处理)
    sleep(2);
    
    for (int i = 0; i < num_consumers; i++) {
     pthread_cancel(consumer_threads[i]);
     pthread_join(consumer_threads[i],NULL);
    }
    
    printf("队列最终大小: %zu\n",queue_size(queue));
    
    queue_destroy(queue);
}

int main() {
    producer_consumer_test();
    
    printf("\n并发编程最佳实践总结\n");
    printf("=============================\n\n");
    
    printf("1. 锁使用原则:\n");
    printf("   - 最小化锁的持有时间\n");
    printf("   - 避免在临界区进行复杂操作\n");
    printf("   - 使用适当的锁类型\n");
    printf("   - 注意锁的顺序避免死锁\n");
    
    printf("\n2. 数据结构设计:\n");
    printf("   - 优先使用线程安全的数据结构\n");
    printf("   - 考虑无锁数据结构\n");
    printf("   - 合理设计数据访问模式\n");
    printf("   - 减少共享数据的使用\n");
    
    printf("\n3. 线程管理:\n");
    printf("   - 合理设置线程数量\n");
    printf("   - 使用线程池管理线程\n");
    printf("   - 正确处理线程取消\n");
    printf("   - 避免线程泄漏\n");
    
    printf("\n4. 性能优化:\n");
    printf("   - 减少锁竞争\n");
    printf("   - 使用细粒度锁\n");
    printf("   - 考虑读写锁优化\n");
    printf("   - 监控并发性能指标\n");
    
    return 0;
}

通过深入理解Linux并发模型和锁机制,结合最佳实践和线程安全的数据结构设计,可以构建高性能、高可靠性的并发应用程序。关键是选择合适的并发控制机制,并在性能和正确性之间找到最佳平衡点。