Pthreadで実現するマルチスレッドプログラミング

本文では、2つの例を用いてpthreadを使用したマルチスレッドプログラミングを実践し、主に2つの部分を含みます:

  1. データ分割方式による円周率の並列計算
  2. 生産者-消費者モデルに基づくスレッドプール開発、具体的なビジネス処理ロジックは簡略化され、スレッド管理と同期に重点を置く

1 円周率の計算

1.1 思考の概要

ライプニッツの公式に基づき、多くの回数をマルチスレッドで計算し、$\pi$ に近づけます。マルチスレッド方式でデータを分割し、各スレッドが部分データを処理することで加速します。同時に、マルチスレッドがグローバルな結果にアクセスする際に競合が発生する可能性があるため、ミューテックスとセマフォを使用してスレッドがローカル結果をグローバル結果に秩序正しく加算するようにします。

1.2 コード実装

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <pthread.h>  
#include <stdio.h>  
#include <stdlib.h>  
#include <math.h>  

const int BLOCK_SIZE = 100000;  
double sum = 0;  
int num_threads;  
// ミューテックスと条件変数の定義  
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;  
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;  
void *calculate_block(void *thread_id)  
{  
    long id = (long)thread_id;  
    int start = id * BLOCK_SIZE;  
    int end = start + BLOCK_SIZE;  
    double block_sum = 0;  
    for (int i = start; i < end; i++) {  
        double term = pow(-1, i) / (2 * i + 1);  
        block_sum += term;  
    }  
    pthread_mutex_lock(&mutex);  
    sum += block_sum;    
    pthread_mutex_unlock(&mutex);  
    pthread_cond_signal(&cond);  
}  

int main()  
{  
    num_threads = 8;  
    pthread_t threads[num_threads];  
    for (long i = 0; i < num_threads; i++)
        pthread_create(&threads[i], NULL, calculate_block, (void *)i);  
    for(int i = 0; i < num_threads; i ++)
        pthread_join(threads[i], NULL);
    printf("%lf", sum * 4);
}

2 スレッドプール設計

2.1 スレッドプールの実装

タスクキューを生産者と消費者の間のバッファとして使用し、タスクキューの各要素には実行する関数とそのパラメータが含まれています。対応するコードは以下の通りです:

1
2
3
4
typedef struct task_t {
    void (*func)(void *);
    void *arg;
} task_t;

スレッドプールにはタスクキュー、いくつかのスレッド、ミューテックスとセマフォ、およびその他の重要な属性が含まれています。定義は以下の通りです:

1
2
3
4
5
6
7
8
9
typedef struct thread_pool_t {
    pthread_t threads[MAX_THREADS];
    int num_threads;
    task_t queue[MAX_QUEUE];
    int front, rear, size;
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    bool shutdown;
} thread_pool_t;

生産者側では、thread_pool_enqueue関数を使用してタスクをタスクキューに追加します。生産者がタスクを生成した後、まずスレッドプールのミューテックスpool->mutexを使用してタスクキューを保護し、複数のスレッドが同時にタスクキューを変更するのを防ぎ、次に条件変数pool->condを使用して消費者に新しいタスクが到着したことを通知します。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
bool thread_pool_enqueue(thread_pool_t *pool, void (*func)(void *), void *arg){
    pthread_mutex_lock(&pool->mutex);
    if (pool->size == MAX_QUEUE) {
        pthread_mutex_unlock(&pool->mutex);
        return false;
    }
    task_t task = { .func = func, .arg = arg };
    pool->queue[pool->rear] = task;
    pool->rear = (pool->rear + 1) % MAX_QUEUE;
    pool->size++;
    pthread_cond_signal(&pool->cond);
    pthread_mutex_unlock(&pool->mutex);
    return true;
}

消費者側では、thread_pool_worker関数を使用してタスクキューからタスクを取り出して実行します。消費者がタスクキューからタスクを取り出すとき、ミューテックスpool->mutexを使用してタスクキューを保護します。タスクキューが空の場合、消費者は条件変数pool->mutexによってブロックされ、生産者がタスクキューに新しいタスクを追加するのを待ちます。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
void *thread_pool_worker(void *arg){
    thread_pool_t *pool = (thread_pool_t *)arg;
    while (true) {
        pthread_mutex_lock(&pool->mutex);
        while (pool->size == 0 && !pool->shutdown)
            pthread_cond_wait(&pool->cond, &pool->mutex);
        if (pool->size == 0 && pool->shutdown) {
            pthread_mutex_unlock(&pool->mutex);
            pthread_exit(NULL);
        }
        task_t task = pool->queue[pool->front];
        pool->front = (pool->front + 1) % MAX_QUEUE;
        pool->size--;
        pthread_mutex_unlock(&pool->mutex);
        task.func(task.arg);
    }
}

スレッドプールの起動には、タスクキューの初期化、セマフォ、消費者スレッドの起動が含まれます。スレッドプールの閉鎖では、すべてのスレッドの実行終了を待つことが最も重要です。両者の実装は以下の通りです:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
void thread_pool_init(thread_pool_t *pool, int num_threads) {
    pool->num_threads = num_threads;
    pool->front = pool->rear = pool->size = 0;
    pool->shutdown = false;
    pthread_mutex_init(&pool->mutex, NULL);
    pthread_cond_init(&pool->cond, NULL);
    for (int i = 0; i < num_threads; i++)
        pthread_create(&pool->threads[i], NULL, thread_pool_worker, pool);
}

void thread_pool_shutdown(thread_pool_t *pool) {
    pthread_mutex_lock(&pool->mutex);
    pool->shutdown = true;
    pthread_cond_broadcast(&pool->cond);
    pthread_mutex_unlock(&pool->mutex);
    for (int i = 0; i < pool->num_threads; i++)
        pthread_join(pool->threads[i], NULL);
    pthread_mutex_destroy(&pool->mutex);
    pthread_cond_destroy(&pool->cond);
}

簡単なタスクを設計し、タスクIDとスレッドIDを出力してスレッドプールで実行します:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
void my_task(void *arg){
    int *num = (int *)arg;
    printf("Task %d by thread %lu\n", *num, pthread_self());
    free(num);
}

int main() {
    thread_pool_t pool;
    thread_pool_init(&pool, 8);
    for (int i = 0; i < 20; i++) {
        int *num = malloc(sizeof(int));
        *num = i;
        thread_pool_enqueue(&pool, my_task, num);
    }
    printf("main thread %lu\n", pthread_self());
    thread_pool_shutdown(&pool);
    return 0;
}

2.2 実行結果

Ubuntu 20でgcc tp.c -o tp -lpthreadをコンパイルし、実行結果は次のように記述されます:

1
2
3
4
Task 0 by thread 140422668744478
Task 4 by thread 140422668744456
Task 5 by thread 140422668744434
...

2.3 スレッドプール

スレッドプールを使用して行列の加算$a+b=c$を完了し、対応するタスク関数は以下の通りです:

1
2
3
4
5
6
7
8
void add_matrix(void *arg) {
    int *num = (int *)arg;
    int st = *num, ed = st + 5;
    for (int i = st; i < ed; i++)
        c[i] = a[i] + b[i];
    printf("Task [add_matrix] %d by thread %lu, range = %d ~ %d \n", *num /5, pthread_self(), st, ed);
    free(num);
}
Buy me a coffee~
Tim 支付宝支付宝
Tim 贝宝贝宝
Tim 微信微信
0%