Windows の C で Guarded Suspension パターンを実現する (条件変数を実装する)

マルチスレッドプログラミングをしているときに、「ある条件が満たされているかどうか調べ、満たされていなければ条件が満たされるまで待機する」 という処理が必要になることがあります。 このような処理はパターン化されており、『増補改訂版 Java言語で学ぶデザインパターン入門 マルチスレッド編』 では Guarded Suspension パターンと呼ばれています。

POSIX のマルチスレッドが使える環境では、条件変数 (condition variable) pthread_cond を使って Guarded Suspension パターンを実現することができます。 しかし、Windows のスレッド機能には条件変数に相当するものはありません。

本記事では、Windows が提供している CriticalSection と Event を使って条件変数を実装し、Windows の C で Guarded Suspension パターンを実現する方法を説明します。

基本的な考え方

条件変数を使って Guarded Suspension パターンを実現する際の基本的な流れを説明します。

  • ミューテックスのロックを取得する
  • 条件が満たされているかどうか調べる (*)
    • 条件が満たされていれば次の処理に移る
    • 条件が満たされていなければ 条件変数の上で待つ → 他のスレッドから信号が来たらスレッド再開し (*) の処理に戻る
  • 何らかの処理をする
  • ミューテックスのロックを解放する

条件というのは、何らかの処理をするために満たされている必要がある条件です。 例えば、「変数 count が 0 以上である」 というような条件です。 当然ながら、条件に含まれる変数 (この例では count) は複数のスレッドから読み書きされるものなので、ミューテックスによる排他制御をしなければいけません。 上の説明で出てくるミューテックスは、条件に含まれる変数を保護するためのものです。

なお、条件変数の上で待つ際に、ミューテックスのロックを解放しなければ他のスレッドが条件に含まれる変数の値を変更できなくなってしまいデッドロックしてしまいます。 そうならないように、条件変数の上で待つときに自動的にミューテックスのロックを解放し、再開するときにミューテックスのロックを取得するようになっています。

条件変数の実装

さて、それでは Windows で条件変数を実装する手法を説明します。 条件変数を実装する上で重要なのは次の処理です。

  • 他のスレッドからシグナルが送られてくるのを待つ処理
  • 他のスレッドからシグナルが送られてくるのを待つときに、ミューテックスのロックを解放し、スレッドを再開するときにロックを取得する処理
  • 条件に含まれる変数を書き換えたときにシグナルを送信する処理

Windows ではシグナルのやりとりは Event を使って行います。 ミューテックスは CriticalSection を使用します *1。 また、pthread_cond では、条件変数の上で待機しているスレッドの中から 1 つだけ再開させるのか、待機しているスレッドを全て再開させるのか選ぶことができます。 今回はこの処理も同じように実装します。

まず条件変数を表す構造体ですが、シグナルのやり取りのための Event だけを扱えればいいので次のようになります。 Event が 2 つありますが、これは待機しているスレッドの中から 1 つだけ再開させるためのシグナルと、全てのスレッドを再開させるシグナルに対応しています。

typedef struct {
    HANDLE hEventsWakeup[2];
} COND_VARIABLE;

構造体 COND_VARIABLE の初期化処理と終了処理のための関数は次のようになります。

// 初期化処理を行う関数
void CondVariable_init( COND_VARIABLE *self ) {
    // 1 つのスレッドを起こすためのイベント
    self->hEventsWakeup[0] = CreateEvent( NULL, FALSE, FALSE, NULL );
    // 全てのスレッドを起こすためのイベント
    self->hEventsWakeup[1] = CreateEvent( NULL, TRUE,  FALSE, NULL );
}
// 終了処理を行う関数
void CondVariable_final( COND_VARIABLE *self ) {
    CloseHandle( self->hEventsWakeup[0] );
    CloseHandle( self->hEventsWakeup[1] );
}

また、条件が満たされていない場合に待機するための関数は次のようになります。 第 2 引数はロック済みのミューテックスをあらわします。 第 3 引数は最大の待ち時間です。 他のスレッドが起こすまでいつまでも待ち続けるならば INFINITE を渡します。

// 条件変数の上で待つための関数
void CondVariable_wait( COND_VARIABLE *self, CRITICAL_SECTION *cs_ptr, DWORD dwMilliseconds ) {
    ResetEvent( self->hEventsWakeup[0] );
    ResetEvent( self->hEventsWakeup[1] );
    LeaveCriticalSection( cs_ptr );
    WaitForMultipleObjects( 2, self->hEventsWakeup, FALSE, dwMilliseconds );
    EnterCriticalSection( cs_ptr );
}

条件に含まれる変数の値を変更した際に待機しているスレッドに通知を行うための関数は次のようになります。

// 待機しているスレッドのうち 1 つのスレッドを起こす関数
void CondVariable_notify( COND_VARIABLE *self ) {
    SetEvent( self->hEventsWakeup[0] );
}
// 待機しているスレッド全てを起こす関数
void CondVariable_notifyAll( COND_VARIABLE *self ) {
    SetEvent( self->hEventsWakeup[1] );
}

使用例

「変数 shCount が 0 より大きい」 というのを条件にして、条件が満たされていれば shCount をデクリメントして "process..." という文字列を出力するスレッドを 5 個動かすプログラムです。 メインスレッドでは約 1 秒ごとに shCount をインクリメントするので、約 1 秒ごとに "process..." という文字列が出力されます。

#include <stdio.h>
#include <process.h>
#include <windows.h>

// VARIABLE_COND の構造体と関数
typedef struct {
    HANDLE hEventsWakeup[2];
} COND_VARIABLE;
void CondVariable_init( COND_VARIABLE *self ) {
    // notify のためのイベント
    self->hEventsWakeup[0] = CreateEvent( NULL, FALSE, FALSE, NULL );
    // notifyAll のためのイベント
    self->hEventsWakeup[1] = CreateEvent( NULL, TRUE,  FALSE, NULL );
}
void CondVariable_final( COND_VARIABLE *self ) {
    CloseHandle( self->hEventsWakeup[0] );
    CloseHandle( self->hEventsWakeup[1] );
}
void CondVariable_wait( COND_VARIABLE *self, CRITICAL_SECTION *cs_ptr, DWORD dwMilliseconds ) {
    ResetEvent( self->hEventsWakeup[0] );
    ResetEvent( self->hEventsWakeup[1] );
    LeaveCriticalSection( cs_ptr );
    WaitForMultipleObjects( 2, self->hEventsWakeup, FALSE, dwMilliseconds );
    EnterCriticalSection( cs_ptr );
}
void CondVariable_notify( COND_VARIABLE *self ) {
    SetEvent( self->hEventsWakeup[0] );
}
void CondVariable_notifyAll( COND_VARIABLE *self ) {
    SetEvent( self->hEventsWakeup[1] );
}

// スレッド間で共有されるデータ
int shCount = 0;
// スレッド間で共有されるデータを保護するための排他処理, 同期機構
CRITICAL_SECTION shCS;
COND_VARIABLE    shCV;

// 子スレッドで行う処理
static unsigned __stdcall run( void *arg ) {
    // ミューテックスのロックを取得
    EnterCriticalSection( &shCS );
    // 条件が満たされているかどうかチェック
    while( shCount <= 0 ) {
        printf( "wait...\n" );
        fflush( stdout );
        // 条件が満たされていないので待機する
        CondVariable_wait( &shCV, &shCS, INFINITE );
        printf( "wakeup!\n" );
    }
    shCount--;
    printf( "decrease shCount: %d\n", shCount );
    printf( "process...\n" );
    fflush( stdout );
    // ミューテックスのロックを解放
    LeaveCriticalSection( &shCS );
    _endthreadex(0);
    return 0; // コンパイラの警告を殺す
}

int main( int argc, char **argv ) {
    int numSign = 5;
    HANDLE hThreads[numSign];
    InitializeCriticalSection( &shCS );
    CondVariable_init( &shCV );
    for( int i = 0; i < numSign; i++ ) {
        hThreads[i] = (HANDLE)_beginthreadex( NULL, 0, run, NULL, 0, NULL );
    }
    // スレッドの数だけカウンタをインクリメントする
    for( int i = 0; i < numSign; i++ ) {
        Sleep( 1000 );
        // 共有変数をいじるのでミューテックスのロックを取得
        EnterCriticalSection( &shCS );
        shCount++;
        printf( "increase shCount: %d\n", shCount );
        fflush( stdout );
        // 条件に含まれる変数を変更したので待機しているスレッドを起こす
        CondVariable_notify( &shCV );
        // ミューテックスのロックを解放
        LeaveCriticalSection( &shCS );
    }
    // 全てのスレッドが終了するまで待つ
    WaitForMultipleObjects( numSign, hThreads, TRUE, INFINITE );
    DeleteCriticalSection( &shCS );
    CondVariable_final( &shCV );
    return 0;
}

エラー処理など

この記事のコードは勉強用に書いたものなので、エラー処理等はちゃんとしていません。 実際に Windows で条件変数を使う場合はもうちょっとエラー処理等をちゃんとしないとだめだと思います。

*1:Windows には CriticalSection のほかに Mutex がありますが、プロセス間の排他処理をしないのであれば CriticalSection を使ったほうが高速なので良いようです。