Groovyでマルチスレッド

はじめに

Groovy でマルチスレッド処理をする場合は、Executors を利用すると便利。

Executors の基本的な使い方

  1. Executors.newSingleThreadExecutor メソッドなどで ExecutorService のインスタンスを取得する。
  2. ExecutorService#execute メソッドなどで非同期処理を実行する。
  3. ExecutorService#shutdown メソッドで非同期処理の受付を終了する。
  4. ExecutorService#awaitTermination メソッドで非同期処理の終了を待機する。
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// ExecutorService を取得
def service = Executors.newSingleThreadExecutor()

// 非同期処理の実行
service.execute {
    Thread.sleep(1000)
}

// 非同期処理の受付を終了
service.shutdown()

// 非同期処理の終了を待機
service.awaitTermination(1, TimeUnit.HOURS)

利用するクラスは以下の3クラス。
1. Executors
2. ExecutorService
3. Future

Executors の使い方

Executors で利用するメソッドは以下。
以下のメソッドはすべて static で、ExecutorService のインスタンスが取得できる。

メソッド説明
newSingleThreadExecutor単一のワーカー・スレッドを使用する。
newCachedThreadPool必要に応じ、新規スレッドを作成するスレッド・プールを作成する。利用可能な場合には以前に構築されたスレッドを再利用する。
newFixedThreadPool固定数のスレッドを再利用するスレッド・プールを作成します。
newWorkStealingPoolすべての使用可能なプロセッサをターゲット並列性レベルとして使用して、work-stealingスレッド・プールを作成します。

テスト用共通メソッド

各メソッドのサンプルを動作させるための共通メソッドは以下。

import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

def log(status) {
    println "[${Thread.currentThread().name}][${new Date().format('HH:mm:ss.SSS')}] ${status}"
}

def execute(method, service) {
    println method
    long l = System.currentTimeMillis()
    (0..<10).each { i ->
        service.execute {
            log '開始'
            Thread.sleep(100)
            log '終了'
        }
        Thread.sleep(20)
    }

    service.shutdown()
    log '待機中'
    service.awaitTermination(1, TimeUnit.MINUTES)
    log '待機完了'
}

newSingleThreadExecutor メソッド

単一のワーカー・スレッドを使用する。

execute 'newSingleThreadExecutor', Executors.newSingleThreadExecutor()

1個のワーカー・スレッドで実行される。

newSingleThreadExecutor
[pool-1-thread-1][09:51:20.741]開始
[pool-1-thread-1][09:51:20.885]終了
[pool-1-thread-1][09:51:20.886]開始
[main][09:51:20.942]待機中
[pool-1-thread-1][09:51:20.988]終了
[pool-1-thread-1][09:51:20.988]開始
[pool-1-thread-1][09:51:21.098]終了
[pool-1-thread-1][09:51:21.099]開始
[pool-1-thread-1][09:51:21.204]終了
[pool-1-thread-1][09:51:21.205]開始
[pool-1-thread-1][09:51:21.317]終了
[pool-1-thread-1][09:51:21.317]開始
[pool-1-thread-1][09:51:21.422]終了
[pool-1-thread-1][09:51:21.424]開始
[pool-1-thread-1][09:51:21.533]終了
[pool-1-thread-1][09:51:21.536]開始
[pool-1-thread-1][09:51:21.650]終了
[pool-1-thread-1][09:51:21.651]開始
[pool-1-thread-1][09:51:21.766]終了
[pool-1-thread-1][09:51:21.768]開始
[pool-1-thread-1][09:51:21.881]終了
[main][09:51:21.882]待機完了

newCachedThreadPool メソッド

必要に応じ、新規スレッドを作成するスレッド・プールを作成する。
利用可能な場合には以前に構築されたスレッドを再利用する。

execute 'newCachedThreadPool', Executors.newCachedThreadPool()

再利用可能なワーカー・スレッドが存在する場合は、ワーカー・スレッドを再利用している。

newCachedThreadPool
[pool-2-thread-1][09:51:21.889]開始
[pool-2-thread-2][09:51:21.908]開始
[pool-2-thread-3][09:51:21.929]開始
[pool-2-thread-4][09:51:21.949]開始
[pool-2-thread-5][09:51:21.970]開始
[pool-2-thread-1][09:51:21.989]終了
[pool-2-thread-6][09:51:21.993]開始
[pool-2-thread-2][09:51:22.010]終了
[pool-2-thread-2][09:51:22.013]開始
[pool-2-thread-3][09:51:22.031]終了
[pool-2-thread-3][09:51:22.034]開始
[pool-2-thread-4][09:51:22.051]終了
[pool-2-thread-4][09:51:22.054]開始
[pool-2-thread-5][09:51:22.072]終了
[pool-2-thread-5][09:51:22.074]開始
[main][09:51:22.094]待機中
[pool-2-thread-6][09:51:22.097]終了
[pool-2-thread-2][09:51:22.114]終了
[pool-2-thread-3][09:51:22.139]終了
[pool-2-thread-4][09:51:22.154]終了
[pool-2-thread-5][09:51:22.175]終了
[main][09:51:22.176]待機完了

newFixedThreadPool

固定数のスレッドを再利用するスレッド・プールを作成する。

以下は最大5個のワーカー・スレッドで実行するサンプル。

execute 'newFixedThreadPool(5)', Executors.newFixedThreadPool(5)

最大5個のワーカー・スレッドで実行されていて、
一度使ったワーカー・スレッドを再利用している。

newFixedThreadPool(5)
[pool-3-thread-1][09:51:22.179]開始
[pool-3-thread-2][09:51:22.199]開始
[pool-3-thread-3][09:51:22.219]開始
[pool-3-thread-4][09:51:22.240]開始
[pool-3-thread-5][09:51:22.259]開始
[pool-3-thread-1][09:51:22.279]終了
[pool-3-thread-1][09:51:22.280]開始
[pool-3-thread-2][09:51:22.300]終了
[pool-3-thread-2][09:51:22.301]開始
[pool-3-thread-3][09:51:22.321]終了
[pool-3-thread-3][09:51:22.322]開始
[pool-3-thread-4][09:51:22.342]終了
[pool-3-thread-4][09:51:22.343]開始
[pool-3-thread-5][09:51:22.360]終了
[pool-3-thread-5][09:51:22.361]開始
[pool-3-thread-1][09:51:22.381]終了
[main][09:51:22.382]待機中
[pool-3-thread-2][09:51:22.402]終了
[pool-3-thread-3][09:51:22.422]終了
[pool-3-thread-4][09:51:22.448]終了
[pool-3-thread-5][09:51:22.462]終了
[main][09:51:22.463]待機完了

newWorkStealingPool

すべての使用可能なプロセッサをターゲット並列性レベルとして使用して、work-stealingスレッド・プールを作成する。

println "プロセッサ数: ${Runtime.runtime.availableProcessors()}"
execute 'newWorkStealingPool', Executors.newWorkStealingPool()

プロセッサ数を最大ワーカー・スレッド数として実行される。

プロセッサ数: 4
newWorkStealingPool
[ForkJoinPool-1-worker-1][10:01:17.844] 開始
[ForkJoinPool-1-worker-2][10:01:17.856] 開始
[ForkJoinPool-1-worker-3][10:01:17.877] 開始
[ForkJoinPool-1-worker-0][10:01:17.897] 開始
[ForkJoinPool-1-worker-1][10:01:17.960] 終了
[ForkJoinPool-1-worker-2][10:01:17.960] 終了
[ForkJoinPool-1-worker-1][10:01:17.961] 開始
[ForkJoinPool-1-worker-2][10:01:17.963] 開始
[ForkJoinPool-1-worker-3][10:01:17.978] 終了
[ForkJoinPool-1-worker-3][10:01:17.978] 開始
[ForkJoinPool-1-worker-0][10:01:17.998] 終了
[ForkJoinPool-1-worker-0][10:01:17.998] 開始
[main][10:01:18.040] 待機中
[ForkJoinPool-1-worker-1][10:01:18.064] 終了
[ForkJoinPool-1-worker-1][10:01:18.065] 開始
[ForkJoinPool-1-worker-2][10:01:18.065] 終了
[ForkJoinPool-1-worker-2][10:01:18.066] 開始
[ForkJoinPool-1-worker-3][10:01:18.079] 終了
[ForkJoinPool-1-worker-0][10:01:18.100] 終了
[ForkJoinPool-1-worker-1][10:01:18.166] 終了
[ForkJoinPool-1-worker-2][10:01:18.167] 終了
[main][10:01:18.168] 待機完了

ExecutorService の使い方

ExecutorService で非同期処理を実行するメソッドは以下。

メソッド説明
execute指定された処理を非同期で実行する。戻り値は取得できない。
submit値を返す処理を非同期で実行し、処理結果を取得可能にする Future を返す。
invokeAll指定された処理を全て非同期で実行し、すべての処理の終了を待機し、処理結果を取得可能な Future のリストを返す。
invokeAny指定された全ての処理を非同期で実行し、例外をスローせずに正常に終了した最初の処理の結果を返す。最初に正常終了した処理が発生した時点時点で、終了していない処理では InterruptedException が発生し、処理が取り消される。

テスト用共通メソッド

各メソッドのサンプルを動作させるための共通メソッドは以下。

def log(status) {
    println "[${Thread.currentThread().name}][${new Date().format('HH:mm:ss.SSS')}] ${status}"
}

execute メソッド

指定された処理を非同期で実行する。

def service = Executors.newSingleThreadExecutor()

service.execute {
    log '開始'
    Thread.sleep(1000)
    log '終了'
}

service.shutdown()
log '待機中'
service.awaitTermination(1, TimeUnit.HOURS)
log '待機終了'

実行結果。

[main][10:34:57.627] 待機中
[pool-1-thread-1][10:34:57.627] 開始
[pool-1-thread-1][10:34:58.668] 終了
[main][10:34:58.669] 待機終了

submit メソッド

値を返す処理を非同期で実行し、処理結果を取得可能にする Future を返す。
groovy で値を返す必要がある場合は、クロージャを明示的に Callable<E> に変換する必要がある。

def service = Executors.newSingleThreadExecutor()

def future = service.submit {
    log '開始'
    Thread.sleep(1000)
    log '終了'

    "終了 (${Thread.currentThread().name})"
} as Callable<String>

service.shutdown()

log '結果取得'
log "結果: ${future.get()}"

log '待機中'
service.awaitTermination(1, TimeUnit.HOURS)
log '待機終了'

実行結果。

[main][12:10:42.284] 結果取得
[pool-1-thread-1][12:10:42.284] 開始
[pool-1-thread-1][12:10:43.339] 終了
[main][12:10:43.340] 結果: 終了 (pool-1-thread-1)
[main][12:10:43.341] 待機中
[main][12:10:43.348] 待機終了

invokeAll メソッド

指定された処理を全て非同期で実行し、すべての処理の終了を待機し、処理結果を取得可能な Future のリストを返す。
invokeAll を使うサンプルは以下。

def service = Executors.newFixedThreadPool(5)

def list = (0..<5).collect { i -> 
    ({
        log '開始'
        Thread.sleep(1000 * (i + 1))
        log '終了'

        "終了 (${Thread.currentThread().name})"
    } as Callable<String>)
}

def futures = service.invokeAll(list)

service.shutdown()

log '結果取得'
futures.each { future ->
    log "結果: ${future.get()}"
}

service.awaitTermination(1, TimeUnit.HOURS)
log '待機終了'
[pool-1-thread-3][12:15:56.237] 開始
[pool-1-thread-1][12:15:56.237] 開始
[pool-1-thread-5][12:15:56.237] 開始
[pool-1-thread-4][12:15:56.237] 開始
[pool-1-thread-2][12:15:56.237] 開始
[pool-1-thread-1][12:15:57.315] 終了
[pool-1-thread-2][12:15:58.315] 終了
[pool-1-thread-3][12:15:59.300] 終了
[pool-1-thread-4][12:16:00.315] 終了
[pool-1-thread-5][12:16:01.312] 終了
[main][12:16:01.315] 結果取得
[main][12:16:01.325] 結果: 終了 (pool-1-thread-1)
[main][12:16:01.326] 結果: 終了 (pool-1-thread-2)
[main][12:16:01.327] 結果: 終了 (pool-1-thread-3)
[main][12:16:01.330] 結果: 終了 (pool-1-thread-4)
[main][12:16:01.330] 結果: 終了 (pool-1-thread-5)
[main][12:16:01.338] 待機終了

invokeAny メソッド

指定された全ての処理を非同期で実行し、例外をスローせずに正常に終了した最初の処理の結果を返す。最初に正常終了した処理が発生した時点時点で、終了していない処理では InterruptedException が発生し、処理が取り消される。

※ 使い道がよくわからない。

def service = Executors.newFixedThreadPool(5)

def list = (0..<5).collect { i -> 
    ({
        try {
            log '開始'
            Thread.sleep(1000 * i)
            log '終了'

            "終了 (${Thread.currentThread().name})"
        } catch (Exception e) {
            log '異常終了'
        }
    } as Callable<String>)
}

def result = service.invokeAny(list)

log '結果取得'
log "結果: ${result}"

service.shutdown()

log '待機中'
service.awaitTermination(1, TimeUnit.HOURS)
log '待機終了'

1個目が正常終了したタイミングで、その他が異常終了している。

[pool-1-thread-2][12:16:40.167] 開始
[pool-1-thread-1][12:16:40.167] 開始
[pool-1-thread-5][12:16:40.167] 開始
[pool-1-thread-4][12:16:40.167] 開始
[pool-1-thread-3][12:16:40.167] 開始
[pool-1-thread-1][12:16:41.231] 終了
[pool-1-thread-2][12:16:41.232] 異常終了
[pool-1-thread-5][12:16:41.232] 異常終了
[pool-1-thread-3][12:16:41.232] 異常終了
[main][12:16:41.233] 結果取得
[pool-1-thread-4][12:16:41.233] 異常終了
[main][12:16:41.236] 結果: 終了 (pool-1-thread-1)
[main][12:16:41.238] 待機中
[main][12:16:41.243] 待機終了

コメント