2020/05/02

Java - 割り込み可能な反復処理スレッドを作る

Java にはスレッドに対する割り込み機構があります。
スレッドからスレッドに interrupt シグナルを送って、割り込みたいという意思を伝える仕組みです。しかし、どんなスレッドでも interrupt を送れば割り込める、というような簡単なものではなく、割り込まれる側のスレッドが、interrupt シグナルを受け取ったときの処理を正しく実装していなければ、割り込むことはできません。
具体的には、InterruptedException や Thread.interrupted() のハンドリングを正しく行う必要があります。
今回はサンプルとして、繰り返し処理をし続けるスレッドを割り込み可能として作るためのちょっとした基底クラスを用意しましたので、見ていきたいと思います。

基底クラス InterruptableLoopThread

次のようなクラスを用意しました。

public abstract class InterruptableLoopThread extends Thread {
  @Override
  public void run() {
    try {
      while (true) {
        try {
          if (!execute()) {
            break;
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        if (Thread.interrupted()) {
          doWhenInterrupted();
          break;
        }
      }
    } finally {
      doWhenFinished();
    }
  }
  public abstract boolean execute() throws InterruptedException;
  public abstract void doWhenInterrupted();
  public abstract void doWhenFinished();
}

仕様

InterruptableLoopThread は繰り返し同じ処理をし続けるスレッドを作るための抽象基底クラスになっています。処理内容は execute() をオーバーライドして定義します。execute() の戻り値は処理を継続するかどうかを示しており、false を返すと処理が終了します。処理が終了すると doWhenFinished() が呼ばれるようになっているので、後処理が定義できます。
execute() 中に interrupt シグナルを受け取ったときは、実行中の execute() が終わった時点で処理が終了します。execute() 内で InterruptedException が発生したときは、すぐに処理が終了します。いずれの場合も doWhenInterrupted() が呼ばれるようになっており、割り込まれたときの処理を定義できます。

仕組み

Thread のサブクラスとして定義しており、run() をオーバーライドしているので、start() を呼べば処理が開始します。
基本的には while(true) {execute();} という無限ループの構造ですが、execute() が false を返せば終了するようにしています (7〜9行目)。
ループ1回ごとに、execute() の後ろで Thread.interrupted() を呼んでいます (13行目)。これは、このスレッドが interrupt シグナルを受け取った状態にあるかどうかをチェックし、受け取っていれば true を返した上で、状態を未受信に戻す、という処理です。つまり、割り込みがかかると、仕掛かりの execute() が終わってから doWhenInterrupted() が呼ばれ、その後処理を終了する、ということになります (14〜15行目)。
execute() 内で InterruptedException が発生した場合は、一旦拾ってあげます。execute() 内で wait() や Thread.sleep() をしていたときに割り込みがかかると、InterruptedException が発生しますので、そのケアです。catch した時点では interrupt シグナルが未受信の状態に戻っているので、このままでは doWhenInterruped() に入りません。そのため Thread.currentThread().interrupt() で改めて interrupted 状態に入れてあげています (11行目)。
run() 全体には try {...} finally {doWhenFinished();} が施されているので、どんな終わり方であっても (execute() が false を返しても、割り込みがかかっても)、doWhenFinished() が呼ばれます。

使用例

前述の InterruptedLoopThread を使用した例を見ていただきましょう。
14行目からの execute() の中に、反復したい処理内容が記述されています。
1回実行されるごとに counter を増やし、5回実行したら終了するようになっています (28行目)。各回の処理の最初と最後にメッセージを出力しています。処理の中身は、0〜99999 までの数値を素因数分解する処理 (17〜25行目) を80回繰り返していて、1回ごとに「.」を出力しています。
doWhenInterrupted() は、割り込みが掛かったときにメッセージを出力するようにしており、doWhenFinished() は終了したときにメッセージを出力しています。

public class Loop extends InterruptableLoopThread {
  public static void main(final String[] args) throws InterruptedException {
    Thread t = new Loop();
    t.start();
    Thread.sleep(10000);
    t.interrupt();
    System.err.print("[do interrupt()]");
  }

  private int counter = 0;

  @Override
  public boolean execute() throws InterruptedException {
    System.out.println("#" + (counter + 1) + " processing started.");
    for (int i = 0; i < 80; i++) {
      System.out.print(".");
      for (int j = 0; j < 100000; j++) {
        int x = j;
        int m = (int) Math.sqrt(j);
        for (int k = 2; k < m; k++) {
          if (x % k == 0) {
            x /= k;
          }
        }
      }
    }
    System.out.println("\n#" + (counter + 1) + " processing finished.");
    return ++counter < 5;
  }

  @Override
  public void doWhenInterrupted() {
    System.out.println("Interrupted!");
  }

  @Override
  public void doWhenFinished() {
    System.out.println("All processing finished! " + counter + " times processed.");
  }
}

main() メソッドでは、このスレッドを走らせた後、10000ミリ秒待ってから、割り込みをかけています。

実行結果を見てみましょう。まず先に、6〜7行目を取り除いて割り込みを掛けなくしたときの出力を確認しておきます。

#1 processing started.
................................................................................
#1 processing finished.
#2 processing started.
................................................................................
#2 processing finished.
#3 processing started.
................................................................................
#3 processing finished.
#4 processing started.
................................................................................
#4 processing finished.
#5 processing started.
................................................................................
#5 processing finished.
All processing finished! 5 times processed.

これに対し、10000ミリ秒で割り込みを掛けると、このようになります。

#1 processing started.
................................................................................
#1 processing finished.
#2 processing started.
.............................................................................[do interrupt()]...
#2 processing finished.
Interrupted!
All processing finished! 2 times processed.

[do interrupt()] と出ているところで割り込みが掛かっています。
後ろに「.」の出力が継続していることや、#2 の finished が出力されていることから分かる通り、実行中の execute() は中断せず、execute() が終わったところで終了しています。
ここで、25行目と26行目の間に Thread.sleep(0, 0); を入れてみます。そうすると、以下のように振舞いが変わります。

#1 processing started.
................................................................................
#1 processing finished.
#2 processing started.
................................................................................
#2 processing finished.
#3 processing started.
...[do interrupt()]Interrupted!
All processing finished! 2 times processed.

#3 の途中で割り込みが掛かり、execute() の途中で中断しました。これは、Thread.sleep() の箇所で interrupt シグナルが消化されて InterruptedException が発生したためです。このように、Thread.sleep() を入れることで、割り込み可能なタイミングを作ってあげることも可能です。

なかなか普段使わない interrupt の仕組みですが、バックグラウンドスレッドを作るときにはこのように、interrupt の受け入れを考慮しておくと、綺麗に止めることができて運用しやすいことがあります。こういうことができるということを頭の片隅にでも置いておいていただければと思います。