Kiedy ThreadPoolExecutor saturuje swoje główne wątki i ograniczoną kolejkę, CallerRunsPolicy deleguje odrzucone zadanie do wątku zgłaszającego do natychmiastowego wykonania. Jeśli ten wątek zgłaszający wywołał Future.get(), aby synchronizacyjnie czekać na wynik zadania, które właśnie zgłosił, a logika zgłoszonego zadania wewnętrznie zgłasza dodatkowe zadania do tego samego wykonawcy i czeka na ich zakończenie, dochodzi do cyklicznego oczekiwania.
Wątek zgłaszający nie może powrócić z get() do momentu, gdy jego zadanie zostanie zakończone, jednak zadanie nie może się zakończyć, ponieważ czeka na podzadania, które są wciąż w kolejce za nim. Żadne wątki robocze nie są dostępne do opróżnienia kolejki, ponieważ wszystkie są zajęte innymi zadaniami. Efektywnie blokuje to wątek zgłaszający, gdyż jest zarówno jedynym wątkiem zdolnym do wykonania kolejnych podzadań (w ramach polityki), jak i jednocześnie zablokowanym w oczekiwaniu na zakończenie tych podzadań.
Zmierzyliśmy się z tym w rozproszonym potoku przetwarzania dokumentów, gdzie ThreadPoolExecutor z CallerRunsPolicy obsługiwał zadania renderowania PDF. Każde zadanie dokumentu analizowało metadane i generowało podzadania do ekstrakcji obrazów, a następnie natychmiast wywoływało Future.get() na tych podzadaniach, aby zebrać ostateczny wynik.
Pod wysokim obciążeniem kolejka nasyciła się, co spowodowało, że CallerRunsPolicy wykonał zadanie dokumentu w wątku obsługi żądań internetowych. Ten wątek następnie zgłosił zadania ekstrakcji obrazów i zablokował się na get(), ale wszystkie wątki robocze były zajęte innymi dokumentami. Nowe podzadania siedziały na końcu kolejki, nieprzypisane.
Wątek obsługi nie mógł wykonać podzadań, ponieważ był zablokowany w oczekiwaniu na nie, a podzadania nie mogły się wykonać, ponieważ żadne wątki nie były wolne. To stworzyło wzmacniające się zablokowanie, które sparaliżowało usługę, aż ręczna interwencja ponownie uruchomiła JVM.
Poniższy kod ilustruje niebezpieczny wzór:
ExecutorService executor = new ThreadPoolExecutor( 2, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(2), new ThreadPoolExecutor.CallerRunsPolicy() ); // Zgłoszone z głównego wątku obsługi żądań Future<?> parent = executor.submit(() -> { // Kiedy pula jest nasycona, to działa w wątku obsługi (CallerRunsPolicy) Future<?> child = executor.submit(() -> "wyekstrahowany obraz"); // Wątek obsługi blokuje się tutaj, czekając na dziecko // Ale dziecko jest w kolejce, a żadne wątki robocze nie są wolne // Obsługa nie może uruchomić dziecka, ponieważ jest zablokowana return child.get(); }); parent.get(); // Zablokowanie: wątek obsługi czeka wiecznie
Oceniliśmy cztery różne rozwiązania architektoniczne. Pierwsze podejście zastąpiło CallerRunsPolicy na AbortPolicy i wdrożyło pętlę ponownego próbowania z wykładniczym zwiększeniem w kliencie. To zachowało dostępność wątku zgłaszającego, ale wprowadziło przejrzyste błędy i skomplikowaną logikę ponownego próbowania, co skomplikowało gwarancje idempotencji.
Drugie rozwiązanie rozszerzyło na nieograniczoną LinkedBlockingQueue, aby całkowicie zapobiec nasyceniu. Chociaż to wyeliminowało odrzucenie, narażało na OutOfMemoryError w czasie skoków ruchu i maskowało sygnały odrzucenia, prowadząc do nadmiernej latencji zamiast wyraźnej awarii.
Trzecia opcja zachowała ograniczoną kolejkę, ale znacznie zwiększyła maximumPoolSize powyżej corePoolSize, polegając na proliferacji wątków, aby wchłonąć obciążenie. To poprawiło przepustowość kosztem nadmiernego przełączania kontekstu i zużycia pamięci, ostatecznie pogarszając wydajność z powodu trzaskania pamięci podręcznej CPU.
Czwarte podejście przekształciło przepływ pracy za pomocą ExecutorCompletionService i asynchronicznych wywołań zwrotnych zamiast synchronicznego Future.get(). To pozwoliło oryginalnemu zadaniu dokumentu zwolnić wątek roboczy po zgłoszeniu podzadania i wznowić tylko wtedy, gdy CompletionService zasygnalizowało zakończenie.
Wybraliśmy czwarte rozwiązanie, ponieważ zasadniczo rozdzieliło zgłoszenie od zakończenia. To zachowało ciśnienie zwrotne z ograniczonej kolejki, eliminując warunek cyklicznego oczekiwania, umożliwiając wątkom roboczym recykling do przetwarzania podzadań, podczas gdy oryginalne zadanie czekało na powiadomienie na lekkiej zmiennej warunkowej.
Ta zmiana rozwiązała zablokowania, zredukowała średnią latencję o czterdzieści procent i utrzymała stabilne wykorzystanie pamięci w czasie szczytowego obciążenia bez poświęcania semantyki błędu ograniczonej kolejki.
Dlaczego ThreadPoolExecutor odmawia tworzenia wątków poza corePoolSize przy skonfigurowanej nieograniczonej BlockingQueue?
Wykonawca próbuje utworzyć nowe wątki tylko wtedy, gdy execute() nie może natychmiast przekazać zadania do oczekującego wątku roboczego lub wprowadzić go do kolejki. Metoda offer() nieograniczonej kolejki nigdy nie zwraca false, więc wykonawca nigdy nie dostrzega nasycenia i w konsekwencji nigdy nie alokuje wątków powyżej liczby podstawowej. Ten projekt zakłada, że kolejkowanie jest preferowane w stosunku do tworzenia wątków w zarządzaniu zasobami, jednak tworzy to pustą przestrzeń, gdzie pula wydaje się być niedostatecznie wykorzystywana pomimo oczekującej pracy. Kandydaci często błędnie zakładają, że maximumPoolSize działa jako twardy sufit niezależnie od pojemności kolejki, nie rozpoznając, że ograniczenia kolejki działają jako strażnik rozwoju wątku.
Jak działa CallerRunsPolicy jako domyślny mechanizm kontroli przepływu, a nie tylko jako obsługa odrzucania?
Wykonując zadanie w wątku zgłaszającym, polityka zmusza ten wątek do wstrzymania swojego wskaźnika zgłoszeń i wykonania pracy, naturalnie ograniczając przepływ przychodzący do dopasowania pojemności przetwarzania puli. To ciśnienie zwrotne propaguje się w górę stosu wywołań do pierwotnego producenta, spowalniając go bez wyraźnego kodu limitującego. Wiele osób postrzega politykę tylko jako zabezpieczenie dla utraconych zadań, nie dostrzegając, że celowo blokuje producenta, aby zapobiec wyczerpaniu zasobów. Zrozumienie tej różnicy semantycznej jest kluczowe dla projektowania systemów, gdzie latencja jest preferowana w stosunku do całkowitego odrzucenia pod ciężkimi obciążeniami.
Jakie subtelne interakcje między shutdown() a CallerRunsPolicy zapobiegają eleganckiej degradacji podczas zakończenia wykonawcy?
Po wywołaniu shutdown(), wykonawca przechodzi do stanu, w którym nowe zgłoszenia są odrzucane za pomocą RejectedExecutionException, omijając całkowicie skonfigurowaną politykę odrzucania. Kandydaci często zakładają, że CallerRunsPolicy będzie nadal wykonywać zadania w wywołującym podczas wyłączenia, ale wykonawca sprawdza stan zamknięcia przed skonsultowaniem polityki. Oznacza to, że zadania zgłoszone podczas fazy eleganckiego zamknięcia nie udają się natychmiast, zamiast być wykonywane przez wywołującego, co potencjalnie prowadzi do utraty pracy w ruchu, jeśli klient nie obsługuje wyjątku. Prawidłowe sekwencjonowanie wyłączania wymaga opróżnienia kolejki za pomocą awaitTermination() lub uchwycenia odrzuconych zadań w strukturze awaryjnej, ponieważ mechanizm polityki jest dezaktywowany po ustawieniu flagi wyłączenia.