当前位置:Java -> 线程、线程池和执行器:Java中的多线程处理
并发是我们(开发人员)在日常工作中可能面对的最复杂的问题之一。此外,它也是我们在解决日常问题时可能遇到的最常见问题之一。这两个因素的结合才真正使并发和多线程成为软件工程师可能遇到的最危险的问题。
更重要的是,用低级抽象解决并发问题可能是一个相当具有认知挑战性的过程,并且会导致复杂的、不确定的错误。这就是为什么大多数语言引入了更高级的抽象,允许我们相对容易地解决与并发相关的问题,而不用花时间调整低级开关。
在本文中,我想进一步深入研究Java标准库提供的这些抽象,即ExecutorService
接口及其实现。我还希望本文成为我下一篇关于Java Streams基准测试的入门文章。
在那之前,让我们快速回顾一下进程和线程:
它是可以在计算机内部独立执行的最简单的单元。由于进程,我们可以将计算机内部正在进行的工作分解成更小的、更模块化、更易管理的部分。
这种方法使我们能够使特定部分更加专注,从而更加高效。这样的拆分也使得我们能够充分利用我们CPU中内置的多个内核。
通常情况下,每个进程都是特定程序的一个实例 —— 例如,我们正在运行的Java进程就是一个JVM程序实例。
此外,每个进程都根植在操作系统内部,并且具有其独特的资源、访问权限和行为(程序代码) —— 类似于我们的应用程序用户。
每个进程可以有多个线程(至少在大多数操作系统中),这些线程共同完成进程分配的共同任务。
它可以被视为代码的一个分支,具有一组特定的指令,可以并行执行,同其他应用程序的工作一起进行。线程使单个进程内的多个指令序列能够并发执行。
在软件级别上,我们可以区分两种类型的线程:
在这里,我将主要关注应用线程。我还将提到与CPU相关的线程。这些是我们CPU的硬件线程特征,其数量描述了我们的CPU同时处理多个线程的能力。
原则上,与进程相比,线程可以以更轻的方式共享资源。进程内的所有线程都可以访问其父进程拥有的所有数据。
此外,每个线程都可以拥有自己的数据,更常见的是线程本地变量(或者在Java中更新和更推荐的scoped values)。更重要的是,在线程之间进行切换要比处理进程要容易得多。
线程池比线程和进程更具体的一个术语。它与应用线程相关,描述了我们可以在应用程序内部使用的一组线程。
它基于一个非常简单的行为。我们从池中一个接一个地取出线程,直到池为空为止。就是这样。但是,这个规则还有一个附加的假设,特别是:一旦线程完成了它们的任务,它们将被返回到池中。
当然,应用程序可能有多个线程池,事实上,我们的线程池越专业化,对我们就越有利。借助这种方法,我们可以限制应用程序内的争用,并消除单点故障。目前的行业标准至少是要为数据库连接单独创建一个线程池。
在旧版本的Java(在Java 21 之前),应用程序内部使用的所有线程都绑定到CPU线程。因此,它们是非常昂贵和繁重的。
如果您在Java应用程序中无意(或故意)产生了太多线程,例如通过调用“new Thread()
”。那么您很快就会耗尽资源,应用程序的性能将迅速下降-因为在其他情况下,CPU需要进行大量的上下文切换。
Project Loom,Java 21发布版的一部分,旨在通过向Java标准库添加虚拟线程 - 即不绑定到CPU线程的线程,即所谓的绿色线程 - 来解决这个问题。如果您想了解更多关于Loom及其对Java线程带来的改变,请参阅本文。
在Java中,线程池的概念由ThreadPoolExecutor
来实现-这是一个表示具有由类构造函数的maximumPoolSize
参数描述的有限大小的线程池的类。
顺便说一句,我想补充的是,这个执行程序在更复杂的执行程序中进一步用作内部线程池。
在我们开始描述使用ThreadPoolExecutor
的执行器接口的更复杂的实现之前,我想回答另一个问题:即Executor
和ExecutorService
分别是什么?
执行器是一个只公开一个被执行的方法的接口,其签名为:void execute(Runnable command)
。这个接口旨在描述一种非常简单的操作-准确来说,实现它的类可以执行这样的操作:执行一个提供的可运行的任务。该接口的目的是提供一种从任务提交中解耦任务将如何运行的机制。
ExecutorService是另一个接口,是Executor
接口的扩展。它的合同比Executor
强大得多。
如果我们决定实现它,它具有大约13个要重写的方法,它的主要目的是通过将此类任务封装在Java Future中来帮助管理和运行异步任务。
此外,ExecutorService
扩展了Autocloseable
接口。这允许我们在try-with-resource语法中使用ExecutorService
,并以有序的方式关闭资源。
Executors
类,另一方面,是一种类型的工具类。这是生成执行器实例的推荐方法-大多数执行器不推荐使用新关键字。更重要的是,它提供了创建可调用实例变体的方法,例如具有静态返回值的可调用实例。
有了这三个基本概念的描述,我们可以转向各种执行器服务实现。
到目前为止,Java标准库支持ExecutorService
接口的4个主要实现。每个都提供一组更多或更少独特的功能。
它们分别是:
ThreadPoolExecutor
ForkJoinPool
ScheduledThreadPoolExecutor
ThreadPerTaskExecutor
此外,Executors
类中还有三种私有静态实现,它们实现了ExecutorService
:
DelegatedExecutorService
DelegatedScheduledExecutorService
AutoShutdownDelegatedExecutorService
总的来说,这些类之间的依赖关系图大致如下:
正如我之前所说,它是Java中线程池概念的一种实现。这个执行器代表了一个具有动态线程数的有界线程池。它的确切含意是,TheadPoolExecutor
将使用有限数量的线程,但使用的线程数永远不会超过在池创建时指定的数量。
为了实现这一点,ThreadPoolExecutor
使用了两个变量:corePoolSize
和maximumPoolSize
。第一个变量corePoolSize
描述了池中线程的最小数量,因此即使线程处于空闲状态,池也会保持它们的活动状态。
另一方面,第二个变量maximumPoolSize
描述了,你可能已经猜到了,池拥有的线程的最大数量。这是池内线程的上限。池中的线程数永远不会超过这个参数的值。
此外,ThreadPoolExecutor
在底层使用BlockingQueue
来跟踪传入的任务。
默认情况下,如果当前运行的线程数小于corePoolSize
,调用execute
方法将导致生成一个新线程,其中传入的任务是线程的首要工作 — 即使在那一刻池中有空闲线程存在。如果由于某种原因,池无法添加新线程,那么池将转入第二种行为。
如果运行的线程数大于或等于corePoolSize
,或者池无法生成新线程,则调用execute
方法将尝试向队列中添加一个新任务:isRunning(c) && workQueue.offer(command)
。
如果池仍在运行,我们会首先向池中添加一个没有任何任务的新线程 — 唯一的情况是我们在没有任何任务的情况下生成一个新线程:addWorker(null, false);
。
相反地,如果池停止了运行,我们将从队列中移除新命令:
!isRunning(recheck) && remove(command)
然后池子用RejectedExecutionException
拒绝了这个命令:reject(command);
。
如果由于某种原因,我们无法将任务添加到队列中,池子会尝试使用任务来启动新线程:else if (!addWorker(command, false))
。
如果失败了,任务就会被拒绝,并且会抛出带有类似信息的RejectedExecutionException
。
Task X rejected from java.util.concurrent.ThreadPoolExecutor@3a71f4dd[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
如上所示,你可以看到ThreadPoolExecutor
内部状态的非常简化的可视化图。从这里开始,你可以期待两种结果:要么在将Task6
提交到池之前,Task4
(或Task5
)将会被处理,要么在Task4
(或Task5
)处理结束之前,Task6
將會被提交到池子。
第一种情况相当无聊,从执行器的角度来看一切都一样,所以我只会在这上面花一点时间。
第二种情况更有趣,因为它将导致执行器状态的改变。因为当前运行线程的数量小于corePoolSize
,将Task6
提交给执行器将会导致为此任务生成新的worker。最终状态看起来会跟下面的差不多。
corePoolSize
和maximumPoolSize
设置为相同的值,您实际上可以创建一个固定大小的线程池,并且池运行的线程数永远不会低于或高于设置的值 - 至少对长期来说是这样。maximumPoolSize
设置为足够高的值,例如Integer.MAX_VALUE
- 您可以使其几乎无界。从ThreadPoolExecutor
实现中会产生大约5亿((2²⁹)-1)个线程的实际限制。不过,在达到上限之前,我敢打赌您的机器会崩溃。如果您想了解为什么这个数字成为限制的原因,有一个非常好的JavaDoc描述了这一点。它位于ThreadPoolExecutor
类声明后面。我只是提醒一下,它与ThreadPoolExecutor
如何保持其状态的方式有关。
Executors类为您提供了6种方法来生成ThreadPoolExecutor
。我将在两个包中描述它们,因为它们是设计为这样工作的。
public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
这些方法创建一个固定大小的线程池 - 大小核心和最大相等。此外,如果您不想使用标准库中的默认线程工厂,还可以将threadFactory
作为参数传递。
Executors.newFixedThreadPool(2);
Executors.newFixedThreadPool(2, Executors.defaultThreadFactory());
下面两种方法的批处理:
public static ExecutorService newCachedThreadPool()
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
上面的方法通过将maxPollSize
设置为Integer.Max
来创建实际上没有限制的线程池;第二个版本类似于FixThreadPool
,允许传入自定义的ThreadFactory
。
Executors.newCachedThreadPool();
Executors.newCachedThreadPool(Executors.defaultThreadFactory());
还有最后两种方法:
public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
这两种方法都创建了一个使用单个线程的线程池。而且,这些方法生成了一个包装在AutoShutdownDelegatedExecutorService
中的ThreadPoolExecutors
。它只暴露了ExecutorService
的方法:没有ThreadPoolExecutor
特定的方法可用。
此外,关闭方法是通过Cleanable
覆盖的,并且在其成为虚引用时调用。
向ThreadPoolExecutor
添加新任务的默认方式是使用submit
方法的其中一个版本。另外,也可以直接使用Executor
接口中的execute
方法。但这不是推荐的方式。
使用execute
方法将返回void,而不是future:这意味着您对其执行的控制较少,因此选择权在您。当然,这两种方法都会激活上述线程池的所有逻辑。
Runnable task = () -> System.out.print("test");
Future<?> submit = executorService.submit(task);
vs
executorService.execute(task);
ForkJoinPool
是一个完全独立的ExecutorService
实现,其主要卖点是工作窃取的概念。
工作窃取是一个非常复杂的概念,值得专门撰写一篇博客文章来描述。但是,通过足够的抽象概念,可以比较简单地描述 — 池中的所有线程都尝试执行提交给池的任何任务,无论其最初的所有者如何。
这就是为什么这个概念被称为工作窃取,因为线程“窃取”了彼此的工作。在理论上,这样的方法应该导致显着的性能提升,特别是如果提交的任务很小或产生其他子任务。
在不久的将来,我计划发表一篇专门关注工作窃取和ForkJoinFramework
的文章。在那之前,您可以在这里阅读更多关于工作窃取的内容。
这个执行器是Java 7引入的Fork/Join框架的一部分,它是一组旨在通过使用工作窃取的概念来添加更高效的任务并行处理的类。目前,这个框架被广泛用于CompletableFuture
和Streams的上下文中。
如果您想充分利用ForkJoinPool
,我建议您全面了解Fork/Join框架。然后,尝试将处理此类任务的方式切换为更符合Fork/Join要求的方法。
在深入研究Fork/Join框架之前,请做基准测试和性能测试,因为潜在的收益可能不如预期的那么好。
此外,在Java文档中有一张表,描述了与ForkJoinPool交互以获得最佳结果的方法。
在ForkJoinPool中的关键参数是parallelism,它描述了池子将要使用的工作线程数。默认情况下,它等于CPU上的可用处理器数。在大多数情况下,这是一个足够的设置,我建议不要进行适当的性能测试而更改它。
请记住,Java线程是面向CPU的,并且我们可能会很快耗尽处理能力来进行任务进展。
Executors类提供了生成ForkJoinPool实例的两种方法:
Executors.newWorkStealingPool();
Executors.newWorkStealingPool(2);
第一种方法创建forkJoinPoll时使用默认的parallelism(可用处理器数),而第二种方法让我们可以自己指定parallelism级别。
此外,Executors使用FIFO队列来生成ForkJoinPool,而ForkJoinPool自身的默认设置(例如通过new ForkJoinPool(2))是LIFO。使用Executors无法使用LIFO。尽管如此,您可以通过ForkJoinPool类的asyncMode构造器参数来更改底层队列的类型。
使用FIFO设置,ForkJoinPool可能更适用于从未加入的任务,比如可调用或可运行的用法。
ScheduledThreadPoolExecutor在经典的ThreadPoolExecutor上添加了一层,并允许调度任务。
它支持三种类型的调度:
schedule
scheduleAtFixedRate
scheduleWithFixedDelay
当然,您也可以使用“普通”的ExecutorService API。只需记住,在这种情况下,submit和execute方法等同于调用schedule 方法,延迟为0,即立即执行提供的任务。
由于ScheduledThreadPoolExecutor
扩展了ThreadPoolExecutor
,它的一些部分与经典的ThreadPoolExecutor
相同。无论如何,它都使用自己的任务ScheduledFutureTask
和队列的实现:DelayedWorkQueue
。
此外,ScheduledThreadPoolExecutor
始终创建固定大小的ThreadPoolExecutor
作为其基础线程池,因此corePoolSize
和MaxPoolSize
始终相等。
然而,在ScheduledThreadPoolExecutor
的实现中有一个或两个隐藏的问题。
Executors
类提供了五种生成ScheduledThreadPoolExecutor
的方式。它们组织方式类似于ThreadPoolExecutor
的方式。
ScheduledThreadPoolExecutor
使用固定数量的线程:
Executors.newScheduledThreadPool(2);
Executors.newScheduledThreadPool(2, Executors.defaultThreadFactory());
第一种方法允许我们创建具有特定数量线程的ScheduledThreadPoolExecutor
。第二种方法添加了选择ThreadFactory
的功能。
ScheduledThreadPoolExecutor
使用单个线程:
Executors.newSingleThreadScheduledExecutor();
Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory());
创建一个SingleThreadScheduledExecutor
,其底层的ThreadExecutorPool
只有一个线程。事实上,在这里,我们生成了DelegatedScheduledExecutorService
的实例,它使用ScheduledThreadPoolExecutor
作为代理,因此代理的底层ThreadExecutorPool
只有一个线程。
生成ScheduledThreadPoolExecutor
的最后一种方法是通过使用:
Executors.unconfigurableScheduledExecutorService(new DummyScheduledExecutorServiceImpl());
该方法允许您使用您自己的实现包装ScheduledExecutorService
接口与DelegatedScheduledExecutorService
- Executors
类中的一个私有静态类。这个只显露出ScheduledExecutorService
接口的方法。
在一定程度上,我们可以将其视为封装助手。您可以在实现中拥有多个公共方法,但当您使用代理包装时,所有这些方法都将对用户隐藏。
我并不是特别喜欢这样的封装方法。这应该是实现的问题。但是,也许我错过了代理的所有重要用例。
这是Java标准库和Executors
类中的最新增加之一。它不是线程池实现,而是线程生成器。顾名思义,每个提交的任务都会得到一个与其执行绑定的线程,该线程在任务处理开始时启动。
为了实现这样的行为,该执行器使用自己的Future的自定义实现,名为ThreadBoundFuture
。由这些执行器创建的线程的生命周期看起来像这样:
Future
创建时立即创建。Executor
以编程方式启动线程之后线程才开始工作。Future
被中断时,线程被中断。Future
完成时线程停止。此外,如果Executor
无法为特定任务启动新线程并且在此过程中没有抛出异常,则Executor
将RejectedExecutionException
。
另外,ThreadPerTaskExecutor
持有一组上线的线程。每次启动线程时,它将被添加到该组中。相应地,当线程停止时,它将从组中移除。
您可以使用该集合通过threadCount()
方法跟踪Executor
在任何给定时间运行多少线程。
Executors
类暴露了两种生成此Executor
的方法。我会说其中一种更推荐。让我们从不推荐的方法开始:
Executors.newThreadPerTaskExecutor(Executors.defaultThreadFactory());
上述方法使用提供的线程工厂生成ThreadPerTaskExecutor
。至少在我看来,它不推荐的原因是ThreadPerTaskExecutor
的实例将在普通的Java CPU-bound线程上运行。
在这种情况下,如果你通过Executor
传递了大量的任务,你很容易耗尽应用程序的处理能力。
当然,没有什么能阻止你做以下的“技巧”,并且仍然使用虚拟线程。
Executors.newThreadPerTaskExecutor(Thread.ofVirtual().factory());
然而,当你可以简单地使用以下方法时,没有理由这样做:
Executors.newVirtualThreadPerTaskExecutor();
这个ThreadPerTaskExecutor
实例将充分利用Java 21的虚拟线程。这样的设置也应该大大增加Executor
在耗尽处理能力之前能够处理的任务数量。
正如你所见,Java提供了一组不同的执行者,从经典的ThreadPool
实现,如ThreadPoolExecutor
到更复杂的ThreadPerTaskExecutor
,它充分利用了Java 21的虚拟线程功能。
此外,每个Executor
实现都具有其独特的特性:
ThreadPoolExecutor
: 经典的ThreadPool
实现ForkJoinPool
: 工作窃取ScheduledThreadPoolExecutor
: 定期调度任务ThreadPerTaskExecutor
: 使用虚拟线程,并有可能在其单独的短暂线程中运行任务尽管有这些区别,所有执行者都具有一个定义特性: 所有它们都公开了一个API,使得并发处理多个任务变得更容易。
希望这些知识在将来某时对你有所帮助。谢谢你的时间。
注: 感谢Michał Grabowski和Krzysztof Atlasik的审阅。
推荐阅读: ChatGpt会对生活产生哪些影响
本文链接: 线程、线程池和执行器:Java中的多线程处理