Przed Javą 8, równoległe przetwarzanie kolekcji wymagało ręcznego zarządzania Thread lub jawnego przesyłania do ExecutorService, zmuszając programistów do ręcznego dzielenia pracy i synchronizacji. Wprowadzenie API Stream w Javie 8 zastało zbudowane na paralelizmie poprzez interfejs Spliterator, który opiera się na cechach takich jak SIZED, aby wskazać znane liczby elementów. Ta cecha pozwala frameworkowi na wykonanie zrównoważonego dzielenia binarnego, tworząc optymalne drzewa zadań dla ForkJoinPool.
Gdy Spliterator nie ma cechy SIZED — co jest częste w funkcjach generatorowych, strumieniach opartych na Iterator lub nieskończonych sekwencjach — framework nie może wykonać dzielenia binarnego (podział na dwa), aby utworzyć zrównoważone drzewa zadań. Ślepe dzielenie może generować miliony małych zadań (eksplozja granularności), powodując narzut koordynacji, który dominowałby czas wykonania, lub zbyt duże kawałki, które pozostawiają wątki robocze bezczynne, podczas gdy jeden wątek przetwarza ogromny zapas. Ta nieprzewidywalność łamie zasadnicze założenie równoległości fork-join: że praca może być podzielona na z grubsza równe podzadania.
Framework stosuje geometralne grupowanie przez domyślną implementację IteratorSpliterator. Zamiast dzielić na pół, wykorzystuje wykładniczo rosnące rozmiary partii (1, 2, 4, 8, aż do MAX_BATCH), co amortyzuje koszty dzielenia, ograniczając tworzenie zadań do logarytmicznej głębokości. ForkJoinPool rekompensuje nieznane rozmiary, stosując kradzież pracy, gdzie preferowane są lekkie zadania, a AbstractTask oblicza sygnały zakończenia bez potrzeby całkowitych informacji o rozmiarze. Dla uporządkowanych strumieni nienadzorowanych potok buforuje elementy w ArrayList podczas dzielenia, aby zachować kolejność napotkania, zamieniając pamięć na bezpieczeństwo równoległości.
System telemetrii przetwarza dane z czujników w czasie rzeczywistym, które przychodzą przez połączenie Socket. Dane są przekazywane jako ciągły strumień obiektów JSON, a wymaganie biznesowe wymaga równoległej analizy i filtrowania tych obiektów w celu minimalizacji opóźnienia przed przechowywaniem. Wyzwanie polega na nieprzewidywalnej szybkości przybywania i całkowym wolumenie danych.
Pierwsza implementacja otaczała InputStream w BufferedReader i wykorzystywała lines().parallel(). Jednak profilowanie wydajności ujawniło, że równoległy strumień był znacznie wolniejszy niż przetwarzanie sekwencyjne z powodu nadmiernego narzutu przy tworzeniu zadań. Przyczyną były leżący u podstaw Spliterator z BufferedReader.lines(), który nie ma cechy SIZED i początkowo raportuje Long.MAX_VALUE jako szacunkową wartość, co sprawia, że framework tworzy mikrozadania dla poszczególnych linii.
Jednym z podejść było buforowanie całego strumienia w ArrayList<String> przed równoległym przetwarzaniem. To dałoby cechę SIZED i umożliwiłoby idealne dzielenie binarne na rdzenia CPU. Jednak wprowadziło to nieakceptowalne opóźnienie — dane nie mogły być przetwarzane, dopóki nie dotarła cała partia — oraz spowodowało poważne problemy pamięciowe przy obsłudze milionów wydarzeń na minutę, praktycznie negując paradygmat strumieniowy.
Innym rozważanym rozwiązaniem była implementacja niestandardowego Spliterator, który zawsze wydzielałby kawałki o stałej wielkości wynoszące dokładnie 1000 linii, niezależnie od leżącego u podstaw strumienia. Chociaż to zapewniało przewidywalne rozmiary zadań, zawiodło, gdy czas przetwarzania na linię znacząco się różnił; jeden pracownik mógł otrzymać 1000 złożonych obiektów, podczas gdy inny otrzymał 1000 prostych, co prowadziło do poważnej nierównowagi obciążenia i bezczynnych rdzeni CPU czekających na najwolniejsze zadanie.
Wybrane rozwiązanie polegało na wdrożeniu niestandardowego Spliterator naśladującego standardową strategię geometralnego grupowania. Śledził zmienną batch zaczynającą od 1, podwajającą się przy każdym udanym dzieleniu, aż do maksymalnej wartości 1024, co pozwala frameworkowi dostosować się do rzeczywistej długości strumienia bez wcześniejszej wiedzy. To podejście zrównoważyło początkowy narzut małych zadań z wydajnością większych partii w miarę postępu strumienia.
Podejście geometralnego grupowania osiągnęło 3,5-krotne przyspieszenie na systemie z 8 rdzeniami w porównaniu do przetwarzania sekwencyjnego. Użycie pamięci pozostało stałe niezależnie od czasu trwania strumienia, a opóźnienie pozostało niskie, gdyż przetwarzanie rozpoczęło się natychmiast, bez czekania na pełną materializację. Adaptacyjne rozmiarowanie uniemożliwiło eksplozję granularności, która dręczyła pierwotną implementację.
Dlaczego owijanie synchronizowanej kolekcji w równoległym strumieniu często zmniejsza wydajność w porównaniu do sekwencyjnego odpowiednika, nawet w przypadku operacji wymagających dużej mocy CPU?
Wielu kandydatów zakłada, że Collections.synchronizedList() lub synchronizowane implementacje Map są bezpieczne dla równoległych strumieni. Jednak, podczas gdy Spliterator tych kolekcji raportuje SIZED, synchronizacja wewnętrzna przy każdym dostępie tworzy ogromny ruch związany z spójnością pamięci podręcznej. Kiedy wiele wątków ForkJoinPool współzawodniczy o ten sam monitor dla każdego elementu, koszt synchronizacji i przełączania kontekstu przewyższa wszelkie zyski z równoległości. Prawidłowe podejście wymaga użycia ConcurrentHashMap lub CopyOnWriteArrayList (jeśli zapisy są rzadkie) lub zapewnienia, że źródłowa kolekcja jest niezakłócona i dostępna za pomocą cech Spliterator bezpiecznych dla wątków, takich jak CONCURRENT.
Jak cecha ORDERED współdziała z nienadzorowanymi strumieniami, aby potencjalnie serializować operację końcową, a dlaczego to pogarsza sort() ?
Kandydaci często nie zauważają, że ORDERED w połączeniu z brakiem SIZED zmusza framework do buforowania wszystkich elementów przed zakończeniem przetwarzania, szczególnie dla operacji stanowych, takich jak sorted() lub distinct(). Bez wiedzy o całkowym rozmiarze, framework nie może z góry przydzielić końcowej tablicy dla toArray() ani buforów do sortowania, a zamiast tego gromadzi elementy w liście powiązanej lub dynamicznie zmieniającym się ArrayList, co skutkuje efektywnym seriowym zakończeniem potoku. To oznacza, że przyspieszenie równoległe jest ograniczone do etapów mapujących/filtrowania, podczas gdy etap końcowy staje się wąskim gardłem jednowątkowym, czekającym na pełny zbiór danych.
Jakie konkretne naruszenie umowy występuje, jeśli metoda trySplit() niestandardowego Spliteratora zwraca Spliterator, który raportuje inny zestaw cech niż nadrzędny?
Delikatny błąd występuje, gdy programiści nadpisują trySplit(), ale nie zachowują spójności cech. Umowa Spliterator wymaga, aby zwrócony spliterator miał te same cechy dotyczące porządku, unikalności i sortowania. Jeśli nadrzędny raportuje ORDERED, ale dziecko (wynik podziału) nie, optymalizacje frameworka Stream mogą eliminować kroki sortowania lub zmieniać kolejność operacji, prowadząc do błędnych wyników. Cechy muszą być stabilne w trakcie podziałów, ponieważ potok optymalizuje fuzję (np. łączenie filter i map) w oparciu o te flagi, a niespójne flagi łamią relacje wymagań przed i po, wymagane dla poprawności równoległej.