在 Java 8 之前,集合处理的并行化需要手动管理 Thread 或显式提交 ExecutorService,迫使开发者手动处理工作分配和同步。Java 8 引入的 Stream API 通过 Spliterator 接口抽象了并行操作,该接口依赖于像 SIZED 这样的特征来指示已知的元素数量。这个特征使框架能够执行平衡的二叉拆分,为 ForkJoinPool 创建最佳的任务树。
当 Spliterator 缺少 SIZED 特征——这在生成器函数、基于 Iterator 的流或无限序列中很常见——框架无法进行二叉拆分(按二分之一拆分)来创建平衡的任务树。盲目的拆分将产生数百万个微小任务(粒度膨胀),导致协调开销主导执行时间,或者产生过大的任务块,使得工作线程在一个线程处理大量待处理任务时处于空闲状态。这种不可预测性破坏了分叉-合并并行性的基本假设:工作可以划分为大致相等的子任务。
框架通过默认的 IteratorSpliterator 实现采用几何批处理。它不是按一半拆分,而是使用指数增长的批量大小(1, 2, 4, 8,直到 MAX_BATCH),这摊销了拆分成本,同时将任务创建限制在对数深度。ForkJoinPool 通过工作窃取来弥补未知大小的影响,优先选择轻量级任务,而 AbstractTask 在不需要总大小信息的情况下计算完成信号。对于有序的无大小流,在拆分期间,管道将元素缓冲到一个 ArrayList 中,以保持遇到的顺序,在并行安全性与内存之间进行权衡。
一个遥测系统处理通过 Socket 连接实时到达的传感器数据。这些数据以 JSON 对象的连续流形式到达,业务需求要求在存储之前对这些对象进行并行解析和过滤,以最小化延迟。挑战在于到达速率和数据总量的不确定性。
最初的实现将 InputStream 包装在 BufferedReader 中,并使用 lines().parallel()。然而,性能分析显示,平行流的性能明显低于顺序处理,原因是在创建任务时开销过大。根本原因是来自 BufferedReader.lines() 的底层 Spliterator 缺少 SIZED 特征,并最初报告 Long.MAX_VALUE 作为估算,导致框架为每个单独的行创建微型任务。
一种方法是先将整个流缓冲到一个 ArrayList<String> 中,然后再进行并行处理。这将提供 SIZED 特征并在 CPU 核心之间实现完美的二分拆分。然而,这引入了不可接受的延迟——在整个批次到达之前无法处理数据,并且在每分钟处理数百万事件时会造成严重的内存压力,有效地否定了流处理的范式。
另一个考虑的解决方案是实现一个自定义的 Spliterator,无论底层流如何,总是拆分出固定大小的1000行块。虽然这提供了可预测的任务大小,但在每行的处理时间变化显著时,它失败了;一个工作线程可能收到 1000 个复杂对象,而另一个可能收到 1000 个简单对象,从而导致严重的负载不平衡和空闲的 CPU 核心等待最慢的任务。
选定的解决方案是实现一个自定义的 Spliterator,模拟标准库的几何批处理策略。它跟踪一个 batch 变量,从 1 开始,在每次成功拆分后加倍,直到最大1024,允许框架适应实际的流长度而不需要事先了解。这种方法平衡了小任务的初始开销与流进展时大批次的效率。
几何批处理方法在 8 核心系统上实现了比顺序处理快 3.5 倍的加速。无论流持续时间如何,内存使用保持不变,延迟保持较低,因为处理在不等待完全成形的情况下立即开始。自适应大小防止了初始实现中存在的粒度膨胀。
为什么在并行流中包装同步集合相比于顺序等效的性能通常降低,即使对于 CPU 密集型操作也是如此?
许多候选人认为 Collections.synchronizedList() 或同步 Map 实现对于并行流是安全的。但是,虽然这些集合的 Spliterator 报告 SIZED,但每次访问固有的同步会产生巨大的缓存一致性流量。当多个 ForkJoinPool 线程在对每个元素争用同一监视器时,时钟开销和上下文切换的成本超过了任何并行增益。正确的方法要求使用 ConcurrentHashMap 或 CopyOnWriteArrayList(如果写入很少),或者确保源集合是无干扰的,并通过线程安全的 Spliterator 特征如 CONCURRENT 进行访问。
ORDERED 特征如何与无大小流交互,可能会序列化终端操作,而为什么 sorted() 会加剧这一点?
候选人常常忽视 ORDERED 与缺乏 SIZED 的结合强迫框架在处理完成之前缓冲所有元素,特别是对于有状态的操作如 sorted() 或 distinct()。如果不知道总大小,框架无法为 toArray() 或合并排序缓冲区提前分配最终数组。它而是将元素累积到一个链表或动态调整大小的 ArrayList 中,有效地序列化了管道完成阶段。这意味着并行加速仅限于映射/过滤阶段,而终端阶段成为一个单线程的瓶颈,等待完整的数据集。
如果自定义 Spliterator 的 trySplit() 方法返回一个报告与父节点不同特征集的 Spliterator,会发生什么特定的合同违规?
当开发者重写 trySplit() 但未能保持特征一致性时,会发生一个微妙的错误。Spliterator 合同要求返回的 spliterator 必须在顺序、独特性和排序性方面具有相同的特征。如果父节点报告 ORDERED 但子节点(拆分结果)不报告,则 Stream 框架的优化过程可能会消除排序步骤或重新排列操作,导致结果错误。特征必须在拆分过程中保持稳定,因为管道基于这些标志优化融合(例如,组合 filter 和 map),不一致的标志破坏了并行正确性所需的发生前关系。