Stream APIは、無状態操作(filter, map)と状態を持つ操作(sorted, distinct, limit)を入力全体を処理する必要があるかどうかに基づいて区別します。並列で実行されると、フレームワークはソースデータを複数のスレッドに分割し、それぞれが独立してセグメントを処理します。もしソースSpliteratorがORDERED特性を報告する場合、フレームワークは遭遇順序(ソース内に要素が現れる順序)が重要であり、パイプライン全体で保持する必要があると仮定します。
しかし、distinctのような状態を持つ操作は、重複をフィルタリングするためにグローバル状態(見た要素のSet)に依存します。明示的な遭遇順序の強制がない場合、並列スレッドは「最初」の発生を主張するために競争し、どの重複が生き残るかの選択が任意になる可能性があります。同様に、sortedはグローバルなソートを必要としますが、ストリームが順序無視としてマークされているか、ソースがORDERED特性を欠いている場合、並列スレッドからの中間結果が位置を保持せずにマージされる可能性があります。これにより、同等の要素の相対的な順序が異なる場合や、劣化したケースでは出力シーケンスに見える非決定性が生じることがあります。
この問題の解決策は、Spliteratorの契約を尊重することにあります:遭遇順序が重要であるならば、ソースはORDEREDを宣言する必要があり、パイプラインは状態を持つ操作の前に**unordered()を呼び出してはなりません。distinctに対して、これは遭遇順序の「最初」の発生が順序に従って決定的に選択されることを保証し、その段階では並列処理が効果的に減少します。順序が無関係な場合は、明示的にunordered()**を呼び出すことにより、フレームワークは任意の重複を選択して部分結果を同期なしでマージすることで最適化し、性能を向上させることができますが、決定性のコストがかかります。
テレメトリ処理システムは、ナノ秒のタイムスタンプとユニークなセンサーIDでタグ付けされた数百万のセンサーイベントを取り込みました。要件は、各IDに対して最初のイベントを時間順に保持しながら、センサーIDでイベントの重複を削除することでした。そして、残りをタイムスタンプでソートすることが求められました。最初の実装は、**sensorReadings.parallelStream().distinct().sorted()**を利用し、ArrayListソースが挿入順序を維持し、distinctが自然に最初の発生を保持することを前提としていました。
問題は、マルチコアハードウェアで実行した際に、特定のセンサーIDの「最初」のイベントが元のリストの2番目または3番目の出現でランダムに見える intermittently テスト失敗として現れました。調査の結果、問題はdistinctが遭遇順序の強制なしに並列で実行されたために起こったことがわかりました。それぞれのスレッドがリストのチャンクを処理し、各IDのローカル「最初」の遭遇を保持していました。フレームワークがこれらの部分結果をマージしたとき、スレッドのグローバルな順序は保証されず、スレッドローカルの最初が間で任意の選択を引き起こしました。
三つの解決策が評価されました。最初のアプローチは、完全に並列性を放棄し、逐次ストリームに戻るものでした。これにより、リスト内の最も古いイベントが常に勝つ決定論的な動作が復元されました。しかし、これによりピーク負荷時に処理遅延が400%増加し、スループットのSLAを違反し、予算に載せられなかったハードウェアのアップグレードを必要としました。
二つ目のアプローチは、distinctの前に**.unordered()**を挿入し、任意の重複が許容されることを明示的に示しました。これによりスレッドが調整なしに任意の重複を除外できるため、スループットを最大化しました。しかし、これにより、最初の読み取りを保存するというビジネス上の要件が違反され、このアプローチは監査トレイルには受け入れられませんでした。
三つ目のアプローチは、Collectors.toCollection(LinkedHashSet::new)を介した下流コレクタとしてLinkedHashSetを活用し、これはストリームを順序付けされたセットにマテリアライズしながら、前のフィルタ操作のための並列分解を許しました。しかし、これには中間のdistinct操作を放棄する必要があり、重複除外前にフルワーキングセットを保持するためにかなりのメモリを消費しました。
選ばれた解決策は、順序付けされたフェーズと無順序フェーズを分離するためにパイプラインを再構成することでした。このシステムは、最初に無状態フィルタリングとマッピングを並列で実行し、その後**.sequential()を介して明示的に逐次ストリームに移行し、次にdistinctとsorted**を呼び出しました。このハイブリッドアプローチにより、状態を持つ終端部分にのみ逐次ボトルネックを制限し、並列スループットの70%を保持しながら遭遇順序を保証しました。
結果として、各センサーイベントの最初の発生を正しく識別する安定した決定論的なパイプラインが得られました。処理速度は許容範囲内に保たれ、欠陥率はゼロに減少し、遅延は運用のしきい値内に留まりました。
なぜforEachOrdered終端操作は、並列ストリームにおいてforEachよりも大幅に高いオーバーヘッドを伴い、いつそれが厳密に必要なのか?
forEachは、並列スレッドから利用可能になる要素を調整なしに処理します。このアプローチはスループットを最大化しますが、出力はスレッド到着順に生成される可能性があります。forEachOrderedは対照的に、元の遭遇順序を再構築する必要があり、フレームワークは結果をバッファリングし、古い要素を所有している遅いスレッドを待たせる必要があるため、同期ボトルネックを生むことがあります。
それは、処理の副作用がソースの順序を観察する必要がある場合にのみ、厳密に必要です。例には、ファイルやGUIリストモデルのような位置敏感な出力への書き込みが含まれます。ログ記録や同時コレクションへの合計のような順序に無関係な副作用の場合は、forEachが好まれます。
reduce操作の非同次累積関数に対する要件が、並列実行中に微妙な競合条件を防ぐ方法、そしてこの制約が違反された場合に何が起こるかについて。
reduce操作は、ストリームをセグメントに分割し、各セグメントに対して累積器を独立して適用して部分結果を生成し、次にこれらの部分結果を同じ累積器(または別の結合器)を使って組み合わせます。結合性は、((a op b) op c) が (a op (b op c)) に等しいことを保証します。この特性は、非決定的で実装依存的な部分結果の結合順序と要素のセグメントへのグループ化が行われるために必要です。
もし操作が非結合性である場合(例えば、位置によって異なる区切りを持つ文字列の連結など)、並列実行中に要素がシーケンシャル実行と異なる方法でグループ化される可能性があります。これにより、混乱した区切りや、非結合性カスタム数タイプの誤った合計のような不正確な結果が生じます。
findFirstのような短絡操作と無限ストリーム間の特定の相互作用が、並列ストリームが無期限にハングする可能性を生む理由、そして逐次ストリームが直ちに終了する理由について。
逐次ストリームでは、findFirstは条件が一致するとすぐに終了できます。無限ストリームにおいても同様です。並列ストリームでは、フレームワークがソースを複数のセグメントに分割し、異なるスレッドによって処理されます。一致する要素が遅いスレッドによって処理されるセグメントにある場合、findFirstは、そのスレッドがそのセグメントを完了するまで待つ必要があります(または要素を見つけるまで)、他のセグメントに他の要素がないことを保証するため、遭遇順序を尊重しなければなりません。
ストリームが無秩序であるか、findAnyが代わりに使用される場合、操作は任意の一致で直ちに終了可能で、メインスレッドは保留中のタスクをキャンセルできます。候補者は、順序付けされた並列無限ストリームにおけるfindFirstが、マッチの前のセグメントが無限または計算的に無限の場合にデッドロックを引き起こす可能性があることを見逃しがちです。