API Stream rozróżnia operacje bezstanowe (filtr, map) i operacje ze stanem (sortowanie, unikalność, limit) na podstawie tego, czy muszą przetworzyć całe dane wejściowe przed wytworzeniem wyjścia. Podczas wykonywania w sposób równoległy, framework dzieli dane źródłowe na wiele wątków, z których każdy przetwarza segment niezależnie. Jeśli Spliterator zgłasza cechę ORDERED, framework zakłada, że kolejność napotkania (kolejność, w jakiej elementy pojawiają się w źródle) ma znaczenie i musi być zachowana w całym pipeline.
Jednakże operacje ze stanem, takie jak unikalność, polegają na globalnym stanie (zbiorze już widzianych elementów), aby odfiltrować duplikaty. Bez wyraźnego egzekwowania kolejności napotkania, równoległe wątki mogą rywalizować o to, który element zostanie uznany za „pierwsze” wystąpienie, co prowadzi do arbitralnego wyboru, który duplikat przetrwa. Podobnie, sortowanie wymaga globalnego posortowania, ale jeśli strumień jest oznaczony jako nieuporządkowany lub źródło nie ma cechy ORDERED, wstępne wyniki z równoległych wątków mogą być scalane bez zachowania pozycji. Może to prowadzić do różnych względnych porządków równych elementów lub, w skrajnych przypadkach, do pozornych niedeterministycznych wyników w sekwencji wyjściowej.
Rozwiązanie polega na przestrzeganiu umowy Spliteratora: jeśli kolejność ma znaczenie, źródło musi zadeklarować ORDERED, a pipeline nie może wywoływać unordered() przed operacją ze stanem. Dla unikalności zapewnia to, że „pierwsze” wystąpienie w kolejności napotkania jest deterministycznie wybierane przez przetwarzanie segmentów strumienia w kolejności sekwencyjnej, co skutecznie zmniejsza równoległość dla tego etapu. Jeśli kolejność jest nieistotna, wyraźne wywołanie unordered() pozwala frameworkowi zoptymalizować wybór arbitralnych duplikatów i scalanie wyników częściowych bez synchronizacji, poprawiając wydajność kosztem deterministyczności.
System przetwarzania telemetrycznego przetwarzał miliony zdarzeń czujników, z których każde miało znacznik nanosekundowy i unikalny identyfikator czujnika. Wymogiem było odfiltrowanie duplikatów zdarzeń według identyfikatora czujnika, zachowując jednocześnie chronologicznie pierwsze zdarzenie dla każdego identyfikatora, a następnie sortując resztę według znaczników czasowych. Początkowa implementacja wykorzystywała sensorReadings.parallelStream().distinct().sorted(), zakładając, że źródło ArrayList utrzymuje porządek wstawiania i że unikalność naturalnie zachowa pierwsze wystąpienie.
Problem objawił się jako sporadyczne błędy testów, gdy „pierwsze” zdarzenie dla danego identyfikatora czujnika losowo było drugą lub trzecią wystąpieniem na oryginalnej liście podczas uruchamiania na sprzęcie wielordzeniowym. Po przeprowadzeniu dochodzenia problem został przypisany do unikalności działającej równolegle bez egzekwowania kolejności napotkania; każdy wątek przetwarzał kawałek listy i przechowywał swoje własne lokalne „pierwsze” napotkanie każdego identyfikatora. Gdy framework scalał te częściowe wyniki, globalne porządki wątków nie były gwarantowane, powodując arbitralny wybór spośród lokalnych pierwszych.
Ocena obejmowała trzy rozwiązania. Pierwsze podejście całkowicie porzuciło równoległość, przywracając sekwencyjny strumień. Przywróciło to deterministyczne zachowanie, zapewniając, że najwcześniejsze zdarzenie na liście zawsze wygrywa. Jednak zwiększyło to latencję przetwarzania o 400% podczas szczytowego obciążenia, naruszając umowy SLA dotyczące przepustowości i wymuszając modernizacje sprzętowe, które nie były budżetowane.
Drugie podejście wprowadziło .unordered() przed unikalnością, wyraźnie sygnalizując, że każdy duplikat jest akceptowalny. To zmaksymalizowało przepustowość, pozwalając wątkom na odrzucanie dowolnych duplikatów bez koordynacji. Niestety, naruszyło to wymóg biznesowy dotyczący zachowania najwcześniejszego odczytu, czyniąc to podejście nieakceptowalnym dla ścieżki audytowej.
Trzecie podejście wykorzystało LinkedHashSet jako zbieracz w dół za pomocą Collectors.toCollection(LinkedHashSet::new) w ramach operacji collect. To zmaterializowało strumień w uporządkowanym zbiorze, jednocześnie pozwalając na równoległe dekompozycje dla wcześniejszych operacji filtrujących. Jednak wymagało to porzucenia pośredniej operacji unikalności i pochłonęło znacznie więcej pamięci, aby przechować pełny zestaw roboczy przed deduplikacją.
Wybrane rozwiązanie polegało na przekształceniu pipeline w celu oddzielenia uporządkowanych i nieuporządkowanych faz. System najpierw zastosował bezstanowe filtrowanie i mapowanie równolegle, a następnie wyraźnie przeszedł do sekwencyjnego strumienia za pomocą .sequential() przed wywołaniem unikalności i sortowania. To hybrydowe podejście ograniczyło sekwencyjny wąskie gardło tylko do stateful terminowego fragmentu, zachowując 70% przepustowości równoległej przy jednoczesnym gwarantowaniu kolejności napotkania.
Wynikiem był stabilny, deterministyczny pipeline, który poprawnie identyfikował pierwsze wystąpienie każdego zdarzenia czujnika. Prędkości przetwarzania pozostały akceptowalne, a wskaźnik defektów zmniejszył się do zera, podczas gdy latencja pozostała w granicach progów operacyjnych.
Dlaczego operacja końcowa forEachOrdered generuje znacznie wyższe narzuty niż forEach w strumieniach równoległych, i kiedy jest ściśle konieczna?
forEach przetwarza elementy w miarę ich dostępności z równoległych wątków bez koordynacji. To podejście maksymalizuje przepustowość, ale potencjalnie produkuje wyniki w kolejności przybycia wątków. forEachOrdered, w przeciwieństwie, musi odbudować oryginalną kolejność napotkania, co wymaga od frameworka buforowania wyników i potencjalnego wstrzymywania szybkich wątków, aby czekały na wolniejsze, które mają wcześniejsze elementy, tworząc wąskie gardło synchronizacji.
Jest ściśle konieczne tylko wtedy, gdy efekty uboczne przetwarzania muszą obserwować kolejność źródła. Przykłady obejmują zapis do wrażliwego na pozycję wyjścia, takiego jak plik lub model listy GUI. Dla efektów ubocznych, które nie są wrażliwe na kolejność, takich jak logowanie lub sumowanie w kolekcji współbieżnej, preferowany jest forEach.
Jak wymaganie operacji reduce dotyczące asocjacyjnej funkcji akumulatora zapobiega subtelnym warunkom wyścigu podczas wykonania równoległego i co się stanie, jeśli to ograniczenie zostanie naruszone?
Operacja reduce dzieli strumień na segmenty, stosuje akumulator do każdego segmentu w izolacji, aby uzyskać wyniki częściowe, a następnie łączy te wyniki częściowe przy użyciu tego samego akumulatora (lub osobnego łącza). Asocjacyjność zapewnia, że ((a op b) op c) równa się (a op (b op c)). Ta właściwość jest wymagana, ponieważ grupowanie elementów w segmentach i kolejność łączenia wyników częściowych jest niedeterministyczna i zależna od implementacji.
Jeśli operacja jest nieasocjacyjna (np. konkatenacja ciągów z separatorem, który różni się w zależności od pozycji), równoległe wykonanie może grupować elementy inaczej niż sekwencyjne wykonanie. Może to prowadzić do błędnych wyników, takich jak pomieszane separatorem lub matematycznie błędne sumy dla nieregularnych typów liczbowych.
Jaka konkretna interakcja pomiędzy operacjami krótkozasięgowymi, takimi jak findFirst, a nieskończonymi strumieniami powoduje, że równoległy strumień potencjalnie może zawiesić się na czas nieokreślony, podczas gdy sekwencyjny strumień zakończyłby się natychmiast?
W sekwencyjnym strumieniu findFirst może zakończyć się, gdy tylko pasuje do predykatu, nawet na nieskończonym strumieniu. W równoległym strumieniu framework rozdziela źródło na wiele segmentów przetwarzanych przez różne wątki. Jeśli odpowiadający element znajduje się w segmencie przetwarzanym przez wolny wątek, findFirst musi czekać, aż ten wątek zakończy swój segment (lub znajdzie element), aby upewnić się, że żaden wcześniejszy element nie istnieje w innych segmentach, ponieważ musi respektować kolejność napotkania.
Jeśli strumień jest nieuporządkowany lub użyto zamiast tego findAny, operacja może zakończyć się natychmiast po jakimkolwiek dopasowaniu, pozwalając głównemu wątkowi na anulowanie oczekujących zadań. Kandydaci często przeoczają, że findFirst w uporządkowanych równoległych nieskończonych strumieniach jest w rzeczywistości globalną barierą, która może zablokować, jeśli segmenty przed dopasowaniem są nieskończone lub obliczeniowo nieograniczone.