In software development, processing large data sets efficiently has become essential, especially with the advent of multi-core processors. The Java Stream interface was a significant step forward, allowing both sequential and parallel operations on collections. However, fully exploiting the capabilities of modern processors while preserving the simplicity of the Stream API remains a challenge.
With this in mind, I created an open-source library that experiments with a new approach to parallelizing stream operations. Instead of the traditional batch-oriented approach, the library processes each stream element in its own virtual thread, providing a finer granularity of parallelism.
In this article, I describe the library and its design, in more detail than you need simply to use it.
The library is available on GitHub and also as a dependency from Maven Central:
<dependency>
    <groupId>com.github.verhas</groupId>
    <artifactId>vtstream</artifactId>
    <version>1.0.1</version>
</dependency>
Check the actual version number on the Maven Central website or on GitHub. This article is based on version 1.0.1 of the library.
Parallel Computing
Parallel computing is nothing new; it has been around for decades. The first computers executed tasks in batches, essentially a form of serial execution, but the concept of time-sharing soon appeared.
The first time-sharing computer system was installed at the Massachusetts Institute of Technology (MIT) in 1961. The system, known as the Compatible Time-Sharing System (CTSS), allowed multiple users to log in to a single mainframe simultaneously, each appearing to work in a private session. CTSS was a major development in computer science and laid the groundwork for the modern operating systems and computing environments that support multitasking and multi-user operation.
Strictly speaking, it was not a parallel computing system. CTSS ran on an IBM 7094 mainframe at MIT that had a single CPU, so the code was executed serially.
Today we have multi-core processors and machines with multiple processors. I am editing this article on a machine with ten processor cores.
To execute tasks concurrently, there are a few approaches:
- Define the algorithm in a concurrent way, for example, using reactive programming.
- Define the algorithm in the conventional, sequential way and let some program decide the concurrency.
- Combine the two approaches.
When we write a reactive algorithm, or define a stream as with Java 8 streams, we help the application execute tasks concurrently. We define small units of work and their interdependencies so that the environment can decide which parts may run in parallel. The actual execution is carried out by the framework. When we use threads, processes, or virtual threads instead, we write conventional sequential code and leave the concurrency decisions to a scheduler.
The difference between the two lies in the scheduler: who decides, at any given moment, which processor should execute which task. In the case of threads and processes, that decision is made by the operating system. The difference between thread and process execution is that threads belonging to the same process share the same memory space, while each process has its own. Similarly, virtual threads mounted on the same operating-system (carrier) thread share that thread's resources while they run, and they are scheduled by the JVM rather than by the operating system.
Moving from processes through threads to virtual threads, fewer and fewer resources are owned exclusively, and the overhead shrinks accordingly. This makes virtual threads significantly cheaper than conventional threads: while a machine may support thousands of threads and processes, it can accommodate millions of virtual threads.
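To get a feel for how cheap virtual threads are, the following sketch (assuming JDK 21 or later; the class and method names are mine, not part of the library) launches ten thousand virtual threads and waits for all of them to finish:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class VirtualThreadDemo {
    // Launches n virtual threads, each doing a tiny piece of work,
    // and waits for all of them to terminate.
    static int runTasks(int n) {
        final AtomicInteger counter = new AtomicInteger();
        final List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            threads.add(Thread.startVirtualThread(counter::incrementAndGet));
        }
        for (final Thread t : threads) {
            try {
                t.join(); // wait for the virtual thread to finish
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        // Ten thousand platform threads would strain most systems;
        // ten thousand virtual threads are routine.
        System.out.println("completed: " + runTasks(10_000));
    }
}
```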
By defining a stream of tasks, you essentially specify a series of operations to be executed on many elements. The decision to execute these operations concurrently rests with the framework, which may or may not choose to do so. However, Stream in Java is a high-level interface, which gives us the flexibility to implement a version that supports the concurrent execution of tasks.
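With the standard Stream API, for example, the algorithm is declared once and the framework decides how to split the work; for an associative reduction, sequential and parallel execution produce the same result (the class and method names below are illustrative):

```java
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class ParallelStreamDemo {
    // Sums the squares of 1..1000; the caller chooses whether the
    // framework may execute the intermediate operations in parallel.
    static long sumOfSquares(boolean parallel) {
        Stream<Integer> stream = IntStream.rangeClosed(1, 1_000).boxed();
        if (parallel) {
            stream = stream.parallel(); // the framework decides how to split the work
        }
        return stream.mapToLong(i -> (long) i * i).sum();
    }

    public static void main(String[] args) {
        // The algorithm is declared once; concurrency is an execution detail.
        System.out.println(sumOfSquares(false));
        System.out.println(sumOfSquares(true));
    }
}
```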
Implementing Streams on Threads
The library contains two principal classes in its main source directory.
ThreadedStream is the class that implements the Stream interface:
public class ThreadedStream<T> implements Stream<T> {
The Command class contains the nested classes that implement the stream operation functionality:
public static class Filter<T> extends Command<T, T> {
public static class AnyMatch<T> extends Command<T, T> {
public static class FindFirst<T> extends Command<T, T> {
public static class FindAny<T> extends Command<T, T> {
public static class NoOp<T> extends Command<T, T> {
public static class Distinct<T> extends Command<T, T> {
public static class Skip<T> extends Command<T, T> {
public static class Peek<T> extends Command<T, T> {
public static class Map<T, R> extends Command<T, R> {
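The internals of Command are not shown in this article; the sketch below is a hypothetical, stripped-down shape inferred only from the calls that appear later (result(), exception(), isDeleted()). It illustrates how a Filter-style command can mark a non-matching element as deleted rather than removing it, so that every element can still be processed independently in its own thread:

```java
import java.util.function.Predicate;

// Illustrative sketch only: the real Command and Result types live in the
// library; their exact shape is inferred from the calls quoted in this article.
public class CommandSketch {
    record Result<T>(T result, Throwable exception, boolean isDeleted) {
        static <T> Result<T> ok(T value) { return new Result<>(value, null, false); }
        static <T> Result<T> deleted() { return new Result<>(null, null, true); }
    }

    // A filter marks non-matching elements as deleted instead of dropping
    // them, so each element's processing stays independent of the others.
    static <T> Result<T> filter(Predicate<T> predicate, T value) {
        return predicate.test(value) ? Result.ok(value) : Result.deleted();
    }

    public static void main(String[] args) {
        System.out.println(filter((Integer x) -> x % 2 == 0, 4).isDeleted());
        System.out.println(filter((Integer x) -> x % 2 == 0, 3).isDeleted());
    }
}
```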
All of the operations listed above are intermediary operations. The terminal operations are implemented in the ThreadedStream class; before a terminal operation is applied, the threaded stream is converted into a regular stream. The collect method is an example of this approach:
@Override
public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) {
return toStream().collect(supplier, accumulator, combiner);
}
The source of the elements is also a stream, which means the threading functionality is built on top of an existing stream implementation. This setup uses streams both as the data source and as the destination for the processed data. The parallel processing happens in between, executing the intermediary commands concurrently.
The core of the implementation, and its most interesting aspect, is therefore the construction of this structure and its subsequent execution.
We will first look at the structure of the stream data and then explore how the class executes the operations using virtual threads.
Stream Data Structure
The ThreadedStream class maintains its data in the following fields:
private final Command<Object, T> command;
private final ThreadedStream<?> downstream;
private final Stream<?> source;
private long limit = -1;
private boolean chained = false;
command is the Command object to be executed on the data. It may be a no-op (NoOp) command, or null when there is no specific command to execute.
The downstream field points to the previous ThreadedStream in the processing chain. A ThreadedStream retrieves its data either from its immediate downstream stream, when there is one, or directly from the source in the case of the first element of the chain.
source is the original data stream. Even when a downstream is specified, the source of both streams is the same.
limit specifies the maximum number of elements the stream is configured to process. Implementing the limit requires a workaround, because the processing of stream elements starts eagerly rather than being "pulled" by the terminal operation. As a consequence, infinite streams cannot be fed into a ThreadedStream.
chained is a boolean flag indicating that the stream is part of a processing chain. When true, a subsequent stream depends on this stream's output, which prevents execution when the processing would fork. This mechanism mirrors the approach found in the JVM's standard stream implementation.
Building the Stream
The stream data structure is built dynamically as the intermediary operations are chained together. The process starts with the creation of the initial element through a call to the static threaded method of the ThreadedStream class. A sample line from the unit tests demonstrates this:
final var k = ThreadedStream.threaded(Stream.of(1, 2, 3));
This line demonstrates the creation of a ThreadedStream
instance named k
, initialized with a source stream consisting of the elements 1, 2, and 3. The threaded
method serves as the entry point for transforming a regular stream into a ThreadedStream
, setting the stage for further operations that can leverage virtual threads for concurrent execution.
When an intermediary operation is appended, it results in the creation of a new ThreadedStream
instance. This new instance designates the preceding ThreadedStream
as its downstream
. Moreover, the source stream for this newly formed ThreadedStream
remains identical to the source stream of its predecessor. This design ensures a seamless flow of data through the chain of operations, facilitating efficient processing in a concurrent environment.
For example, when we call:
final var t = k.map(x -> x * 2);
The map
method is called, which is:
public <R> ThreadedStream<R> map(Function<? super T, ? extends R> mapper) {
return new ThreadedStream<>(new Command.Map<>(mapper), this);
}
It generates a new ThreadedStream
object wherein the preceding ThreadedStream
acts as the downstream
. Additionally, the command
field is populated with a new instance of the Command
class, configured with the specified mapper function.
This process effectively constructs a linked list composed of ThreadedStream
objects. This linked structure comes into play during the execution phase, triggered by invoking one of the terminal operations on the stream. This method ensures that each ThreadedStream
in the sequence can process data in a manner that supports concurrent execution, leveraging the capabilities of virtual threads for efficient data processing.
It’s crucial to understand that the ThreadedStream
class refrains from performing any operations on the data until a terminal operation is called. Once execution commences, it proceeds concurrently. To allow these operations to execute independently, ThreadedStream
instances are effectively immutable: they are created during the setup phase and mutated only once, when they are linked into the chain. During execution, the instances serve as a read-only data structure, guiding the flow of operation execution. This immutability ensures thread safety and consistency throughout concurrent processing, allowing for efficient and reliable stream handling.
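The linked structure described above can be sketched in a few lines. The names below are illustrative, not the library's actual types; the point is that each node is immutable, points at its predecessor, and no work happens until something walks the chain:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class ChainDemo {
    // Immutable node: a command plus a link to the previous node in the chain.
    record Node(Node downstream, UnaryOperator<Integer> command) {
        Node then(UnaryOperator<Integer> next) {
            return new Node(this, next); // building performs no work
        }

        int run(int value) {
            // Walk down to the first node, then apply commands on the way back.
            int input = downstream == null ? value : downstream.run(value);
            return command.apply(input);
        }
    }

    static List<Integer> process(List<Integer> data) {
        // Nothing executes while the chain is assembled; run() triggers it.
        Node chain = new Node(null, x -> x * 2).then(x -> x + 1);
        List<Integer> out = new ArrayList<>();
        for (int d : data) {
            out.add(chain.run(d));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(process(List.of(1, 2, 3))); // [3, 5, 7]
    }
}
```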
Stream Execution
The commencement of stream execution is triggered by invoking a terminal operation. These terminal operations are executed by first transforming the threaded stream back into a conventional stream, upon which the terminal operation is then performed.
The collect
method serves as a prime example of this process, as previously mentioned. This method is emblematic of how terminal operations are seamlessly integrated within the ThreadedStream
framework, bridging the gap between concurrent execution facilitated by virtual threads and the conventional stream processing model of Java. By converting the ThreadedStream
into a standard Stream
, it leverages the rich ecosystem of terminal operations already available in Java, ensuring compatibility and extending functionality with minimal overhead.
@Override
public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) {
return toStream().collect(supplier, accumulator, combiner);
}
The toStream()
method represents the core functionality of the library, marking the commencement of stream execution by initiating a new virtual thread for each element in the source stream. This method differentiates between ordered and unordered execution through two distinct implementations:
toUnorderedStream()
toOrderedStream()
The choice between these methods is determined by the isParallel()
status of the source stream. It’s worth noting that executing an ordered stream in parallel can be advantageous. Although the results may be produced out of order, parallel processing accelerates the operation. Ultimately, care must be taken to collect the results in a sequential manner, despite the unordered processing potentially yielding higher efficiency by allowing elements to be passed to the resulting stream as soon as they become available, eliminating the need to wait for the preceding elements.
The implementation of toStream()
is designed to minimize unnecessary buffering of elements. In the unordered case, elements are forwarded to the resulting stream as soon as they are ready; in the ordered case, an element is forwarded as soon as it is ready and all preceding elements have been forwarded.
In subsequent sections, we delve into the specifics of these two execution methodologies.
Unordered Stream Execution
Unordered execution forwards results as soon as they become ready. It stores them in a synchronized list that the worker threads append to while the target stream concurrently removes elements, which keeps the list from growing excessively.
The iteration over the source stream initiates the creation of a new virtual thread for each element. When a limit is imposed, it’s applied directly to the source stream, diverging from traditional stream implementations where limit
acts as a genuine intermediary operation.
The implementation of the unordered stream execution is as follows:
private Stream<T> toUnorderedStream() {
final var result = Collections.synchronizedList(new LinkedList<Command.Result<T>>());
final AtomicInteger n = new AtomicInteger(0);
final Stream<?> limitedSource = limit >= 0 ? source.limit(limit) : source;
limitedSource.forEach(
t -> {
Thread.startVirtualThread(() -> result.add(calculate(t)));
n.incrementAndGet();
});
return IntStream.range(0, n.get())
.mapToObj(i -> {
while (result.isEmpty()) {
Thread.yield();
}
return result.removeFirst();
})
.filter(f -> !f.isDeleted())
.peek(r -> {
if (r.exception() != null) {
throw new ThreadExecutionException(r.exception());
}
})
.map(Command.Result::result);
}
The counter n
tallies the number of threads started. The resulting stream is constructed from this counter by mapping the numbers 0 to n
-1 to the elements of the concurrent list as they become ready. If the list is empty at any point, the consumer waits for the next element in a loop that calls Thread.yield(), giving up the processor instead of spinning hot while the next result is still being computed.
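As a design note, the same unordered hand-off could also be built on a BlockingQueue, which parks the consumer instead of looping on yield. This is not the library's implementation, only a sketch of the alternative (all names here are mine; requires JDK 21+ for virtual threads):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueHandoff {
    // One virtual thread per element deposits its result into a blocking
    // queue; the consumer blocks in take() until the next result is ready.
    static List<Integer> mapUnordered(List<Integer> input) {
        BlockingQueue<Integer> results = new LinkedBlockingQueue<>();
        for (int i : input) {
            Thread.startVirtualThread(() -> results.add(i * i));
        }
        List<Integer> out = new ArrayList<>();
        try {
            for (int i = 0; i < input.size(); i++) {
                out.add(results.take()); // parks until a result is available
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return out;
    }

    public static void main(String[] args) {
        // Results may arrive in any order.
        System.out.println(mapUnordered(List.of(1, 2, 3, 4)));
    }
}
```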
Ordered Stream Execution
Ordered stream execution introduces a more nuanced approach compared to its unordered counterpart. It incorporates a local class named Task
, designed specifically to await the readiness of a particular thread. Similar to the unordered execution, a concurrent list is utilized, but with a key distinction: the elements of this list are the tasks themselves, rather than the results.
This list is populated by the code that creates the threads, not by the threads themselves. Because the list is fully populated up front, no separate counter is needed to track thread initiation. The execution then waits on each thread in list order, relaying each thread's output to the target stream sequentially. This preserves the ordered integrity of the stream's elements despite the concurrent nature of their processing.
private Stream<T> toOrderedStream() {
class Task {
Thread workerThread;
volatile Command.Result<T> result;
/**
* Wait for the thread calculating the result of the task to be finished. This method is blocking.
* @param task the task to wait for
*/
static void waitForResult(Task task) {
try {
task.workerThread.join();
} catch (InterruptedException e) {
task.result = deleted();
}
}
}
final var tasks = Collections.synchronizedList(new LinkedList<Task>());
final Stream<?> limitedSource = limit >= 0 ? source.limit(limit) : source;
limitedSource.forEach(
sourceItem -> {
Task task = new Task();
tasks.add(task);
task.workerThread = Thread.startVirtualThread(() -> task.result = calculate(sourceItem));
}
);
return tasks.stream()
.peek(Task::waitForResult)
.map(f -> f.result)
.peek(r -> {
if (r.exception() != null) {
throw new ThreadExecutionException(r.exception());
}
}
)
.filter(r -> !r.isDeleted()).map(Command.Result::result);
}
Summary and Takeaway
Having explored an implementation that facilitates the parallel execution of stream operations, it’s noteworthy that this library is open source, offering you the flexibility to either utilize it as is or reference its design and implementation to craft your own version. The detailed exposition provided here aims to shed light on both the conceptual underpinnings and practical aspects of the library’s construction.