Kolejkowanie wątków

0

Witam,

Moja aplikacja wykonuje na zbiorze pewnych danych operację X w pętli for.
Operacja X jest jednak czasochłonna, dlatego chcę aby była wykonywana jednocześnie dla określonej liczby danych - oczywiście mogę to zrealizować dzięki wątkom.

I teraz pytanie: jak to zrobić?

Powiedzmy, że mam zbiór danych - 1000 elementów na których muszę przeprowadzić tę operację, ale nie mogę naraz odpalić 1000 wątków.

Czy dobrym pomysłem jest stworzenie tych wątków, zapisanie do kolejki, a następnie zdejmowanie z niej np po 5 wątków i uruchamianie ich, dopóki kolejka nie będzie pusta?

0

może od drugiej strony, wrzucić kilka wątków i jedną kolejkę z operacjami:

-wątek sprawdza kolejkę

brak operacji w kolejce = koniec wątku
-zdejmuje 1 operacje
-wykonuje i zaczyna od nowa

kolejka musi być threadsafe (np. blockingqueue) i może ona zawierać obiekty na wzór IOperacja

0

No tak, lepsze rozwiązanie :)

Pytanie: Czy dobrym pomysłem (bezpiecznym) jest sprawdzanie kolejki przez wątek w celu ustalenia czy zawiera ona jeszcze jakieś elementy w ten sposób:
while (!kolejka.isEmpty()) {
robi cos...
}

Używam obiektu LinkedBlockingQueue, czyli threadsafe.

0

To jest blokujaca kolejka, wiec nie musisz uzywac pollingu - wywlujesz metode take() i jesli cos jest, zostaje zwrocone, jesli nie, watek blokuje az cos bedzie dostepne.
Aby zakonczyc caly proces mozesz np wpuszczac do kolejki specjalny task, ktory jesli zostanie ropzoznany oznacza ze nastapil koniec. To bedzie ladnie wspolpracowac z ta kolejka. Jesli masz kilka watkow, i jeden z nich dostanie poison, to mozesz np wylaczyc np ExecutorService.shutdown(), jesli go uzywasz.
Cos takiego bym pewnie probowal zrobic.

0

Zrobiłem w ten sposób:

Wątki pobierają elementy (Stringi) z kolejki za pomocą take() w pętli while. While działa dopóki pobrany element jest różny od "end".

Ale mam problem z shutdown() obiektu ExecutorService. Wywołując ją i tak nie kończę wątku, który oczekuje od pustej już kolejki następnych elementów.

Natomiast shutdownNow() wyrzuca:
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(Unknown Source)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(Unknown Source)
at java.util.concurrent.LinkedBlockingQueue.take(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

EDIT:
Klasa main:

public class Main {

	private static ExecutorService exe;
	
	public static void main(String[] args) {
		LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>();
                exe = Executors.newFixedThreadPool(2);
		
		for (int i = 0; i < 100; i++) {
			queue.add("word"+i);
		}
		queue.add("end");
		
		MyThread myth1 = new MyThread(1, queue);
		MyThread myth2 = new MyThread(2, queue);
		
		exe.execute(myth1);
		exe.execute(myth2);
	}
	
	public static void metoda() {
	    try {
			exe.shutdown();
			exe.awaitTermination(5, TimeUnit.SECONDS);
			exe.shutdownNow();
	    }
	    catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

metoda run wątku:

public void run() {
	int count = 0;
	String word = "";
		
	try {
		while (!(word = queue.take()).equals("end")) {
			word += "#"+rand.nextInt(10000);
			System.out.println("Thread "+id+": "+word);
			count++;
		}
	}
	catch (InterruptedException e) {
		e.printStackTrace();
	}
		
	System.out.println("### Elements in queue: "+queue.size()+".\n" +
		"### Thread "+id+" process "+count+" elements.");
		
	Main.metoda();
}
0

Obczaj sobie pakiet java.util.concurrent. I tam są już gotowe Executory do tego co chcesz zrobić np. ThreadPoolExecutor.

0

OK, ale mam jeszcze inny problem...
Otóż całą pracę programu chciałbym powtórzyć n razy, ale nie wiem jak zgrać działanie pętli z wątkami

jeśli mam coś takiego

do {
  > dodanie elementów do kolejki
  > Stworzenie i uruchomienie wątków pracujących na kolejce
} while (warunek);

to zanim skończy się wykonywanie wątków to przelecą kolejne iteracje pętli while i kolejka na której pracują wątki zmieni swój stan... a chyba Thread.sleep() na końcu bloku do chyba nie jest najlepszym rozwiązaniem...

0

Thread.sleep nigdy nie jest dobrym rozwiązaniem do synchronizacji.
Użyj shutdown i awaitTermination.

http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.html#awaitTermination%28long,%20java.util.concurrent.TimeUnit%29

0

Dzięki za podpowiedź. Na początku pętli inicjuję obiekt executora, a na koniec, tak jak napisałeś, robię shutdown i awaitTermination.
I działa :)

1 użytkowników online, w tym zalogowanych: 0, gości: 1