Programmation Multithread Avec Pthread

Cet article va utiliser deux exemples pour illustrer l’utilisation pratique de pthread pour la programmation multithread, principalement en deux parties :

  1. Calcul parallèle de la valeur de PI par division des données
  2. Développement d’un pool de threads basé sur le modèle producteur-consommateur, où la logique de traitement des affaires sera simplifiée pour se concentrer sur la gestion et la synchronisation des threads

1 Calcul de Pi

1.1 Brève description de l’idée

Selon la formule de Leibniz, en effectuant un plus grand nombre de calculs multithread, on s’approche de $\pi$. La méthode multithread est utilisée pour diviser les données, chaque thread prenant en charge une partie des données pour accélérer le processus. Étant donné que l’accès multithread au résultat global peut entraîner des conflits, des mutex et des sémaphores sont utilisés pour organiser les threads afin qu’ils ajoutent de manière ordonnée les résultats locaux au résultat global.

1.2 Implémentation du code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <pthread.h>  
#include <stdio.h>  
#include <stdlib.h>  
#include <math.h>  

const int BLOCK_SIZE = 100000;  
double sum = 0;  
int num_threads;  
// Définir le mutex et la variable de condition  
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;  
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;  
void *calculate_block(void *thread_id)  
{  
    long id = (long)thread_id;  
    int start = id * BLOCK_SIZE;  
    int end = start + BLOCK_SIZE;  
    double block_sum = 0;  
    for (int i = start; i < end; i++) {  
        double term = pow(-1, i) / (2 * i + 1);  
        block_sum += term;  
    }  
    pthread_mutex_lock(&mutex);  
    sum += block_sum;    
    pthread_mutex_unlock(&mutex);  
    pthread_cond_signal(&cond);  
}  

int main()  
{  
    num_threads = 8;  
    pthread_t threads[num_threads];  
    for (long i = 0; i < num_threads; i++)
        pthread_create(&threads[i], NULL, calculate_block, (void *)i);  
    for(int i = 0; i < num_threads; i ++)
        pthread_join(threads[i], NULL);
    printf("%lf", sum * 4);
}

2 Conception du pool de threads

2.1 Implémentation du pool de threads

Utiliser une file d’attente de tâches comme tampon entre le producteur et le consommateur, chaque élément de la file d’attente de tâches contenant la fonction à exécuter et les paramètres de la fonction, le code correspondant est le suivant :

1
2
3
4
typedef struct task_t {
    void (*func)(void *);
    void *arg;
} task_t;

Le pool de threads contient une file d’attente de tâches, plusieurs threads, des mutex et des sémaphores ainsi que d’autres attributs clés, définis comme suit :

1
2
3
4
5
6
7
8
9
typedef struct thread_pool_t {
    pthread_t threads[MAX_THREADS];
    int num_threads;
    task_t queue[MAX_QUEUE];
    int front, rear, size;
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    bool shutdown;
} thread_pool_t;

Du côté du producteur, la fonction thread_pool_enqueue est utilisée pour ajouter des tâches à la file d’attente de tâches. Lorsqu’un producteur produit une tâche, il utilise d’abord le mutex pool->mutex du pool de threads pour protéger la file d’attente de tâches, empêchant plusieurs threads de modifier simultanément la file d’attente de tâches, puis utilise la variable de condition pool->cond pour notifier le consommateur de l’arrivée d’une nouvelle tâche.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
bool thread_pool_enqueue(thread_pool_t *pool, void (*func)(void *), void *arg){
    pthread_mutex_lock(&pool->mutex);
    if (pool->size == MAX_QUEUE) {
        pthread_mutex_unlock(&pool->mutex);
        return false;
    }
    task_t task = { .func = func, .arg = arg };
    pool->queue[pool->rear] = task;
    pool->rear = (pool->rear + 1) % MAX_QUEUE;
    pool->size++;
    pthread_cond_signal(&pool->cond);
    pthread_mutex_unlock(&pool->mutex);
    return true;
}

Du côté du consommateur, la fonction thread_pool_worker est utilisée pour extraire les tâches de la file d’attente de tâches et les exécuter. Lorsque le consommateur extrait une tâche de la file d’attente de tâches, il utilise le mutex pool->mutex pour protéger la file d’attente de tâches. Si la file d’attente de tâches est vide, le consommateur sera bloqué par la variable de condition pool->mutex, en attendant que le producteur ajoute de nouvelles tâches à la file d’attente de tâches.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
void *thread_pool_worker(void *arg){
    thread_pool_t *pool = (thread_pool_t *)arg;
    while (true) {
        pthread_mutex_lock(&pool->mutex);
        while (pool->size == 0 && !pool->shutdown)
            pthread_cond_wait(&pool->cond, &pool->mutex);
        if (pool->size == 0 && pool->shutdown) {
            pthread_mutex_unlock(&pool->mutex);
            pthread_exit(NULL);
        }
        task_t task = pool->queue[pool->front];
        pool->front = (pool->front + 1) % MAX_QUEUE;
        pool->size--;
        pthread_mutex_unlock(&pool->mutex);
        task.func(task.arg);
    }
}

Le démarrage du pool de threads comprend l’initialisation de la file d’attente de tâches, des sémaphores, le démarrage des threads consommateurs ; la fermeture du pool de threads consiste principalement à attendre la fin de l’exécution de tous les threads ; les deux sont implémentés comme suit :

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
void thread_pool_init(thread_pool_t *pool, int num_threads) {
    pool->num_threads = num_threads;
    pool->front = pool->rear = pool->size = 0;
    pool->shutdown = false;
    pthread_mutex_init(&pool->mutex, NULL);
    pthread_cond_init(&pool->cond, NULL);
    for (int i = 0; i < num_threads; i++)
        pthread_create(&pool->threads[i], NULL, thread_pool_worker, pool);
}

void thread_pool_shutdown(thread_pool_t *pool) {
    pthread_mutex_lock(&pool->mutex);
    pool->shutdown = true;
    pthread_cond_broadcast(&pool->cond);
    pthread_mutex_unlock(&pool->mutex);
    for (int i = 0; i < pool->num_threads; i++)
        pthread_join(pool->threads[i], NULL);
    pthread_mutex_destroy(&pool->mutex);
    pthread_cond_destroy(&pool->cond);
}

Concevoir une tâche simple, afficher l’identifiant de la tâche et l’identifiant du thread, et l’exécuter dans le pool de threads :

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
void my_task(void *arg){
    int *num = (int *)arg;
    printf("Task %d by thread %lu\n", *num, pthread_self());
    free(num);
}

int main() {
    thread_pool_t pool;
    thread_pool_init(&pool, 8);
    for (int i = 0; i < 20; i++) {
        int *num = malloc(sizeof(int));
        *num = i;
        thread_pool_enqueue(&pool, my_task, num);
    }
    printf("main thread %lu\n", pthread_self());
    thread_pool_shutdown(&pool);
    return 0;
}

2.2 Résultat de l’exécution

Compiler sous Ubuntu 20 avec gcc tp.c -o tp -lpthread, le résultat de l’exécution est décrit comme suit :

1
2
3
4
Task 0 by thread 140422668744478
Task 4 by thread 140422668744456
Task 5 by thread 140422668744434
...

2.3 Pool de threads

Utiliser le pool de threads pour effectuer l’addition de matrices $a+b=c$, la fonction de tâche correspondante est la suivante :

1
2
3
4
5
6
7
8
void add_matrix(void *arg) {
    int *num = (int *)arg;
    int st = *num, ed = st + 5;
    for (int i = st; i < ed; i++)
        c[i] = a[i] + b[i];
    printf("Task [add_matrix] %d by thread %lu, range = %d ~ %d \n", *num /5, pthread_self(), st, ed);
    free(num);
}
Buy me a coffee~
Tim AlipayAlipay
Tim PayPalPayPal
Tim WeChat PayWeChat Pay
0%