Zrównolegnienie web crawlera

0

Mam taką metodę:

public List<org.bson.Document> crawlForAllMongoDoc() {
		List<org.bson.Document> allDocuments = new ArrayList<>();

		for (Page page : forumTopic) {
			LOG.log(Level.INFO, "downloading url=" + page.url);
			for (Post post : page) {
				allDocuments.add(post.toMongoDocument());
			}
		}
		return allDocuments;
	}

Pobiera html ze strony internetowej i przerabia. Jak przerobi to pobiera następny html z kolejnej strony i znów przerabia. itd.

Chciałbym to sparaleryzować:
1 wątek pobiera html ze strony i nie czeka, aż html zostanie przerobiony tylko pobiera od razu następny.
2 wątek przerabia pobrane html, jeśli są jakieś pobrane.

public List<org.bson.Document> crawlForAllMongoDocParraler() {
		List<org.bson.Document> allDocuments = new ArrayList<>();

		Queue<Page> pagesQueue = new LinkedList<>();

		Thread pagerThread = new Thread(() -> {
			for (Page page : forumTopic) {
				LOG.log(Level.INFO, "downloading url=" + page.url);
				pagesQueue.add(page);
				synchronized (this) {
					notify();
				}
			}
		});

		Thread posterThread = new Thread(() -> {
			while (!pagesQueue.isEmpty() || pagerThread.isAlive()) {
				if (pagesQueue.isEmpty()) {
					synchronized (this) {
						try {
							wait();
						} catch (InterruptedException e) {
							// TODO Auto-generated catch block
							e.printStackTrace();
						}
					}
				} else {
					for (Post post : pagesQueue.poll()) {
						allDocuments.add(post.toMongoDocument());
					}
				}
			}
		});

		pagerThread.start();
		posterThread.start();

		try {
			pagerThread.join();
			posterThread.join();
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		return allDocuments;
	}

czy to jest w miarę OK? Zszedłem ze 150 sekund do 130 dzięki temu.
Widzę ryzyko, że jak w 17 linijce wątek 1 będzie żywy, ale już w 21 martwy to wątek 2 nigdy się nie zatrzyma.

4

W sensie chcesz wysmażyć sobie coś w rodzaju takiego pipeline'a?

screenshot-20190802204835.png

No to tak w wariancie najbardziej "na pałę" tzn rzeźbiąc (prawie) wszystko sam

  • robisz sobie jakieś tam implementacje Runnable/Taska czy coś takiego dla każdego etapu jaki chcesz wykonać (pobieranie HTMLi, jakieś parsowanie itd)
  • robisz sobie jakiś ThreadPoolExecutor czy inny ProcessPoolExecutor...
  • ...do którego subskrybujesz sobie asynchronicznie zadanie ściągnięcia dokumentu
  • dla każdego zadania dostajesz jakiś tam Future, przepychasz go dalej jako nowego taska...
  • ...który z kolei czeka na tego Future, wyciąga z niego cukierka i go konsumuje, po czym wypluwa orzeszki
  • dostajesz z niego kolejny Future tym razem z orzeszkami itp itd. który przepychasz do taska "wyjmij orzeszki z papierka i zmiel na mączkę"
  • I na końcu robisz jakiś waiter który czeka aż wszystkie Futury z mąką z orzeszków czy co to tam miało być albo coś zwrócą, albo się wysypią
processPool = ProcessPool()
futures = []

for url in urlSource.getAll():
  htmlFuture = processPool.executeAsync(DownloadHtmlTask(url))
  parsedDataFuture = processPool.executeAsync(ParseHtmlTask(htmlFuture))
  somethingElseFuture = processPool.executeAsync(DoSomethingElseTask(parsedDataFuture))
  futures.append(somethingElseFuture)

results, errors = waitUntilAllCompleteOrFail(futures)

// bla bla bla

I nie bawisz się w jakieś żonglowanie joinami i pilnowanie, który zjoinować przed którym

1

Korzystanie bezpośrednio z klasy Thread nie jest optymalne, od Javy 6/7 są na to lepsze sposoby (pakiet java.util.concurrent, ExecutorService).
Poczytaj o Fork-Join Pool: https://www.baeldung.com/java-fork-join, https://howtodoinjava.com/java7/forkjoin-framework-tutorial-forkjoinpool-example/
Przykład użycia (crawler): https://www.javaworld.com/article/2078440/java-tip-when-to-use-forkjoinpool-vs-executorservice.html, https://github.com/niravshah/wd_webcrawler

3
  1. ExecutorService jakis weź jak człowiek
  2. ConcurrentQueue zamiast synchronizy
  3. Puść to na wielu wątkach bo i tak wąskie gardło to pobieranie tych wyników. Możesz mieć N wątków ściagających i dużo mniej przetwrzajacych
  4. Tu sie to nie przyda, ale jakbyś chciał być ultra low latency to mozna by w ogóle wywalić te synchronizacje i nawet concurrentqueue, robiąc tam AtomicReference na zwykłą listę i jak wątek przetwarzający chce sobie pobrać listę zadań to robi atomową wymianę na pustą listą, a tą pełną zaczyna sobie przetwarzać ;]
2

Podam inne rozwiązanie mam nadzieję, że nie będzie hipsterskie, będziesz potrzebował Reactora i jakiś klient http, może być bajerancki asynchroniczny i reaktywny, ale nie musi, uwaga pisane z pamięci

val allDocuments = Flux.fromIterable(urls)
        .flatMap(url -> download(url))
        .map(page -> fancyStuff(page))
        .collectList()
        .block();

to wystarczy jak będzie klient reaktywny klient, jak nie to można blokujące wywołanie download opakować przy użyciu np. Mono.fromCallable() i na końcu zasubskrybować się schedulerem z jakąs pulą wątków, polecam Schedulers.fromExecutorService() albo znaleźć coś dla siebie z klasy Schedulers. Można też bardziej idiomatycznie i nie blokować się na końcu tylko dalej zwrócić Fluxa

1 użytkowników online, w tym zalogowanych: 0, gości: 1