Spring Webflux - REST i wyścigi wątków

0

Po przeczytaniu sąsiedniego wątku, napisałem sobie prostą aplikacyjkę w webfluxie z licznikiem, aby sprawdzić kolejność wykonywania wątków.

Counter.java:

public class Counter {

    public final long counter;

    public Counter(long counter) {
        this.counter = counter;
    }

    public Counter updateCounter() {
        return new Counter(this.counter + 1);
    }
}

CounterController.java

@Configuration
public class CounterController {

    private final CounterService counterService;

    public CounterController() {
        this.counterService = new CounterService(new Counter(0));
    }

    @Bean
    RouterFunction<ServerResponse> counterRoutes() {
        return nest(path("/api"),
                route(GET("/counter"), counterValue())
        .andRoute(PATCH("/counter"), increaseCounterValue()));
    }

    private HandlerFunction<ServerResponse> counterValue() {
        return request -> {
            final long counterValue = this.counterService.getCounterValue();
            return ServerResponse.ok().body(fromObject(counterValue));
        };
    }

    private HandlerFunction<ServerResponse> increaseCounterValue() {
        return request -> {
            final long counterValue = this.counterService.increaseCounterValue();
            return ServerResponse.ok().body(fromObject(counterValue));
        };
    }
}

CounterService.java

public class CounterService {

    private Counter counter;

    public CounterService(Counter counter) {
        this.counter = counter;
    }

    public long getCounterValue() {
        System.out.println("pobieram countera simple: " + this.counter.counter);
        return this.counter.counter;
    }

    public long increaseCounterValue() {
        this.counter = this.counter.updateCounter();
        System.out.println("zaktualizowalem countera simple: " + this.counter.counter);
        return this.counter.counter;
    }
}

Następnie puściłem JMeterem 100 wątków x 100 requestów. W konsoli ostatni wpis:
zaktualizowalem countera simple: 8749

Zmieniając na public synchronized long increaseCounterValue() {...} w CounterService.java, ostatni wpis w konosli:
zaktualizowalem countera synchronized: 10000

Zmieniając long na AtomicLong w Counter.java (i oczywiście bez synchronized w serwisie)
zaktualizowalem countera atomic: 10000

Robiąc tak jak w pierwszym przykładzie, tylko definiując counter w CounterService.java private volatile Counter counter;
zaktualizowalem countera volatile: 8873

Pytanie

W jaki sposób poprawnie użyć volatile?
W tym temacie Dostarczanie danych przez REST i ich aktualizacja @jarekr000000 mnie zbił z tropu stwierdzeniem, że synchronized jest niewystarczające. W powyższym przykładzie wygląda OK, a nie bardzo wiem czym się różni od kodu @eL ?

PS. Bez sensu podawać czasy benchmarków dla wszystkich 4 przypadków (bez warmupów, kupe innych rzeczy w tle odpalonych itp), ale jak kogoś bardzo to interesuje to wszystkie wykonywały się niemal identycznie na poziomie:
GET (sr, min, max): 16, 0, 32 ms i PATCH(sr, min, max): 1, 0, 9 ms

1

W Twoim przykładzie nowa wartość zależy od poprzedniej. volatile z czymś takim sobie nie radzi.

0

volatile zapewnia nam widoczność zmiany między wątkami (odczyt bezpośrednio z pamięci, a nie z cache procesora). Dodatkowo jest to informacja do tego, aby JIT nie próbował optymalizować tej zmiennej (np zauważa, że warunek pętli jest stały więc wrzuca stałą, zamiast czytać zmienną - nie jest świadomy tego, że inny wątek tę zmienną może zmienić).

0
pustypawel napisał(a):

volatile zapewnia nam widoczność zmiany między wątkami (odczyt bezpośrednio z pamięci, a nie z cache procesora).

Z tym odczytem bezpośrednio z pamięci - to tak jest w C++ (i ma to czasem znaczenie, np. przy czytaniu IO).
W javie to akurat nie ma sensu. Technicznie być może JVM nawet wymuszać odczyt z pamięci, ale to jest szczegół implementacyjny.
Druga część OK.

0

volatile nadaję się tylko do podstawowych operacji tj., odczyt i zapis pamięci. Jeśli chcesz użyć bardziej zaawansowanych operacji, to zostają klasy Atomic*. Co do działania volatile/ogólnie atomiców, to trzeba pamiętać, że sam odczyt z pamięci nie wystarczy. Przykładowo przy zapisie do pamięci wymagana jest synchronizacja: https://gcc.godbolt.org/z/rSsS2R tutaj fajnie widać instrukcję mfence . Dodatkowo wymagane jest odpowiednie zachowanie interpretera/jita: zakazane są zmiany kolejności instrukcji, które mogłyby zmienić kolejność widoczności kolejnych zmian w innych wątkach, co nie jest prawdą w przypadku normalnych zmiennych.

0

OK dzięki. Rozumiem, że volatile nie pomoże, jeśli nowa wartość zależy od poprzedniej. Zadziała, jeśli tylko jeden wątek zmienia wartość, a drugi wątek tylko odczytuje (nie zmienia). W kontekście Spirng Webflux nie mam pojęcia jak to zapewnić.

Zmieniłem kod z "auto countera" na taki, który POSTem ustawia licznik, GETem go zwraca:

W Controllerj.java doałem

    private HandlerFunction<ServerResponse> setCounterValue() {
        return request -> {
            final Mono<String> longMono = request.bodyToMono(String.class);
            return longMono
                    .map(stringValue -> Long.parseLong(stringValue))
                    .flatMap(longValue -> {
                        final long counterValue = this.counterService.setCounterValue(longValue);
                        return ServerResponse.ok().body(fromObject(counterValue));
                    });
        };
    }

CounterService zmieniłem na:

public class CounterService {

    private volatile Counter counter;

    public CounterService(Counter counter) {
        this.counter = counter;
    }

    public long getCounterValue() {
        System.out.println("pobieram countera simple post volatile: " + this.counter.counter);
        return this.counter.counter;
    }

    public long setCounterValue(long newValue) {
        this.counter = this.counter.updateCounter(newValue);
        System.out.println("zaktualizowalem countera simple post volatile: " + this.counter.counter);
        return this.counter.counter;
    }
}

A klasa Counter.java

public class Counter {

    public final long counter;

    public Counter(long counter) {
        this.counter = counter;
    }

    public Counter updateCounter(long i) {
        return new Counter(i);
    }
}

Puściłem JMeterem 100 wątków x 100 requestów na przemian POST z GETem, gdzie POST ustawia randomową liczbę. Do tego napisałem sobie analizatora logów:

  1. Bez volatile, synchronized itp:
    Łączna ilość znalezionych nieprawidłowych danych: 224

  2. synchronized na samego GETa lub POSTa nie wystarcza, ustawione na obydwóch metodach:
    Łączna ilość znalezionych nieprawidłowych danych: 0

  3. Zamiana w Counter.java long na AtomicLong oraz metody update:

    public Counter updateCounter(long i) {
        counter.set(i);
        return this;
    }

Łączna ilość znalezionych nieprawidłowych danych: 287 (!?)

  1. Z volatile tak jak wklejony kod:
    Łączna ilość znalezionych nieprawidłowych danych: 217

Pytanie

  1. Jak poprawnie użyć volatile?
  2. Czemu z AtomicLong też mi się krzaczy? :D

Za nieprawidłowe dane uznaję coś takiego w konsoli:

zaktualizowalem countera simple post volatile: 282
zaktualizowalem countera simple post volatile: 3249
pobieram countera simple post volatile: 282
pobieram countera simple post volatile: 282
0

Z atomic longiem, jeśli dobrze widzę, robisz dwie operacje, pobranie i ustawienie wartości, nie ma gwarancji, że one się razem wykonają w jednym wątku. Musiałbyś użyć którejś z atomowych operacji.

0
        this.counter = this.counter.updateCounter(newValue);
        System.out.println("zaktualizowalem countera simple post volatile: " + this.counter.counter);

Przy wypisywaniu powinieneś użyć newValue - wartość countera może już być inna - równolegle zmodyfikowana.
To samo przy pobieraniu - może zdarzyć się, że na konsolę wypiszesz jedną liczbę, a zwrócisz inną

0

@pustypawel: niestety, to też nie pomogło.

Wpakowałem się w temat wątków, o którym nie mam pojęcia, i teraz się męczę :D

Sprawdzanie wartości wpisanych w konsolę od początku wydawało mi się śmieszne i głupie. Pomyślałem, że skoro strzelam requestami z JMetera to tam powinienem sprawdzać i porównywać odpowiedzi. Dodałem więc Response Assertion i Assertion Results, porównałem wartości z odpowiedzi i hmm.. zawsze dostaję faile, nawet jak cały kod mam w synchronized. Pewnie coś źle sprawdzam, nie wiem.

Zamiast czepiać się Webfluxa pomyślałem, że jeśli CounterService będzie napisany dobrze to nie będzie miało to znaczenia, czy użyję go w Webfluxie czy w innym g^&nie, dlatego napisałem taki unit test:

@Test
    public void multipleThreadSetAndGetShouldReturnTheSameValue() throws ExecutionException, InterruptedException {
        int threads = 10;
        final Counter counter = new Counter(0);
        final CounterService counterService = new CounterService(counter);
        CountDownLatch latch = new CountDownLatch(10);
        ExecutorService executorService = Executors.newFixedThreadPool(threads);
        Collection<Future<String>> results = new ArrayList<>();
        AtomicLong failCounter = new AtomicLong(0);
        final Random random = new Random();

        for (int i = 0; i < threads; i++) {
            final long randomLong = random.nextLong();
            results.add(executorService.submit(() -> {
                latch.await(1, TimeUnit.SECONDS);
                latch.countDown();
                return checkCounterValue(counterService, randomLong);
            }));
        }

        for (Future<String> result : results) {
            if (result.get().startsWith("FAIL")) failCounter.incrementAndGet();
        }

        assertEquals(0, failCounter.get());

    }

    private String checkCounterValue(CounterService counterService, long randomLong) {
        counterService.setCounterValue(randomLong);
        final long counterValue = counterService.getCounterValue();
        if (counterValue != randomLong)
            return "FAIL = should be " + randomLong + ", actual " + counterValue + " w watku " + Thread.currentThread().getName();
        return "ok";
    }

Niestety, ten test raz przechodzi raz nie :) Czasem nawet wywali się jak w CounterService mam wszystkie metody synchronized(). Wklejam kod Counter i CounterService, czysta java, może ktoś będzie miał chwilę i podpowie jak poprawić ten test.

public class Counter {

    public  long counter;

    public Counter(long counter) {
        this.counter = counter;
    }

    public Counter updateCounter(long i) {
        return new Counter(i);
    }
}

public class CounterService {

    private Counter counter;

    public  CounterService(Counter counter) {
        this.counter = counter;
    }

    public synchronized long getCounterValue() {
        return this.counter.counter;
    }

    public synchronized long setCounterValue(long newValue) {
        this.counter = this.counter.updateCounter(newValue);
        return this.counter.counter;
    }
}
1

I znowu:

        counterService.setCounterValue(randomLong);
        final long counterValue = counterService.getCounterValue();

Między tymi wywołaniami inny wątek może wartość zmienić, dlatego nie ma pewności, że będą takie same :) Może inaczej - co chcesz uzyskać, bo teraz to już w ogóle nie rozumiem jaki jest cel tego testu?

        if (counterValue != randomLong)
            return "FAIL = should be " + randomLong + ", actual " + counterValue + " w watku " + Thread.currentThread().getName();

Ten fail jest zły - to jest normalna sytuacja, że inny wątek mógł wartość zmodyfikować, co więcej mamy 100% pewności że jeżeli zmodyfikował to ta zmiana będzie widoczna w naszym wątku (bo mamy synchronized)

0

Masz rację. To jak napisać test, który sprawdzi, że pobrana wartość jest na pewno aktualna?

Czyli sytuacja taka jak opisujesz. Kilka wątków zmienia wartość, ale pobrana wartość jest zawsze aktualna. Taki test da się w ogóle napisać? :)

0
@Test
    public void multipleThreadSetAndGetShouldReturnTheSameValue() throws ExecutionException, InterruptedException {
        int threads = 10;
        final Counter counter = new Counter(0);
        final CounterService counterService = new CounterService(counter);
        CountDownLatch latch = new CountDownLatch(threads);
        ExecutorService executorService = Executors.newFixedThreadPool(threads);
        Collection<Future<Long>> results = new ArrayList<>();
        AtomicLong sequence = new AtomicLong(0);

        for (int i = 0; i < threads; i++) {
            results.add(executorService.submit(() -> {
                latch.await(1, TimeUnit.SECONDS);
                latch.countDown();
                counterService.setCounterValue(sequence.getAndIncrement());
                return counterService.getCounterValue();
            }));
        }

        final Set<Long> uniqueResult = new HashSet<>();
        for (Future<Long> result : results) {
            uniqueResult.add(result.get());

        }

        assertEquals(threads, uniqueResult.size());
    }

Ten test już jest bliższy temu co chcę uzyskać.

Czy da się napisać taki test, który w 100/100 przypadków się wywali, jeśli zdejmę z metod synchronized w CounterService oraz 100/100 przypadków przejdzie, jeśli dodam synchronized do tych metod?

0

100/100 przypadków się wywali, jeśli zdejmę z metod synchronized

Otóż nie :) volatile/synchronized dają gwarancję widoczności zmiany (synchronized daje jeszcze blokadę), co nie znaczy że bez nich zmiana zawsze będzie niewidoczna (w szczególnym przypadku, gdy masz jeden procesor z jednym wątkiem to prawdopodobnie, volatile w ogóle nie potrzebujesz)

0
pustypawel napisał(a):

(w szczególnym przypadku, gdy masz jeden procesor z jednym wątkiem to prawdopodobnie, volatile w ogóle nie potrzebujesz)

Otóż nie. Nadal potrzebujesz. Możesz mieć więcej szcześcia, problem nie będzie często widoczny, ale java to nie c++. JVM to świnia.

0

No dobra, skoro nie mogę napisać testu, który 100/100 się wywali bez synchronized, to jak napisać test który 100/100 przejdzie z synchronized? Ten powyższy sporadycznie się wywala, mimo że kod CounterService jest prawidłowy. Chodzi o przypadki, kiedy wątki kilka razy pod rząd ustawią wartość.

Chodzi mi o test, który sprawdzi, że aktualnie pobrana wartość jest na pewno ostatnią ustawioną przez dowolny inny wątek.

Próbowałem jeszcze testy new Thread() i Thread.sleep(). Jeden wątek pobiera wartość i zasypia, drugi zmienia wartość, pierwszy wstaje i znowu pobiera i sprawdza czy jest nowa. Oczywiśćie to też mi nie działa :D

1 użytkowników online, w tym zalogowanych: 0, gości: 1