Pthread实现多线程编程

本文将用两个实例来实战使用pthread进行多线程编程,主要包含两个部分:

  1. 以数据划分的方式并行计算PI值
  2. 基于生产者—消费者模式进行线程池开发,具体的业务处理逻辑将被简化,重点表现线程管理和同步

计算Pi

思路简述

依据莱布尼兹公式,通过多线程计算较多的次数,逼近$\pi$ 。用多线程的方式进行数据划分、即每个线程分担处理部分数据,从而进行加速。 同时由于多线程访问全局的结果可能会有冲突,因此使用互斥量和信号量组织线程有序将局部结果加到全局结果中。

代码实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <pthread.h>  
#include <stdio.h>  
#include <stdlib.h>  
#include <math.h>  

const int BLOCK_SIZE = 100000;  
double sum = 0;  
int num_threads;  
// 定义互斥量和条件变量  
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;  
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;  
void *calculate_block(void *thread_id)  
{  
    long id = (long)thread_id;  
    int start = id * BLOCK_SIZE;  
    int end = start + BLOCK_SIZE;  
    double block_sum = 0;  
    for (int i = start; i < end; i++) {  
        double term = pow(-1, i) / (2 * i + 1);  
        block_sum += term;  
    }  
    pthread_mutex_lock(&mutex);  
    sum += block_sum;    
    pthread_mutex_unlock(&mutex);  
    pthread_cond_signal(&cond);  
}  

int main()  
{  
    num_threads = 8;  
    pthread_t threads[num_threads];  
    for (long i = 0; i < num_threads; i++)
        pthread_create(&threads[i], NULL, calculate_block, (void *)i);  
    for(int i = 0; i < num_threads; i ++)
        pthread_join(threads[i], NULL);
    printf("%lf", sum * 4);
}

线程池设计

线程池实现

使用一个任务队列作为生产者和消费者之间的缓冲区,任务队列中的每一个元素包含要执行的函数和函数参数,对应代码如下:

1
2
3
4
typedef struct task_t {
    void (*func)(void *);
    void *arg;
} task_t;

线程池包含一个任务队列、若干线程、互斥量与信号量以及其它关键属性,定义如下:

1
2
3
4
5
6
7
8
9
typedef struct thread_pool_t {
    pthread_t threads[MAX_THREADS];
    int num_threads;
    task_t queue[MAX_QUEUE];
    int front, rear, size;
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    bool shutdown;
} thread_pool_t;

在生产者方面,thread_pool_enqueue函数用于将任务添加到任务队列中。当生产者生产了一个任务后,它首先会通过线程池的互斥锁pool->mutex来保护任务队列,防止多个线程同时修改任务队列,然后使用条件变量pool->cond来通知消费者有新的任务到达。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
bool thread_pool_enqueue(thread_pool_t *pool, void (*func)(void *), void *arg){
    pthread_mutex_lock(&pool->mutex);
    if (pool->size == MAX_QUEUE) {
        pthread_mutex_unlock(&pool->mutex);
        return false;
    }
    task_t task = { .func = func, .arg = arg };
    pool->queue[pool->rear] = task;
    pool->rear = (pool->rear + 1) % MAX_QUEUE;
    pool->size++;
    pthread_cond_signal(&pool->cond);
    pthread_mutex_unlock(&pool->mutex);
    return true;
}

在消费者方面,thread_pool_worker函数用于从任务队列中取出任务并执行。当消费者从任务队列中取出一个任务时,它会使用互斥锁pool->mutex来保护任务队列。如果任务队列为空,消费者将会被条件变量pool->mutex阻塞,等待生产者添加新的任务到任务队列中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
void *thread_pool_worker(void *arg){
    thread_pool_t *pool = (thread_pool_t *)arg;
    while (true) {
        pthread_mutex_lock(&pool->mutex);
        while (pool->size == 0 && !pool->shutdown)
            pthread_cond_wait(&pool->cond, &pool->mutex);
        if (pool->size == 0 && pool->shutdown) {
            pthread_mutex_unlock(&pool->mutex);
            pthread_exit(NULL);
        }
        task_t task = pool->queue[pool->front];
        pool->front = (pool->front + 1) % MAX_QUEUE;
        pool->size--;
        pthread_mutex_unlock(&pool->mutex);
        task.func(task.arg);
    }
}

线程池的启动包括初始化任务队列、信号量、启动消费者线程;线程池的关闭最重要的是等待所有线程运行结束;两者实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
void thread_pool_init(thread_pool_t *pool, int num_threads) {
    pool->num_threads = num_threads;
    pool->front = pool->rear = pool->size = 0;
    pool->shutdown = false;
    pthread_mutex_init(&pool->mutex, NULL);
    pthread_cond_init(&pool->cond, NULL);
    for (int i = 0; i < num_threads; i++)
        pthread_create(&pool->threads[i], NULL, thread_pool_worker, pool);
}

void thread_pool_shutdown(thread_pool_t *pool) {
    pthread_mutex_lock(&pool->mutex);
    pool->shutdown = true;
    pthread_cond_broadcast(&pool->cond);
    pthread_mutex_unlock(&pool->mutex);
    for (int i = 0; i < pool->num_threads; i++)
        pthread_join(pool->threads[i], NULL);
    pthread_mutex_destroy(&pool->mutex);
    pthread_cond_destroy(&pool->cond);
}

设计一个简单的任务,输出任务id与线程id,并放到线程池运行:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
void my_task(void *arg){
    int *num = (int *)arg;
    printf("Task %d by thread %lu\n", *num, pthread_self());
    free(num);
}

int main() {
    thread_pool_t pool;
    thread_pool_init(&pool, 8);
    for (int i = 0; i < 20; i++) {
        int *num = malloc(sizeof(int));
        *num = i;
        thread_pool_enqueue(&pool, my_task, num);
    }
    printf("main thread %lu\n", pthread_self());
    thread_pool_shutdown(&pool);
    return 0;
}

运行结果

在ubuntu 20中编译gcc tp.c -o tp -lpthread,运行结果描述如下:

1
2
3
4
Task 0 by thread 140422668744478
Task 4 by thread 140422668744456
Task 5 by thread 140422668744434
...

线程池

使用线程池完成矩阵相加$a+b=c$,相应的任务函数如下:

1
2
3
4
5
6
7
8
void add_matrix(void *arg) {
    int *num = (int *)arg;
    int st = *num, ed = st + 5;
    for (int i = st; i < ed; i++)
        c[i] = a[i] + b[i];
    printf("Task [add_matrix] %d by thread %lu, range = %d ~ %d \n", *num /5, pthread_self(), st, ed);
    free(num);
}
Buy me a coffee~
Tim 支付宝支付宝
Tim 贝宝贝宝
Tim 微信微信
0%