Stream API'si, çıkış üretmeden önce tüm girişi işlemesi gerekip gerekmediğine dayanarak, durumdan bağımsız işlemleri (filter, map) ve durumsal işlemleri (sorted, distinct, limit) ayırır. Paralel olarak yürütüldüğünde, çerçeve kaynak verileri birden fazla iş parçacığına bölerek her biri bağımsız bir segment işlemesi sağlar. Eğer kaynak Spliterator ORDERED özelliğini bildirirse, çerçeve karşılaşma sırasının (elementlerin kaynakta göründüğü sıralama) önemli olduğu ve boru hattı boyunca korunması gerektiğini varsayar.
Ancak, distinct gibi durumsal işlemler, tekrarları filtrelemek için genel bir duruma (görülen elemanların bir Set'i) dayanır. Açık bir karşılaşma sırası zorunluluğu olmadan, paralel iş parçacıkları "ilk" gerçekleşme olarak elemanları talep etmek için yarışabilir, bu da hangi tekrarın hayatta kalacağını rastgele seçmekle sonuçlanabilir. Benzer şekilde, sorted küresel bir sıralama gerektirir, ancak eğer stream sırasız olarak işaretlenmişse veya kaynak ORDERED özelliğine sahip değilse, paralel iş parçacıklarının ara sonuçları, pozisyon korunmadan birleştirilebilir. Bu, eşit elemanların farklı göreli sıralamalarını veya, sıradan durumlarda, çıkış dizisinde belirgin bir belirsizlik oluşturabilir.
Çözüm, Spliterator sözleşmesini dikkate almaktan geçer: Eğer karşılaşma sırası önemliyse, kaynak ORDERED olarak bildirilmelidir ve boru hattı bir durumsal işlemden önce unordered() çağırmamalıdır. distinct işlemi için bu, karşılaşma sırasındaki "ilk" gerçekleşmenin belirleyici bir şekilde seçilmesini sağlar, bu da stream segmentlerinin sıralı sırayla işlenmesini gerektirir, bu aşamada paralelliği etkili bir şekilde azaltır. Eğer sıra önemsizse, açıkça unordered() çağrısı yaparak çerçevenin rastgele tekrarları seçmesine ve kısmi sonuçları senkronizasyon olmadan birleştirmesine izin verilir, bu da performansı artırır ancak belirlenebilirliğin maliyetiyle.
Bir telemetri işleme sistemi, her biri nan saniye zaman damgası ve benzersiz bir sensör ID'si ile etiketlenmiş milyonlarca sensör olayını tüketmiştir. Gereksinim, her ID için kronolojik olarak ilk olayı koruyarak olayları sensör ID'sine göre tekrarsız hale getirmek ve ardından geri kalanları zaman damgasına göre sıralamaktı. İlk uygulama, sensorReadings.parallelStream().distinct().sorted() kullanarak ArrayList kaynağının ekleme sırasını koruyacağını varsayarak distinct'in doğal olarak ilk gerçekleşmeyi koruyacağını düşündü.
Sorun, testlerde "ilk" olayın, çok çekirdekli donanımda çalıştığında, orijinal listede ikinci veya üçüncü gerçekleşme olabileceği şeklinde rastgele olarak belirmesiyle ortaya çıktı. İnceleme sonucunda, sorun, karşılaşma sırası zorunluluğu olmaksızın paralel olarak yürütülen distinct işleminin nedeniydi; her iş parçacığı listenin bir parçasını işleyerek kendi yerel "ilk" karşılaşmasını tuttu. Çerçeve bu kısmi sonuçları birleştirdiğinde, iş parçacıklarının küresel sıralaması garanti edilmediği için, iş parçacığına özgü birincilerin rastgele seçilmesi gerçekleşiyordu.
Üç çözüm değerlendirildi. İlk yaklaşım, paralelliği tamamen terk ederek sıralı bir akışa geri döndü. Bu, sıralı davranışı geri getirdi ve listenin en erken olayının her zaman kazanan olmasını sağladı. Ancak, bu, yüksek yük altında işlem gecikmesini %400 artırdı, bu da çıktıkları SLA'larını ihlal etti ve bütçelenmemiş donanım yükseltmelerini zorunlu kıldı.
İkinci yaklaşım, distinct'ten önce .unordered() ekleyerek, herhangi bir tekrarın kabul edilebilir olduğunu açıkça belirtti. Bu, iş parçacıklarının koordinasyon olmadan rastgele tekrarları atmalarına izin vererek verimliliği maksimize etti. Ne yazık ki, bu, en erken okumayı koruma iş gereksinimini ihlal etti ve denetim izini için kabul edilemez hale getirdi.
Üçüncü yaklaşım, bir LinkedHashSet kullanarak aşağı akış toplayıcı olarak Collectors.toCollection(LinkedHashSet::new) yoluyla bir collect işlemi gerçekleştirdi. Bu, akışı sıralı bir sete dönüştürdü, aynı zamanda önceki filtre işlemleri için paralel ayrıştırmaya izin verdi. Ancak, bu, ara distinct işlemini terk etmeyi gerektiriyordu ve tekrarsız hale getirmeden önce tam çalışma setini tutmak için önemli ölçüde daha fazla bellek tüketiyordu.
Seçilen çözüm, sıralı ve sırasız aşamaları ayıracak şekilde boru hattının yeniden yapılandırılmasını içeriyordu. Sistem ilk olarak stateless filtreleme ve haritalamayı paralel olarak uyguladı, ardından açıkça .sequential() ile sıralı bir akışa geçiş yaptı ve sonra distinct ve sorted çağrısı yaptı. Bu hibrit yaklaşım, sıralı tıkanıklığı yalnızca durumsal son kısmıyla sınırlayarak paralel verimliliğin %70'ini korurken karşılaşma sırasını garanti etti.
Sonuç, her sensör olayının ilk gerçekleşmesini doğru bir şekilde belirleyen, kararlı, belirlenebilir bir boru hattıydı. İşlem hızları kabul edilebilir düzeyde kaldı ve hata oranı sıfıra düşerken gecikme operasyonel sınırlar içinde kaldı.
Neden paralel akışlarda forEachOrdered terminal işleminin forEach'ten önemli ölçüde daha yüksek bir yük taşıdığını ve ne zaman kesinlikle gerekli olduğunu?
forEach, paralel iş parçacıklarından mevcut olan elemanları koordinasyonsuz işleyerek en yüksek verimliliği sağlar. Bu yaklaşım verimliliği maksimize eder ancak potansiyel olarak çıktının iş parçacığı-geliş sırasına göre üretilmesine neden olabilir. forEachOrdered, aksine, orijinal karşılaşma sırasını yeniden oluşturmak zorundadır, bu nedenle çerçeve, sonuçları tamponlamak zorunda kalır ve daha hızlı iş parçacıklarının daha yavaş olanları bekleyerek tıkanıklık yaratmasına neden olabilir.
Yan etkilerin işlenmesi, kaynak sırasını gözlemlemek zorunda olduğunda kesinlikle gereklidir. Örnekler arasında bir dosya veya GUI liste modeline duyarlı bir çıktıya yazmak yer alır. Sıralara duyarsız yan etkiler için, loglama veya eşzamanlı bir koleksiyona toplama gibi, forEach tercih edilir.
Paralel yürütme sırasında ince yarış koşullarını önleyen reduce işleminin bir birleşik akümülatör fonksiyonu gereksinimi nedir ve bu kısıtlama ihlal edildiğinde ne olur?
reduce işlemi, akışı segmentlere böler, her segmentin içine akümülatorü uygulayarak kısmi sonuçlar üretir ve ardından bu kısmi sonuçları aynı akümülatör (veya ayrı bir birleştirici) kullanarak birleştirir. Birleşilebilirlik, ((a op b) op c) = (a op (b op c)) şeklinde ifade edilir. Bu özellik, segmentlerde eleman gruplama ve kısmi sonuçları birleştirme sırasının belirsiz ve uygulamaya bağlı olmasından dolayı gereklidir.
Eğer işlem birleşilebilir değilse (örneğin, pozisyona göre değişken bir sınırlayıcı ile string birleştirme), paralel yürütme, elemanları sıralı bir yürütme ile farklı şekillerde gruplandırabilir. Bu, karışık sınırlayıcılar veya birleşilebilir olmayan özel sayı türleri için matematiksel olarak hatalı toplamlar gibi yanlış sonuçlara yol açabilir.
Kısa devre oluşturan işlemler ile findFirst ve sonsuz akışların belirli etkileşimi, bir paralel akışın potansiyel olarak sonsuza dek takılması neden olurken, sıralı bir akış hemen nasıl sona erer?
Sıralı bir akışta, findFirst koşul tekniği eşleştiğinde sonsuz bir akışta bile sona erebilir. Bir paralel akışta, çerçeve kaynakları birden fazla segmente böler ve farklı iş parçacıkları tarafından işlenir. Eğer eşleşen eleman, yavaş bir iş parçacığı tarafından işlenen bir segmentte yer alıyorsa, findFirst o iş parçacığının segmentinin tamamlanmasını (veya elemanı bulmasını) beklemek zorundadır; çünkü daha önce başka segmentlerde herhangi bir elemanın mevcut olmadığını garanti etmelidir, bu da karşılaşma sırasını koruma gereksinimidir.
Eğer akış sırasız ise ya da bunun yerine findAny kullanılıyorsa, işlem herhangi bir eşleşme üzerine hemen sona erebilir ve ana iş parçacığı bekleyen görevleri iptal edebilir. Adaylar genellikle, sıralı paralel sonsuz akışlardaki findFirst işleminin etkili bir küresel engel olduğunu ve karşılaşmadan önceki segmentlerin sonsuz veya hesaplama açısından sınırsız olması durumunda kilitlenebileceğini kaçırır.