本文では、2つの例を用いてpthreadを使用したマルチスレッドプログラミングを実践し、主に2つの部分を含みます:
- データ分割方式による円周率の並列計算
- 生産者-消費者モデルに基づくスレッドプール開発、具体的なビジネス処理ロジックは簡略化され、スレッド管理と同期に重点を置く
1 円周率の計算
1.1 思考の概要
ライプニッツの公式に基づき、多くの回数をマルチスレッドで計算し、$\pi$ に近づけます。マルチスレッド方式でデータを分割し、各スレッドが部分データを処理することで加速します。同時に、マルチスレッドがグローバルな結果にアクセスする際に競合が発生する可能性があるため、ミューテックスとセマフォを使用してスレッドがローカル結果をグローバル結果に秩序正しく加算するようにします。
1.2 コード実装
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
const int BLOCK_SIZE = 100000;
double sum = 0;
int num_threads;
// ミューテックスと条件変数の定義
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
void *calculate_block(void *thread_id)
{
long id = (long)thread_id;
int start = id * BLOCK_SIZE;
int end = start + BLOCK_SIZE;
double block_sum = 0;
for (int i = start; i < end; i++) {
double term = pow(-1, i) / (2 * i + 1);
block_sum += term;
}
pthread_mutex_lock(&mutex);
sum += block_sum;
pthread_mutex_unlock(&mutex);
pthread_cond_signal(&cond);
}
int main()
{
num_threads = 8;
pthread_t threads[num_threads];
for (long i = 0; i < num_threads; i++)
pthread_create(&threads[i], NULL, calculate_block, (void *)i);
for(int i = 0; i < num_threads; i ++)
pthread_join(threads[i], NULL);
printf("%lf", sum * 4);
}
|
2 スレッドプール設計
2.1 スレッドプールの実装
タスクキューを生産者と消費者の間のバッファとして使用し、タスクキューの各要素には実行する関数とそのパラメータが含まれています。対応するコードは以下の通りです:
1
2
3
4
|
typedef struct task_t {
void (*func)(void *);
void *arg;
} task_t;
|
スレッドプールにはタスクキュー、いくつかのスレッド、ミューテックスとセマフォ、およびその他の重要な属性が含まれています。定義は以下の通りです:
1
2
3
4
5
6
7
8
9
|
typedef struct thread_pool_t {
pthread_t threads[MAX_THREADS];
int num_threads;
task_t queue[MAX_QUEUE];
int front, rear, size;
pthread_mutex_t mutex;
pthread_cond_t cond;
bool shutdown;
} thread_pool_t;
|
生産者側では、thread_pool_enqueue
関数を使用してタスクをタスクキューに追加します。生産者がタスクを生成した後、まずスレッドプールのミューテックスpool->mutex
を使用してタスクキューを保護し、複数のスレッドが同時にタスクキューを変更するのを防ぎ、次に条件変数pool->cond
を使用して消費者に新しいタスクが到着したことを通知します。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
bool thread_pool_enqueue(thread_pool_t *pool, void (*func)(void *), void *arg){
pthread_mutex_lock(&pool->mutex);
if (pool->size == MAX_QUEUE) {
pthread_mutex_unlock(&pool->mutex);
return false;
}
task_t task = { .func = func, .arg = arg };
pool->queue[pool->rear] = task;
pool->rear = (pool->rear + 1) % MAX_QUEUE;
pool->size++;
pthread_cond_signal(&pool->cond);
pthread_mutex_unlock(&pool->mutex);
return true;
}
|
消費者側では、thread_pool_worker
関数を使用してタスクキューからタスクを取り出して実行します。消費者がタスクキューからタスクを取り出すとき、ミューテックスpool->mutex
を使用してタスクキューを保護します。タスクキューが空の場合、消費者は条件変数pool->mutex
によってブロックされ、生産者がタスクキューに新しいタスクを追加するのを待ちます。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
void *thread_pool_worker(void *arg){
thread_pool_t *pool = (thread_pool_t *)arg;
while (true) {
pthread_mutex_lock(&pool->mutex);
while (pool->size == 0 && !pool->shutdown)
pthread_cond_wait(&pool->cond, &pool->mutex);
if (pool->size == 0 && pool->shutdown) {
pthread_mutex_unlock(&pool->mutex);
pthread_exit(NULL);
}
task_t task = pool->queue[pool->front];
pool->front = (pool->front + 1) % MAX_QUEUE;
pool->size--;
pthread_mutex_unlock(&pool->mutex);
task.func(task.arg);
}
}
|
スレッドプールの起動には、タスクキューの初期化、セマフォ、消費者スレッドの起動が含まれます。スレッドプールの閉鎖では、すべてのスレッドの実行終了を待つことが最も重要です。両者の実装は以下の通りです:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
void thread_pool_init(thread_pool_t *pool, int num_threads) {
pool->num_threads = num_threads;
pool->front = pool->rear = pool->size = 0;
pool->shutdown = false;
pthread_mutex_init(&pool->mutex, NULL);
pthread_cond_init(&pool->cond, NULL);
for (int i = 0; i < num_threads; i++)
pthread_create(&pool->threads[i], NULL, thread_pool_worker, pool);
}
void thread_pool_shutdown(thread_pool_t *pool) {
pthread_mutex_lock(&pool->mutex);
pool->shutdown = true;
pthread_cond_broadcast(&pool->cond);
pthread_mutex_unlock(&pool->mutex);
for (int i = 0; i < pool->num_threads; i++)
pthread_join(pool->threads[i], NULL);
pthread_mutex_destroy(&pool->mutex);
pthread_cond_destroy(&pool->cond);
}
|
簡単なタスクを設計し、タスクIDとスレッドIDを出力してスレッドプールで実行します:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
void my_task(void *arg){
int *num = (int *)arg;
printf("Task %d by thread %lu\n", *num, pthread_self());
free(num);
}
int main() {
thread_pool_t pool;
thread_pool_init(&pool, 8);
for (int i = 0; i < 20; i++) {
int *num = malloc(sizeof(int));
*num = i;
thread_pool_enqueue(&pool, my_task, num);
}
printf("main thread %lu\n", pthread_self());
thread_pool_shutdown(&pool);
return 0;
}
|
2.2 実行結果
Ubuntu 20でgcc tp.c -o tp -lpthread
をコンパイルし、実行結果は次のように記述されます:
1
2
3
4
|
Task 0 by thread 140422668744478
Task 4 by thread 140422668744456
Task 5 by thread 140422668744434
...
|
2.3 スレッドプール
スレッドプールを使用して行列の加算$a+b=c$を完了し、対応するタスク関数は以下の通りです:
1
2
3
4
5
6
7
8
|
void add_matrix(void *arg) {
int *num = (int *)arg;
int st = *num, ed = st + 5;
for (int i = st; i < ed; i++)
c[i] = a[i] + b[i];
printf("Task [add_matrix] %d by thread %lu, range = %d ~ %d \n", *num /5, pthread_self(), st, ed);
free(num);
}
|