Pthread for Multithreading Programming

This article will use two examples to practice multithreading programming with pthread, mainly covering two parts:

  1. Parallel computation of the PI value by dividing data
  2. Thread pool development based on the producer-consumer pattern, with the specific business logic simplified to focus on thread management and synchronization

Calculating Pi

Conceptual Overview

Based on the Leibniz formula, calculate a large number of times through multithreading to approximate $\pi$. Use multithreading for dividing data, i.e., each thread handles a part of the data for acceleration. Due to potential conflicts when multiple threads access a global result, mutexes and semaphores are used to organize threads to orderly add local results to the global outcome.

Code Implementation

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <pthread.h>  
#include <stdio.h>  
#include <stdlib.h>  
#include <math.h>  

const int BLOCK_SIZE = 100000;  
double sum = 0;  
int num_threads;  
// Define mutex and condition variables  
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;  
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;  
void *calculate_block(void *thread_id)  
{  
    long id = (long)thread_id;  
    int start = id * BLOCK_SIZE;  
    int end = start + BLOCK_SIZE;  
    double block_sum = 0;  
    for (int i = start; i < end; i++) {  
        double term = pow(-1, i) / (2 * i + 1);  
        block_sum += term;  
    }  
    pthread_mutex_lock(&mutex);  
    sum += block_sum;    
    pthread_mutex_unlock(&mutex);  
    pthread_cond_signal(&cond);  
}  

int main()  
{  
    num_threads = 8;  
    pthread_t threads[num_threads];  
    for (long i = 0; i < num_threads; i++)
        pthread_create(&threads[i], NULL, calculate_block, (void *)i);  
    for(int i = 0; i < num_threads; i ++)
        pthread_join(threads[i], NULL);
    printf("%lf", sum * 4);
}

Thread Pool Design

Thread Pool Implementation

A task queue is used as a buffer area between the producer and the consumer. Each element in the task queue contains the function to be executed and its parameters, corresponding code is as follows:

1
2
3
4
typedef struct task_t {
    void (*func)(void *);
    void *arg;
} task_t;

The thread pool contains a task queue, several threads, mutexes and semaphores, among other key attributes, defined as follows:

1
2
3
4
5
6
7
8
9
typedef struct thread_pool_t {
    pthread_t threads[MAX_THREADS];
    int num_threads;
    task_t queue[MAX_QUEUE];
    int front, rear, size;
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    bool shutdown;
} thread_pool_t;

On the producer side, the thread_pool_enqueue function is used to add tasks to the task queue. When a producer generates a task, it first protects the task queue with the thread pool’s mutex pool->mutex to prevent multiple threads from modifying the queue simultaneously, then uses the condition variable pool->cond to notify consumers of new tasks.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
bool thread_pool_enqueue(thread_pool_t *pool, void (*func)(void *), void *arg){
    pthread_mutex_lock(&pool->mutex);
    if (pool->size == MAX_QUEUE) {
        pthread_mutex_unlock(&pool->mutex);
        return false;
    }
    task_t task = { .func = func, .arg = arg };
    pool->queue[pool->rear] = task;
    pool->rear = (pool->rear + 1) % MAX_QUEUE;
    pool->size++;
    pthread_cond_signal(&pool->cond);
    pthread_mutex_unlock(&pool->mutex);
    return true;
}

On the consumer side, the thread_pool_worker function is used to retrieve and execute tasks from the task queue. When a consumer takes a task from the queue, it uses the mutex pool->mutex to protect the task queue. If the task queue is empty, the consumer will be blocked by the condition variable pool->mutex, waiting for the producer to add new tasks to the queue.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
void *thread_pool_worker(void *arg){
    thread_pool_t *pool = (thread_pool_t *)arg;
    while (true) {
        pthread_mutex_lock(&pool->mutex);
        while (pool->size == 0 && !pool->shutdown)
            pthread_cond_wait(&pool->cond, &pool->mutex);
        if (pool->size == 0 && pool->shutdown) {
            pthread_mutex_unlock(&pool->mutex);
            pthread_exit(NULL);
        }
        task_t task = pool->queue[pool->front];
        pool->front = (pool->front + 1) % MAX_QUEUE;
        pool->size--;
        pthread_mutex_unlock(&pool->mutex);
        task.func(task.arg);
    }
}

Starting a thread pool includes initializing the task queue, semaphore, starting the consumer thread;

Closing a thread pool, the most important thing is to wait for all threads to finish running;

Both are implemented as follows:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
void thread_pool_init(thread_pool_t *pool, int num_threads) {
    pool->num_threads = num_threads;
    pool->front = pool->rear = pool->size = 0;
    pool->shutdown = false;
    pthread_mutex_init(&pool->mutex, NULL);
    pthread_cond_init(&pool->cond, NULL);
    for (int i = 0; i < num_threads; i++)
        pthread_create(&pool->threads[i], NULL, thread_pool_worker, pool);
}

void thread_pool_shutdown(thread_pool_t *pool) {
    pthread_mutex_lock(&pool->mutex);
    pool->shutdown = true;
    pthread_cond_broadcast(&pool->cond);
    pthread_mutex_unlock(&pool->mutex);
    for (int i = 0; i < pool->num_threads; i++)
        pthread_join(pool->threads[i], NULL);
    pthread_mutex_destroy(&pool->mutex);
    pthread_cond_destroy(&pool->cond);
}

Design a simple task, output the task id and thread id, and run it in the thread pool:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
void my_task(void *arg){
    int *num = (int *)arg;
    printf("Task %d by thread %lu\n", *num, pthread_self());
    free(num);
}

int main() {
    thread_pool_t pool;
    thread_pool_init(&pool, 8);
    for (int i = 0; i < 20; i++) {
        int *num = malloc(sizeof(int));
        *num = i;
        thread_pool_enqueue(&pool, my_task, num);
    }
    printf("main thread %lu\n", pthread_self());
    thread_pool_shutdown(&pool);
    return 0;
}

Run Results

Compiled in Ubuntu 20 using gcc tp.c -o tp -lpthread, the run results are described as follows:

1
2
3
4
Task 0 by thread 140422668744478
Task 4 by thread 140422668744456
Task 5 by thread 140422668744434
...

Thread Pool

Use the thread pool to complete matrix addition $a+b=c$, the corresponding task function is as follows:

1
2
3
4
5
6
7
8
void add_matrix(void *arg) {
    int *num = (int *)arg;
    int st = *num, ed = st + 5;
    for (int i = st; i < ed; i++)
        c[i] = a[i] + b[i];
    printf("Task [add_matrix] %d by thread %lu, range = %d ~ %d \n", *num /5, pthread_self(), st, ed);
    free(num);
}
Buy me a coffee~
Tim AlipayAlipay
Tim PayPalPayPal
Tim WeChat PayWeChat Pay
0%