はじめに
Groovy でマルチスレッド処理をする場合は、Executors
を利用すると便利。
Executors
の基本的な使い方
Executors.newSingleThreadExecutor
メソッドなどでExecutorService
のインスタンスを取得する。ExecutorService#execute
メソッドなどで非同期処理を実行する。ExecutorService#shutdown
メソッドで非同期処理の受付を終了する。ExecutorService#awaitTermination
メソッドで非同期処理の終了を待機する。
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
// ExecutorService を取得
def service = Executors.newSingleThreadExecutor()
// 非同期処理の実行
service.execute {
Thread.sleep(1000)
}
// 非同期処理の受付を終了
service.shutdown()
// 非同期処理の終了を待機
service.awaitTermination(1, TimeUnit.HOURS)
利用するクラスは以下の3クラス。
1. Executors
2. ExecutorService
3. Future
Executors
の使い方
Executors
で利用するメソッドは以下。
以下のメソッドはすべて static
で、ExecutorService
のインスタンスが取得できる。
メソッド | 説明 |
---|---|
newSingleThreadExecutor | 単一のワーカー・スレッドを使用する。 |
newCachedThreadPool | 必要に応じ、新規スレッドを作成するスレッド・プールを作成する。利用可能な場合には以前に構築されたスレッドを再利用する。 |
newFixedThreadPool | 固定数のスレッドを再利用するスレッド・プールを作成します。 |
newWorkStealingPool | すべての使用可能なプロセッサをターゲット並列性レベルとして使用して、work-stealingスレッド・プールを作成します。 |
テスト用共通メソッド
各メソッドのサンプルを動作させるための共通メソッドは以下。
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
def log(status) {
println "[${Thread.currentThread().name}][${new Date().format('HH:mm:ss.SSS')}] ${status}"
}
def execute(method, service) {
println method
long l = System.currentTimeMillis()
(0..<10).each { i ->
service.execute {
log '開始'
Thread.sleep(100)
log '終了'
}
Thread.sleep(20)
}
service.shutdown()
log '待機中'
service.awaitTermination(1, TimeUnit.MINUTES)
log '待機完了'
}
newSingleThreadExecutor
メソッド
単一のワーカー・スレッドを使用する。
execute 'newSingleThreadExecutor', Executors.newSingleThreadExecutor()
1個のワーカー・スレッドで実行される。
newSingleThreadExecutor
[pool-1-thread-1][09:51:20.741]開始
[pool-1-thread-1][09:51:20.885]終了
[pool-1-thread-1][09:51:20.886]開始
[main][09:51:20.942]待機中
[pool-1-thread-1][09:51:20.988]終了
[pool-1-thread-1][09:51:20.988]開始
[pool-1-thread-1][09:51:21.098]終了
[pool-1-thread-1][09:51:21.099]開始
[pool-1-thread-1][09:51:21.204]終了
[pool-1-thread-1][09:51:21.205]開始
[pool-1-thread-1][09:51:21.317]終了
[pool-1-thread-1][09:51:21.317]開始
[pool-1-thread-1][09:51:21.422]終了
[pool-1-thread-1][09:51:21.424]開始
[pool-1-thread-1][09:51:21.533]終了
[pool-1-thread-1][09:51:21.536]開始
[pool-1-thread-1][09:51:21.650]終了
[pool-1-thread-1][09:51:21.651]開始
[pool-1-thread-1][09:51:21.766]終了
[pool-1-thread-1][09:51:21.768]開始
[pool-1-thread-1][09:51:21.881]終了
[main][09:51:21.882]待機完了
newCachedThreadPool
メソッド
必要に応じ、新規スレッドを作成するスレッド・プールを作成する。
利用可能な場合には以前に構築されたスレッドを再利用する。
execute 'newCachedThreadPool', Executors.newCachedThreadPool()
再利用可能なワーカー・スレッドが存在する場合は、ワーカー・スレッドを再利用している。
newCachedThreadPool
[pool-2-thread-1][09:51:21.889]開始
[pool-2-thread-2][09:51:21.908]開始
[pool-2-thread-3][09:51:21.929]開始
[pool-2-thread-4][09:51:21.949]開始
[pool-2-thread-5][09:51:21.970]開始
[pool-2-thread-1][09:51:21.989]終了
[pool-2-thread-6][09:51:21.993]開始
[pool-2-thread-2][09:51:22.010]終了
[pool-2-thread-2][09:51:22.013]開始
[pool-2-thread-3][09:51:22.031]終了
[pool-2-thread-3][09:51:22.034]開始
[pool-2-thread-4][09:51:22.051]終了
[pool-2-thread-4][09:51:22.054]開始
[pool-2-thread-5][09:51:22.072]終了
[pool-2-thread-5][09:51:22.074]開始
[main][09:51:22.094]待機中
[pool-2-thread-6][09:51:22.097]終了
[pool-2-thread-2][09:51:22.114]終了
[pool-2-thread-3][09:51:22.139]終了
[pool-2-thread-4][09:51:22.154]終了
[pool-2-thread-5][09:51:22.175]終了
[main][09:51:22.176]待機完了
newFixedThreadPool
固定数のスレッドを再利用するスレッド・プールを作成する。
以下は最大5個のワーカー・スレッドで実行するサンプル。
execute 'newFixedThreadPool(5)', Executors.newFixedThreadPool(5)
最大5個のワーカー・スレッドで実行されていて、
一度使ったワーカー・スレッドを再利用している。
newFixedThreadPool(5)
[pool-3-thread-1][09:51:22.179]開始
[pool-3-thread-2][09:51:22.199]開始
[pool-3-thread-3][09:51:22.219]開始
[pool-3-thread-4][09:51:22.240]開始
[pool-3-thread-5][09:51:22.259]開始
[pool-3-thread-1][09:51:22.279]終了
[pool-3-thread-1][09:51:22.280]開始
[pool-3-thread-2][09:51:22.300]終了
[pool-3-thread-2][09:51:22.301]開始
[pool-3-thread-3][09:51:22.321]終了
[pool-3-thread-3][09:51:22.322]開始
[pool-3-thread-4][09:51:22.342]終了
[pool-3-thread-4][09:51:22.343]開始
[pool-3-thread-5][09:51:22.360]終了
[pool-3-thread-5][09:51:22.361]開始
[pool-3-thread-1][09:51:22.381]終了
[main][09:51:22.382]待機中
[pool-3-thread-2][09:51:22.402]終了
[pool-3-thread-3][09:51:22.422]終了
[pool-3-thread-4][09:51:22.448]終了
[pool-3-thread-5][09:51:22.462]終了
[main][09:51:22.463]待機完了
newWorkStealingPool
すべての使用可能なプロセッサをターゲット並列性レベルとして使用して、work-stealingスレッド・プールを作成する。
println "プロセッサ数: ${Runtime.runtime.availableProcessors()}"
execute 'newWorkStealingPool', Executors.newWorkStealingPool()
プロセッサ数を最大ワーカー・スレッド数として実行される。
プロセッサ数: 4
newWorkStealingPool
[ForkJoinPool-1-worker-1][10:01:17.844] 開始
[ForkJoinPool-1-worker-2][10:01:17.856] 開始
[ForkJoinPool-1-worker-3][10:01:17.877] 開始
[ForkJoinPool-1-worker-0][10:01:17.897] 開始
[ForkJoinPool-1-worker-1][10:01:17.960] 終了
[ForkJoinPool-1-worker-2][10:01:17.960] 終了
[ForkJoinPool-1-worker-1][10:01:17.961] 開始
[ForkJoinPool-1-worker-2][10:01:17.963] 開始
[ForkJoinPool-1-worker-3][10:01:17.978] 終了
[ForkJoinPool-1-worker-3][10:01:17.978] 開始
[ForkJoinPool-1-worker-0][10:01:17.998] 終了
[ForkJoinPool-1-worker-0][10:01:17.998] 開始
[main][10:01:18.040] 待機中
[ForkJoinPool-1-worker-1][10:01:18.064] 終了
[ForkJoinPool-1-worker-1][10:01:18.065] 開始
[ForkJoinPool-1-worker-2][10:01:18.065] 終了
[ForkJoinPool-1-worker-2][10:01:18.066] 開始
[ForkJoinPool-1-worker-3][10:01:18.079] 終了
[ForkJoinPool-1-worker-0][10:01:18.100] 終了
[ForkJoinPool-1-worker-1][10:01:18.166] 終了
[ForkJoinPool-1-worker-2][10:01:18.167] 終了
[main][10:01:18.168] 待機完了
ExecutorService
の使い方
ExecutorService
で非同期処理を実行するメソッドは以下。
メソッド | 説明 |
---|---|
execute | 指定された処理を非同期で実行する。戻り値は取得できない。 |
submit | 値を返す処理を非同期で実行し、処理結果を取得可能にする Future を返す。 |
invokeAll | 指定された処理を全て非同期で実行し、すべての処理の終了を待機し、処理結果を取得可能な Future のリストを返す。 |
invokeAny | 指定された全ての処理を非同期で実行し、例外をスローせずに正常に終了した最初の処理の結果を返す。最初に正常終了した処理が発生した時点時点で、終了していない処理では InterruptedException が発生し、処理が取り消される。 |
テスト用共通メソッド
各メソッドのサンプルを動作させるための共通メソッドは以下。
def log(status) {
println "[${Thread.currentThread().name}][${new Date().format('HH:mm:ss.SSS')}] ${status}"
}
execute
メソッド
指定された処理を非同期で実行する。
def service = Executors.newSingleThreadExecutor()
service.execute {
log '開始'
Thread.sleep(1000)
log '終了'
}
service.shutdown()
log '待機中'
service.awaitTermination(1, TimeUnit.HOURS)
log '待機終了'
実行結果。
[main][10:34:57.627] 待機中
[pool-1-thread-1][10:34:57.627] 開始
[pool-1-thread-1][10:34:58.668] 終了
[main][10:34:58.669] 待機終了
submit
メソッド
値を返す処理を非同期で実行し、処理結果を取得可能にする Future
を返す。groovy
で値を返す必要がある場合は、クロージャを明示的に Callable<E>
に変換する必要がある。
def service = Executors.newSingleThreadExecutor()
def future = service.submit {
log '開始'
Thread.sleep(1000)
log '終了'
"終了 (${Thread.currentThread().name})"
} as Callable<String>
service.shutdown()
log '結果取得'
log "結果: ${future.get()}"
log '待機中'
service.awaitTermination(1, TimeUnit.HOURS)
log '待機終了'
実行結果。
[main][12:10:42.284] 結果取得
[pool-1-thread-1][12:10:42.284] 開始
[pool-1-thread-1][12:10:43.339] 終了
[main][12:10:43.340] 結果: 終了 (pool-1-thread-1)
[main][12:10:43.341] 待機中
[main][12:10:43.348] 待機終了
invokeAll
メソッド
指定された処理を全て非同期で実行し、すべての処理の終了を待機し、処理結果を取得可能な Future
のリストを返す。invokeAll
を使うサンプルは以下。
def service = Executors.newFixedThreadPool(5)
def list = (0..<5).collect { i ->
({
log '開始'
Thread.sleep(1000 * (i + 1))
log '終了'
"終了 (${Thread.currentThread().name})"
} as Callable<String>)
}
def futures = service.invokeAll(list)
service.shutdown()
log '結果取得'
futures.each { future ->
log "結果: ${future.get()}"
}
service.awaitTermination(1, TimeUnit.HOURS)
log '待機終了'
[pool-1-thread-3][12:15:56.237] 開始
[pool-1-thread-1][12:15:56.237] 開始
[pool-1-thread-5][12:15:56.237] 開始
[pool-1-thread-4][12:15:56.237] 開始
[pool-1-thread-2][12:15:56.237] 開始
[pool-1-thread-1][12:15:57.315] 終了
[pool-1-thread-2][12:15:58.315] 終了
[pool-1-thread-3][12:15:59.300] 終了
[pool-1-thread-4][12:16:00.315] 終了
[pool-1-thread-5][12:16:01.312] 終了
[main][12:16:01.315] 結果取得
[main][12:16:01.325] 結果: 終了 (pool-1-thread-1)
[main][12:16:01.326] 結果: 終了 (pool-1-thread-2)
[main][12:16:01.327] 結果: 終了 (pool-1-thread-3)
[main][12:16:01.330] 結果: 終了 (pool-1-thread-4)
[main][12:16:01.330] 結果: 終了 (pool-1-thread-5)
[main][12:16:01.338] 待機終了
invokeAny
メソッド
指定された全ての処理を非同期で実行し、例外をスローせずに正常に終了した最初の処理の結果を返す。最初に正常終了した処理が発生した時点時点で、終了していない処理では InterruptedException
が発生し、処理が取り消される。
※ 使い道がよくわからない。
def service = Executors.newFixedThreadPool(5)
def list = (0..<5).collect { i ->
({
try {
log '開始'
Thread.sleep(1000 * i)
log '終了'
"終了 (${Thread.currentThread().name})"
} catch (Exception e) {
log '異常終了'
}
} as Callable<String>)
}
def result = service.invokeAny(list)
log '結果取得'
log "結果: ${result}"
service.shutdown()
log '待機中'
service.awaitTermination(1, TimeUnit.HOURS)
log '待機終了'
1個目が正常終了したタイミングで、その他が異常終了している。
[pool-1-thread-2][12:16:40.167] 開始
[pool-1-thread-1][12:16:40.167] 開始
[pool-1-thread-5][12:16:40.167] 開始
[pool-1-thread-4][12:16:40.167] 開始
[pool-1-thread-3][12:16:40.167] 開始
[pool-1-thread-1][12:16:41.231] 終了
[pool-1-thread-2][12:16:41.232] 異常終了
[pool-1-thread-5][12:16:41.232] 異常終了
[pool-1-thread-3][12:16:41.232] 異常終了
[main][12:16:41.233] 結果取得
[pool-1-thread-4][12:16:41.233] 異常終了
[main][12:16:41.236] 結果: 終了 (pool-1-thread-1)
[main][12:16:41.238] 待機中
[main][12:16:41.243] 待機終了
コメント