Multithreaded Programming with Pthreads

This article works through two practical examples of multithreaded programming with pthreads:

  1. Parallel calculation of the PI value by data division
  2. Thread pool development based on the producer-consumer model, with the specific business logic simplified, focusing on thread management and synchronization

1 Calculating Pi

1.1 Brief Idea

The Leibniz series converges to $\pi/4$, so summing more terms gives a better approximation of $\pi$. The work is divided by data: each thread sums its own block of terms, which speeds up the computation. Because concurrent updates to the global result would conflict, a mutex serializes the step in which each thread adds its local result to the global one.
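
Written out, the series and its division into blocks look as follows. The symbols $T$ (number of threads) and $B$ (terms per thread) are introduced only for this note; thread $t$ handles the indices $tB, \dots, (t+1)B - 1$:

$$\frac{\pi}{4} = \sum_{i=0}^{\infty} \frac{(-1)^i}{2i+1} \approx \sum_{t=0}^{T-1} \; \sum_{i=tB}^{(t+1)B-1} \frac{(-1)^i}{2i+1}$$

Since this is an alternating series with decreasing terms, the error of the truncated sum after $N = TB$ terms is at most the first omitted term, $\frac{1}{2N+1}$, so the error in the estimate of $\pi$ is at most $\frac{4}{2N+1}$.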

1.2 Code Implementation

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <math.h>

const int BLOCK_SIZE = 100000;  // number of series terms per thread
double sum = 0;                 // global result, shared by all threads
int num_threads;
// Mutex protecting sum. No condition variable is needed: main waits for the
// workers with pthread_join, so nothing would ever wait on one.
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void *calculate_block(void *thread_id)
{
    long id = (long)thread_id;
    int start = id * BLOCK_SIZE;
    int end = start + BLOCK_SIZE;
    double block_sum = 0;
    // Sum this thread's slice of the series locally, without locking
    for (int i = start; i < end; i++) {
        double term = pow(-1, i) / (2 * i + 1);
        block_sum += term;
    }
    // Add the local result to the global result under the lock
    pthread_mutex_lock(&mutex);
    sum += block_sum;
    pthread_mutex_unlock(&mutex);
    return NULL;
}

int main(void)
{
    num_threads = 8;
    pthread_t threads[num_threads];
    for (long i = 0; i < num_threads; i++)
        pthread_create(&threads[i], NULL, calculate_block, (void *)i);
    for (int i = 0; i < num_threads; i++)
        pthread_join(threads[i], NULL);
    printf("%lf\n", sum * 4);
    return 0;
}
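
A possible variation, sketched here and not taken from the article, avoids the shared accumulator entirely: each thread writes its partial sum into its own slot, and the caller adds the slots after joining. The names pi_block_t, sum_block, and leibniz_pi are hypothetical, and the sign is derived from the parity of the index instead of calling pow.

```c
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical per-thread work descriptor (not from the article). */
typedef struct {
    long start;      /* first series index handled by this thread */
    long count;      /* number of terms handled by this thread */
    double partial;  /* written only by the owning thread */
} pi_block_t;

void *sum_block(void *arg)
{
    pi_block_t *blk = arg;
    double s = 0.0;
    for (long i = blk->start; i < blk->start + blk->count; i++) {
        double sign = (i % 2 == 0) ? 1.0 : -1.0;  /* (-1)^i without pow() */
        s += sign / (2.0 * (double)i + 1.0);
    }
    blk->partial = s;  /* no lock needed: nobody else touches this slot */
    return NULL;
}

/* Returns an approximation of pi, or -1.0 if setup fails. */
double leibniz_pi(int num_threads, long block_size)
{
    pthread_t *threads = malloc((size_t)num_threads * sizeof *threads);
    pi_block_t *blocks = malloc((size_t)num_threads * sizeof *blocks);
    double total = -1.0;

    if (threads && blocks) {
        int started = 0;
        for (; started < num_threads; started++) {
            blocks[started].start = (long)started * block_size;
            blocks[started].count = block_size;
            blocks[started].partial = 0.0;
            if (pthread_create(&threads[started], NULL, sum_block,
                               &blocks[started]) != 0)
                break;
        }
        /* Joining a thread makes its writes visible to the caller. */
        double sum = 0.0;
        for (int i = 0; i < started; i++) {
            pthread_join(threads[i], NULL);
            sum += blocks[i].partial;
        }
        if (started == num_threads)
            total = 4.0 * sum;
    }
    free(threads);
    free(blocks);
    return total;
}
```

Calling leibniz_pi(8, 100000) sums the same 800000 terms as the program above. Because the partial sums are added in a fixed order, the result is reproducible from run to run, whereas the mutex version adds them in whatever order the threads happen to finish.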

2 Thread Pool Design

2.1 Thread Pool Implementation

A task queue serves as the buffer between producers and consumers. Each element of the queue holds the function to execute and its argument:

typedef struct task_t {
    void (*func)(void *);
    void *arg;
} task_t;

The thread pool holds the task queue, the worker threads, a mutex, a condition variable, and a shutdown flag, defined as follows:

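#include <pthread.h>
#include <stdbool.h>

// Capacity limits. The values here are illustrative assumptions; the original
// listing does not show how MAX_THREADS and MAX_QUEUE are defined.
#define MAX_THREADS 16
#define MAX_QUEUE 64

typedef struct thread_pool_t {
    pthread_t threads[MAX_THREADS];  // worker (consumer) threads
    int num_threads;
    task_t queue[MAX_QUEUE];         // circular task queue
    int front, rear, size;           // dequeue index, enqueue index, task count
    pthread_mutex_t mutex;           // protects the queue and the shutdown flag
    pthread_cond_t cond;             // signalled on new tasks and on shutdown
    bool shutdown;
} thread_pool_t;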
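
The front, rear, and size fields make the array behave as a circular queue. As a single-threaded illustration of just that index arithmetic, here is a minimal sketch; ring_t, ring_push, ring_pop, and the capacity of 4 are hypothetical and stand in for the task queue and MAX_QUEUE.

```c
#include <stdbool.h>

#define RING_CAP 4   /* illustrative capacity, standing in for MAX_QUEUE */

typedef struct {
    int items[RING_CAP];
    int front;  /* index of the next element to remove */
    int rear;   /* index of the next free slot */
    int size;   /* number of stored elements */
} ring_t;

/* Stores value at rear; returns false if the ring is full. */
bool ring_push(ring_t *r, int value)
{
    if (r->size == RING_CAP)
        return false;
    r->items[r->rear] = value;
    r->rear = (r->rear + 1) % RING_CAP;    /* wrap to 0 past the last slot */
    r->size++;
    return true;
}

/* Removes the element at front into *out; returns false if the ring is empty. */
bool ring_pop(ring_t *r, int *out)
{
    if (r->size == 0)
        return false;
    *out = r->items[r->front];
    r->front = (r->front + 1) % RING_CAP;
    r->size--;
    return true;
}
```

Keeping an explicit size counter is what lets the code tell a full queue from an empty one, since front equals rear in both cases.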

On the producer side, thread_pool_enqueue adds a task to the task queue. It first locks the pool's mutex, pool->mutex, so that multiple threads cannot modify the queue at the same time. If the queue is full, it returns false without adding anything. Otherwise it appends the task and signals the condition variable pool->cond to notify a waiting consumer that a new task has arrived.

bool thread_pool_enqueue(thread_pool_t *pool, void (*func)(void *), void *arg){
    pthread_mutex_lock(&pool->mutex);
    // Reject the task if the queue is full
    if (pool->size == MAX_QUEUE) {
        pthread_mutex_unlock(&pool->mutex);
        return false;
    }
    // Store the task at rear and advance rear circularly
    task_t task = { .func = func, .arg = arg };
    pool->queue[pool->rear] = task;
    pool->rear = (pool->rear + 1) % MAX_QUEUE;
    pool->size++;
    // Wake one worker that is waiting for a task
    pthread_cond_signal(&pool->cond);
    pthread_mutex_unlock(&pool->mutex);
    return true;
}
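
Returning false on a full queue leaves the retry policy to the caller. A different design, sketched below as an assumption and not as the article's implementation, makes the producer block until space is available by adding a second condition variable. The type bqueue_t, the bq_ functions, and the capacity of 4 are hypothetical, and the queue carries plain integers to keep the sketch short.

```c
#include <pthread.h>

#define BQ_CAP 4   /* illustrative capacity */

typedef struct {
    int items[BQ_CAP];
    int front, rear, size;
    pthread_mutex_t mutex;
    pthread_cond_t not_empty;  /* signalled after every push */
    pthread_cond_t not_full;   /* signalled after every pop */
} bqueue_t;

void bq_init(bqueue_t *q)
{
    q->front = q->rear = q->size = 0;
    pthread_mutex_init(&q->mutex, NULL);
    pthread_cond_init(&q->not_empty, NULL);
    pthread_cond_init(&q->not_full, NULL);
}

/* Blocks while the queue is full, then appends value. */
void bq_push(bqueue_t *q, int value)
{
    pthread_mutex_lock(&q->mutex);
    while (q->size == BQ_CAP)
        pthread_cond_wait(&q->not_full, &q->mutex);
    q->items[q->rear] = value;
    q->rear = (q->rear + 1) % BQ_CAP;
    q->size++;
    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->mutex);
}

/* Blocks while the queue is empty, then removes and returns the oldest value. */
int bq_pop(bqueue_t *q)
{
    pthread_mutex_lock(&q->mutex);
    while (q->size == 0)
        pthread_cond_wait(&q->not_empty, &q->mutex);
    int value = q->items[q->front];
    q->front = (q->front + 1) % BQ_CAP;
    q->size--;
    pthread_cond_signal(&q->not_full);
    pthread_mutex_unlock(&q->mutex);
    return value;
}

/* Producer thread body: pushes the integers 0..99. */
void *bq_producer(void *arg)
{
    bqueue_t *q = arg;
    for (int i = 0; i < 100; i++)
        bq_push(q, i);
    return NULL;
}

/* Runs one producer thread against the calling thread as consumer and
   returns the sum of the received values, or -1 if the thread cannot start. */
int bq_demo(void)
{
    bqueue_t q;
    pthread_t producer;
    int sum = 0;
    bq_init(&q);
    if (pthread_create(&producer, NULL, bq_producer, &q) != 0)
        return -1;
    for (int i = 0; i < 100; i++)
        sum += bq_pop(&q);
    pthread_join(producer, NULL);
    pthread_cond_destroy(&q.not_full);
    pthread_cond_destroy(&q.not_empty);
    pthread_mutex_destroy(&q.mutex);
    return sum;
}
```

The trade-off is that a blocking producer can stall indefinitely if no consumer is running, whereas the non-blocking version above lets the caller decide whether to retry, drop the task, or run it inline.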

On the consumer side, thread_pool_worker takes tasks from the task queue and executes them. Each worker locks pool->mutex before touching the queue. If the queue is empty, the worker blocks on the condition variable pool->cond until a producer adds a task or the pool is shut down. The task itself runs after the mutex has been released, so other workers can dequeue in the meantime.

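void *thread_pool_worker(void *arg){
    thread_pool_t *pool = (thread_pool_t *)arg;
    while (true) {
        pthread_mutex_lock(&pool->mutex);
        // Sleep until there is a task or the pool is shutting down; the loop
        // re-checks the condition after every wakeup
        while (pool->size == 0 && !pool->shutdown)
            pthread_cond_wait(&pool->cond, &pool->mutex);
        // Exit only once the queue has been drained
        if (pool->size == 0 && pool->shutdown) {
            pthread_mutex_unlock(&pool->mutex);
            pthread_exit(NULL);
        }
        // Take the task at front and advance front circularly
        task_t task = pool->queue[pool->front];
        pool->front = (pool->front + 1) % MAX_QUEUE;
        pool->size--;
        pthread_mutex_unlock(&pool->mutex);
        // Run the task outside the lock
        task.func(task.arg);
    }
}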

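Starting the thread pool involves initializing the task queue, the mutex, and the condition variable, and then starting the consumer threads. Shutting it down sets the shutdown flag, wakes all workers, and waits for them to finish; workers exit only once the queue is empty, so tasks that are already queued still run. Both functions are implemented as follows: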

void thread_pool_init(thread_pool_t *pool, int num_threads) {
    // Never start more workers than the threads array can hold
    if (num_threads > MAX_THREADS)
        num_threads = MAX_THREADS;
    pool->num_threads = num_threads;
    pool->front = pool->rear = pool->size = 0;
    pool->shutdown = false;
    pthread_mutex_init(&pool->mutex, NULL);
    pthread_cond_init(&pool->cond, NULL);
    for (int i = 0; i < num_threads; i++)
        pthread_create(&pool->threads[i], NULL, thread_pool_worker, pool);
}

void thread_pool_shutdown(thread_pool_t *pool) {
    // Set the flag under the lock and wake every waiting worker
    pthread_mutex_lock(&pool->mutex);
    pool->shutdown = true;
    pthread_cond_broadcast(&pool->cond);
    pthread_mutex_unlock(&pool->mutex);
    // Wait for all workers to drain the queue and exit
    for (int i = 0; i < pool->num_threads; i++)
        pthread_join(pool->threads[i], NULL);
    pthread_mutex_destroy(&pool->mutex);
    pthread_cond_destroy(&pool->cond);
}
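
The shutdown path uses pthread_cond_broadcast, not pthread_cond_signal, because every idle worker has to wake up and notice the flag. The following sketch isolates that pattern; gate_t, gate_waiter, release_all, and MAX_WAITERS are hypothetical names used only for illustration.

```c
#include <pthread.h>
#include <stdbool.h>

#define MAX_WAITERS 8  /* illustrative upper bound */

typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    bool stop;   /* plays the role of the pool's shutdown flag */
    int woken;   /* number of threads that observed stop == true */
} gate_t;

void *gate_waiter(void *arg)
{
    gate_t *g = arg;
    pthread_mutex_lock(&g->mutex);
    while (!g->stop)                        /* re-check after every wakeup */
        pthread_cond_wait(&g->cond, &g->mutex);
    g->woken++;
    pthread_mutex_unlock(&g->mutex);
    return NULL;
}

/* Starts up to n waiters, releases them all, and returns how many woke up. */
int release_all(int n)
{
    gate_t g = { .stop = false, .woken = 0 };
    pthread_t threads[MAX_WAITERS];
    int started = 0;

    if (n > MAX_WAITERS)
        n = MAX_WAITERS;
    pthread_mutex_init(&g.mutex, NULL);
    pthread_cond_init(&g.cond, NULL);
    for (; started < n; started++)
        if (pthread_create(&threads[started], NULL, gate_waiter, &g) != 0)
            break;

    pthread_mutex_lock(&g.mutex);
    g.stop = true;
    pthread_cond_broadcast(&g.cond);        /* wake every waiter, not just one */
    pthread_mutex_unlock(&g.mutex);

    for (int i = 0; i < started; i++)
        pthread_join(threads[i], NULL);
    pthread_mutex_destroy(&g.mutex);
    pthread_cond_destroy(&g.cond);
    return g.woken;
}
```

With pthread_cond_signal in place of the broadcast, only one blocked waiter would be guaranteed to wake, and the joins could then wait forever on the others.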

To exercise the pool, define a simple task that prints its task ID and the ID of the thread running it, then submit 20 such tasks:

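void my_task(void *arg){
    int *num = (int *)arg;
    // pthread_t is an opaque type; the cast matches %lu on Linux
    printf("Task %d by thread %lu\n", *num, (unsigned long)pthread_self());
    free(num);
}

int main(void) {
    thread_pool_t pool;
    thread_pool_init(&pool, 8);
    for (int i = 0; i < 20; i++) {
        int *num = malloc(sizeof(int));
        if (num == NULL)
            continue;
        *num = i;
        // If the queue is full the task is dropped, so release its argument
        if (!thread_pool_enqueue(&pool, my_task, num))
            free(num);
    }
    printf("main thread %lu\n", (unsigned long)pthread_self());
    thread_pool_shutdown(&pool);
    return 0;
}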

2.2 Running Results

Compiled on Ubuntu 20 with gcc tp.c -o tp -lpthread, the program prints output such as the following. The thread IDs and the order of the lines vary from run to run:

Task 0 by thread 140422668744478
Task 4 by thread 140422668744456
Task 5 by thread 140422668744434
...

2.3 Using the Thread Pool

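The thread pool can also be used to compute the matrix addition $a+b=c$. Each task adds a block of five consecutive elements, starting at the index passed as its argument. The task function is as follows: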

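// a, b, and c are global arrays defined elsewhere (not shown here)
void add_matrix(void *arg) {
    int *num = (int *)arg;
    int st = *num, ed = st + 5;  // this task handles the indices st .. ed - 1
    for (int i = st; i < ed; i++)
        c[i] = a[i] + b[i];
    printf("Task [add_matrix] %d by thread %lu, range = %d ~ %d \n", *num / 5, (unsigned long)pthread_self(), st, ed);
    free(num);
}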
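
The code that creates these tasks is not shown. One plausible driver, sketched here under assumptions, allocates one start index per block and submits it. The vector length of 20, the int element type, and the names add_block, submit_fn, submit_all_blocks, and run_inline are all hypothetical. The submit callback stands in for a small wrapper around thread_pool_enqueue, and run_inline simply executes the task immediately so that the sketch compiles on its own.

```c
#include <stdbool.h>
#include <stdlib.h>

#define MAT_N 20      /* assumed number of elements */
#define MAT_BLOCK 5   /* elements per task, matching the stride of 5 above */

int a[MAT_N], b[MAT_N], c[MAT_N];

/* Same shape as add_matrix: adds one block starting at *arg, then frees arg.
   The end index is clamped in case MAT_N is not a multiple of MAT_BLOCK. */
void add_block(void *arg)
{
    int *start = arg;
    int end = *start + MAT_BLOCK;
    if (end > MAT_N)
        end = MAT_N;
    for (int i = *start; i < end; i++)
        c[i] = a[i] + b[i];
    free(start);
}

/* Hypothetical submit callback: returns false if the task was not accepted. */
typedef bool (*submit_fn)(void (*func)(void *), void *arg);

/* Creates one task per block and returns how many were accepted. */
int submit_all_blocks(submit_fn submit)
{
    int accepted = 0;
    for (int st = 0; st < MAT_N; st += MAT_BLOCK) {
        int *start = malloc(sizeof *start);
        if (start == NULL)
            break;
        *start = st;
        if (submit(add_block, start))
            accepted++;
        else
            free(start);  /* a rejected task never runs, so free its argument */
    }
    return accepted;
}

/* Stand-in for the pool: runs the task on the calling thread. */
bool run_inline(void (*func)(void *), void *arg)
{
    func(arg);
    return true;
}
```

With a real pool, the callback would forward to thread_pool_enqueue on a pool that outlives the submissions, and the results in c would be read only after thread_pool_shutdown has joined the workers.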