Stream API 区分无状态操作(filter、map)和有状态操作(sorted、distinct、limit),这取决于它们是否必须在生成输出之前处理整个输入。当在并行执行时,框架将源数据划分到多个线程,每个线程独立处理一个段。如果源 Spliterator 报告 ORDERED 特性,则框架假定 encounter order(元素在源中出现的顺序)是重要的,并且必须在整个管道中保持。
然而,有状态操作如 distinct 依赖于全局状态(一个已见元素的 Set)来过滤重复项。如果没有显式的 encounter order 强制执行,并行线程可能竞相声称元素为“第一个”出现,从而导致任意选择哪个重复项存活。类似地,sorted 需要全局排序,但如果流被标记为无序或源缺乏 ORDERED 特性,则来自并行线程的中间结果可能会在不保留位置的情况下合并。这可能导致相等元素的不同相对顺序,或者在极端情况下,输出序列中的明显非确定性。
解决方案在于遵循 Spliterator 合同:如果 encounter order 重要,则源必须声明 ORDERED,而管道在有状态操作之前不得调用 unordered()。对于 distinct,这确保在 encounter order 中“第一个”出现是通过按顺序处理流段确定性选择,从而有效降低该阶段的并行性。如果顺序无关,显式调用 unordered() 可以让框架通过选择任意重复项和在没有同步的情况下合并部分结果来进行优化,从而提高性能,但以确定性为代价。
一个遥测处理系统处理数百万个传感器事件,每个事件都有一个纳秒时间戳和唯一的传感器 ID。要求是按传感器 ID 去重,同时保留每个 ID 的按时间排序的第一个事件,然后按时间戳对其余事件进行排序。最初的实现使用 sensorReadings.parallelStream().distinct().sorted(),假设 ArrayList 源维护插入顺序,并且 distinct 会自然保留第一个出现。
问题表现为间歇性的测试失败,其中给定传感器 ID 的“第一个”事件在多核硬件上运行时会随机是原始列表中的第二或第三个出现。经过调查,问题追溯到 distinct 在没有强制执行 encounter order 的情况下并行执行;每个线程处理列表的一部分,并保留其自己的本地“第一个”遇到的每个 ID。当框架合并这些部分结果时,线程的全局顺序不再得到保证,从而导致在线程本地的第一个元素之间的任意选择。
评估了三种解决方案。第一种方法完全放弃了并行性,回归到顺序流。这恢复了确定性行为,确保列表中的最早事件始终获胜。然而,它在高负载下将处理延迟提高了 400%,违反了吞吐量 SLA,并迫使进行不在预算内的硬件升级。
第二种方法在 distinct 之前插入 .unordered(),明确表示任何重复都是可以接受的。这通过允许线程在没有协调的情况下丢弃任意重复项来最大化吞吐量。不幸的是,这违反了保持最早读取的业务要求,使该方法在审计追踪中不可接受。
第三种方法通过 Collectors.toCollection(LinkedHashSet::new) 在 collect 操作中利用 LinkedHashSet 作为下游收集器。这将流物化为有序集合,同时仍允许对前面的过滤操作进行并行分解。然而,这需要放弃中间的 distinct 操作,并消耗显著更多的内存以存储去重之前的完整工作集。
选择的解决方案涉及重构管道以分离有序和无序阶段。系统首先在并行中应用无状态过滤和映射,然后通过 .sequential() 显式过渡到顺序流,然后调用 distinct 和 sorted。这种混合方法将顺序瓶颈限制在仅有状态的终端部分,保持了 70% 的并行吞吐量,同时保证了 encounter order。
结果是一个稳定、确定性的管道,正确识别每个传感器事件的首次出现。处理速度保持在可接受范围内,缺陷率降至零,同时延迟保持在操作阈值之内。
为什么在并行流中,forEachOrdered 终端操作会产生显著更高的开销,而 forEach 何时是绝对必要的?
forEach 根据来自并行线程的元素变得可用的顺序处理元素,而无需协调。这种方法最大化了吞吐量,但可能会按线程到达顺序输出。相比之下,forEachOrdered 必须重建原始的遇到顺序,这要求框架缓冲结果,并可能使快速线程等待较慢线程拥有的早期元素,从而创建同步瓶颈。
仅当处理的副作用必须遵循源顺序时,这才是绝对必要的。示例包括写入到位置敏感的输出中,如文件或 GUI 列表模型。对于无序侧效应,例如记录或汇总到并发集合中,优先使用 forEach。
减少操作对可结合累加器函数的要求如何防止在并行执行中出现微妙的竞争条件?如果违反此约束,会发生什么?
reduce 操作将流划分为段,在隔离状态下对每个段应用累加器以生成部分结果,然后使用相同的累加器(或单独的合并器)合并这些部分结果。结合性确保 ((a op b) op c) 等于 (a op (b op c))。这个属性是必需的,因为元素分组到段中的方式和组合部分结果的顺序是非确定性和依赖于实现的。
如果该操作是非结合性(例如,带有变动分隔符的字符串连接),并行执行可能以不同于顺序执行的方式分组元素。这会导致错误的结果,例如混淆的分隔符或非结合自定义数值类型的数学错误和。
短路操作例如 findFirst 与无限流之间的具体交互如何导致并行流可能无限挂起,而顺序流会立即终止?
在顺序流中,只要谓词匹配,findFirst 就可以立即终止,即便在无限流上。在并行流中,框架将源分成多个由不同线程处理的段。如果匹配元素位于由慢线程处理的段中,findFirst 必须等待该线程完成其段(或找到元素),以保证其他段中不存在更早的元素,因为它必须尊重遇到顺序。
如果流是无序的或使用 findAny,该操作可以在任何匹配时立即终止,允许主线程取消待处理的任务。候选人常常忽略有序的并行无限流上的 findFirst 实际上是一个全局障碍,如果匹配之前的段是无限或计算上没有界限的,可能会导致死锁。