Kiedy funkcja Thread.interrupt() zadziała?

0

Witam

Chciałem przetestować interfejs BlockingQueue. W tym celu napisałem sobie taki prosty programik. Niestety zadania consumerów się nie kończą mimo że producer je przerywa.
Moje pytania są takie:

  1. Dlaczego zaproponowane przeze mnie rozwiązanie nie działa i jak je poprawić?
  2. Czytałem, że funkcja interrupt()** nie może** przerwać wątku, który oczekuje na operację wejścia wyjścia oraz oczekuje na zwolnienie blokady monitora obiektu, do którego chce się dobrać (ten wątek).Skąd mam wiedzieć czy dany wątek podjął próbę przejęcia monitora i że wywołanie interrupt() dla niego nie zadziała?
  3. Próba umieszczenia danych w zapchanej kolejce blokującej lub pobrania danych z pustej kolejki skutkuje zawieszeniem wątku. Czyli oddaje on blokadę monitora do obiektu kolejki? No bo inny wątek chyba musi wstawić wartość by ten zablokowany mógł ją pobrać, tak? Co znaczy, że wątek jest zablokowany?

Myślę, że odpowiedzi na te pytania pozwoliłby mi rozwiązać ten problem:
Dziękuję za wszelką pomoc.

class Consumer extends Thread {
    private LinkedBlockingQueue<Integer> queue;
    private ArrayList<Integer> consumedValues = new ArrayList<Integer>();

    public boolean endFlag = false;
    public Consumer(LinkedBlockingQueue<Integer> queue) {
        this.queue = queue;
    }
    @Override public synchronized void  run() {
        try {
              while(!Thread.interrupted()) {
                System.out.println("Przed konsumpcją... " + Thread.currentThread().getName());
                int value = queue.take();
                System.out.println("...po konsumpcji! " + Thread.currentThread().getName());
                consumedValues.add(value);
                System.out.println(Thread.currentThread().getName() + " konsumuje: " + value);
            }
        }
        catch (InterruptedException ex) {
            System.out.println("Przerwano Consumer: " + this);
        }
        finally {
            endFlag = true;
            System.out.println("Koniec zadania consumer: " + Thread.currentThread().getName());
            System.out.println("Skonsumowane wartosci: " + consumedValues);
        }
    }
}


class Producer implements Runnable {
    LinkedBlockingQueue<Integer> queue;
    Consumer[] consumers;
    public Producer(LinkedBlockingQueue<Integer> queue, Consumer... consumers) {
        this.queue = queue;
        this.consumers = consumers;
    }
    @Override public void run() {
        try {
            for(int i=0; i<10; ++i) {
                queue.put(i);
                Thread.sleep(100);
            }
            while(!consumers[0].endFlag || !consumers[1].endFlag) {
                //tutaj blokuje sie program, nie ma informacji zwrotnej od consumerow w postaic ustawienia flagi endFlag
                for(Consumer c : consumers) {
                    c.interrupt();
                }
            }
            
        } catch (InterruptedException ex) {
            System.out.println("Przerwano prace producenta!");
        }
        System.out.println("Koniec zadania producer!");
    }
}

public class ArrayQueueMachine {
    
    public static void main(String[] args) throws Exception {
        LinkedBlockingQueue queue = new LinkedBlockingQueue(10);
        Consumer consumer1 = new Consumer(queue);
        Consumer consumer2 = new Consumer(queue);
        Producer producer = new Producer(queue,consumer1,consumer2);
        
        
        ExecutorService exec = Executors.newCachedThreadPool();
        
        exec.execute(producer);
        exec.execute(consumer1);
        exec.execute(consumer2);
        
        exec.shutdown();
    }
    
}
0

W twoim kodzie consumer nigdy się nie zakończy.

  1. Gdy kolejka będzie pusta consumery będą czekać na wykonaniu metody take()
  2. Właśnie dlatego wątku w javie nie da się zatrzymać, można mu jedynie to zasugerować za pomoca Thread.interrupt(), ale kiedy zatrzymanie się wykona zalezy od aktualnie wykonywanej operacji.
  3. Wątek nigdy nie posiada blokady kolejki i nie musi jej oddawać. Po prostu gdy blokada jest zamknięta wątek czeka (nie ma przydzielonego czasu procesora.)

http://fuseyism.com/classpath/doc/java/util/concurrent/LinkedBlockingQueue-source.html

0

Ad 1. No tak, wiem że consumery zawieszą się na wywołaniu take. Ale dlaczego tak się dzieje? Przecież producer ma przerwać pracę consumerów po zakończeniu produkcji (nie ważne czy consumery mają jeszcze coś do odczytania). Producer w pętli wywołuje interrupt() aż consumery ustawią flagę endFlag potwierdzającą ich zakończenie. take() jest metodą rzucą InterruptedException, więc nie rozumiem dlaczego to nie działa.

W źródłach kolejki jest założenie locka jak poniżej co oznacza, że można przerwać taki lock.
putLock.lockInterruptibly();

Ad. 2 Skoro mówisz, że interrupt() to tylko sugestia to odnośnie tego co napisałem w Ad 1. może to działać albo nie działać, zależnie od widzimisie systemu. Na tyle razy co uruchomiłem ten program powinno choć raz zadziałać. Druga sprawa, że gdy z poziomu wątku main wywołam exec.shutdownNow() po pewnym czasie Thread.sleep() to wszystkie zadania się kończą bez wyjątku. Nie rozumiem dlaczego producer nie może wywołać przerwania na consumer skoro main może...

1

Alby rybki albo akwarium. Żeby skorzystać z interrupt nie korzystaj z egzekutorów:

   ...
  class Producer implements Runnable {
    LinkedBlockingQueue<Integer> queue;
    Consumer[] consumers;
  public Producer(LinkedBlockingQueue<Integer> queue, Consumer... consumers) {
    this.queue = queue;
    this.consumers = consumers;
  }
  @Override public void run() {
    try {
      for(int i=0; i<10; ++i) {
        queue.put(i);
        Thread.sleep(100);
      }
      for(Consumer c : consumers) {
        c.interrupt();
      }
    } catch (InterruptedException ex) {
      System.out.println("Przerwano prace producenta!");
    }
    System.out.println("Koniec zadania producer!");
    }
  }

   ...
    LinkedBlockingQueu queue = new LinkedBlockingQueue(10);
    Conumer consumer1 = new Cnsumer(queue);
    Consumer consumer2 = new Consumer(queue);
    Producer producer = new Producer(queue,consumer1,consumer2);

    consumer1.start();
    consumer2.start();
    Thread p = new Thread(producer);
    p.start();
    Scanner scan = new Scanner(System.in);
    String s = scan.next();
    consumer1.interrupt();
    consumer2.interrupt();
    p.interrupt();

Jeśli z egzekutorów to wywołuj submit i przypisuj wartość zwracaną z wątków do zmiennej Future i na nich wywołuj cancel:

    Future p = exec.submit(producer);
    Future c1 = exec.submit(consumer1);
    Future c2 = exec.submit(consumer2);
    exec.shutdown();

    Scanner scan = new Scanner(System.in);
    String s = scan.next();
    p.cancel(true);
    c1.cancel(true);
    c2.cancel(true);

1 użytkowników online, w tym zalogowanych: 0, gości: 1