当前位置:Java -> 深入了解Java Executor框架

深入了解Java Executor框架

在Java中,ExecutorService提供了一个灵活而高效的异步任务执行框架。它抽象了手动管理线程的复杂性,使开发人员可以专注于任务的逻辑。

概述

ExecutorService接口是java.util.concurrent包的一部分,表示一个异步任务执行服务。它扩展了Executor接口,该接口定义了一个execute(Runnable command)方法用于执行任务。

Executors

Executors是Java中的一个实用类,它提供了用于创建和管理不同类型的ExecutorService实例的工厂方法。它简化了实例化线程池的过程,并允许开发人员轻松地创建和管理带有各种配置的执行器实例。

Executors类提供了几个静态工厂方法来创建不同类型的执行器服务:

  1. FixedThreadPool:创建一个具有固定线程数的ExecutorService。提交给此执行器的任务将由指定数量的线程并发执行。如果线程处于空闲状态且没有可用任务,则它将保持活动但休眠状态直到需要。

    ExecutorService executor = Executors.newFixedThreadPool(5);


  2. CachedThreadPool:创建一个具有无界线程池的ExecutorService,它根据工作量自动调整大小。线程按需创建并用于后续任务。如果线程在一定时间内保持空闲,可能会被终止以减少资源消耗。

    在缓存线程池中,提交的任务不会排队,而是立即交给一个线程执行。如果没有可用的线程,将创建一个新线程。如果服务器负载过重,导致所有CPU完全被利用,并且有更多任务到达,将创建更多线程,这只会使情况变得更糟。线程的空闲时间默认为60秒,如果它们没有任何任务,线程将被终止。

    因此,在服务器负载过重的生产环境中,最好使用Executors.newFixedThreadPool,它给您提供了一个具有固定线程数的线程池,或者直接使用ThreadPoolExecutor类,以达到最大的控制。

    ExecutorService executor = Executors.newCachedThreadPool();


  3. SingleThreadExecutor:创建一个具有单个工作线程的ExecutorService。任务按照它们提交的顺序由此线程顺序执行。此执行器适用于需要串行执行或具有相互依赖的任务。

    ExecutorService executor = Executors.newSingleThreadExecutor();


  4. ScheduledThreadPool:创建一个ExecutorService,它可以在指定的延迟后或定期间隔后安排任务运行。它提供了使用固定延迟或固定速率调度任务的方法,允许周期性地执行任务。

  5. newWorkStealingPool:使用目标并行级别创建一个工作窃取线程池。此执行器基于ForkJoinPool,能够动态调整其线程池大小,以有效利用所有可用的处理器核心。

总的来说,Executors类简化了执行器实例的创建和管理。

ExecutorService

可以将任务提交给ExecutorService进行执行。这些任务通常是RunnableCallable的实例,表示需要异步执行的工作单元。

以下是ExecutorService中的方法。

   1. execute(Runnable command):异步执行给定的任务。

ExecutorService executor = Executors.newFixedThreadPool(5);
executor.execute(() -> {
    System.out.println("Task executed asynchronously");
});


 2. submit(Callable<T> task):提交一个任务以进行执行,返回表示任务待处理结果的Future

ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> future = executor.submit(() -> {
    // Task logic
    return 42;
});

  

 3. shutdown():启动有序关闭ExecutorService,允许之前提交的任务在终止之前执行。

 4. shutdownNow():尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行的任务列表。

List<Runnable> pendingTasks = executor.shutdownNow();

  

 5. awaitTermination(long timeout, TimeUnit unit):阻塞直到所有任务在关闭请求后完成执行,或者超时发生,或者当前线程被中断,以先发生者为准。

boolean terminated = executor.awaitTermination(10, TimeUnit.SECONDS);
if (terminated) {
    System.out.println("All tasks have completed execution");
} else {
    System.out.println("Timeout occurred before all tasks completed");
}

  

 6. invokeAny(Collection<? extends Callable<T>> tasks):执行给定的任务,返回成功完成的一个任务的结果。当我们有多个任务要运行但只关心第一个完成的结果时,此方法很有用。所有其他任务将被终止。

ExecutorService executor = Executors.newCachedThreadPool();
Set<Callable<String>> callables = new HashSet<>();
callables.add(() -> "Task 1");
callables.add(() -> "Task 2");
String result = executor.invokeAny(callables);
System.out.println("Result: " + result);

  

 7. invokeAll(Collection<? extends Callable<T>> tasks):执行给定的任务,返回表示它们待处理结果的Future对象列表。

List<Callable<Integer>> tasks = Arrays.asList(() -> 1, () -> 2, () -> 3);
List<Future<Integer>> futures = executor.invokeAll(tasks);
for (Future<Integer> future : futures) {
    System.out.println("Result: " + future.get());
}


实现

ExecutorService接口通常由Java并发框架提供的各种类来实现,例如ThreadPoolExecutorScheduledThreadPoolExecutorForkJoinPool

ExecutiveService diagram

注意事项

  • 谨慎配置线程池大小,以避免利用不足或过度资源消耗。
  • 考虑因素如任务提交速率、任务优先级、资源约束以及队列溢出时的期望行为。选择最符合应用程序对可扩展性、性能和资源利用的队列类型。
  • 正确处理异常和任务取消,以确保鲁棒性和可靠性。
  • 了解并发代码中的并发语义和潜在线程安全问题。

要创建ExecutorService的实例,我们可以在创建线程池时传递ThreadFactory和任务队列。

ThreadFactory是一个用于创建新线程的接口。它提供了一种封装创建线程逻辑的方法,允许自定义线程创建行为。 ThreadFactory的主要目的是将线程创建过程与应用程序的其余逻辑解耦,从而更容易管理和自定义线程创建。最好传递自定义的Thread工厂,因为这有助于设置线程前缀和优先级(如果需要)。

static final String prefix = "app.name.task";
ExecutorService executorService = Executors.newFixedThreadPool(5, () -> {  
    Thread t = new Thread(r);
    t.setName(prefix + "-" + t.getId()); // Customize thread name if needed
    return t;
});


任务队列

当任务提交给ExecutorService时,如果池中没有可用线程来处理任务,它们将被存储在队列中,下面是不同队列选项。

  1. 无界队列:无界队列,比如LinkedBlockingQueue,没有固定容量,可以动态增长以容纳无限数量的任务。它适用于任务提交速率不可预测的情况,或者需要无限期排队任务而又不希望由于队列溢出而导致拒绝。但是,请记住,无界队列如果任务提交速率快于处理速率,可能会导致内存耗尽。
  2. 有界队列:有界队列,比如ArrayBlockingQueue,具有指定容量的固定大小限制,只能容纳有限数量的任务。适用于需要强制执行资源限制或背压机制以防止过度内存使用或系统超载的情况。当队列达到容量时,任务可能被拒绝或根据指定的拒绝策略进行处理。
  3. 优先级队列:优先级队列,比如PriorityBlockingQueue,基于优先级或指定比较器对任务进行排序。适用于任务具有不同重要性或紧急程度,并且需要在低优先级任务之前处理高优先级任务的情况。优先级队列确保按优先级顺序执行任务,而不考虑它们的提交顺序。
  4. 同步队列:同步队列,比如SynchronousQueue,是一种特殊类型的队列,可以实现生产者和消费者线程之间的一对一任务交接。容量为零,并且需要同时有生产者和消费者才能进行任务交换。适用于需要严格的线程同步和协调的情况,比如线程池之间的交接或有界资源访问。

定时线程池

ScheduledThreadPoolExecutorThreadPoolExecutor继承了线程池管理能力,并提供了按给定延迟或在定义的时间间隔后定期运行任务的功能。以下是详细解释:

  • RunnableCallable任务:您可以使用这些接口定义要调度的任务,类似于常规的ExecutorService
  • ScheduledFuture:该接口代表了一个已安排任务的结果。它允许在执行前检查任务的完成状态,取消任务,并在Callable任务中获取结果。

调度能力

  • schedule(Runnable task, long delay, TimeUnit unit)按给定时间单位(例如秒、毫秒)安排执行Runnable任务。
  • scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)按固定速率执行Runnable任务。在initialDelay后首次执行任务,然后在它们之间以恒定的period执行。
  • scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)按固定延迟执行Runnable任务。类似于scheduleAtFixedRate,但delay是上一次执行完成后和下一次开始之间的时间。

关键注意事项

  • 线程池管理:ScheduledThreadPoolExecutor默认维护一个固定大小的线程池。您可以在对象创建时配置池大小。
  • 延迟执行:预定的任务不能保证精确执行。实际的执行时间可能略有不同,这取决于线程可用性和工作负载等因素。
  • 错过的执行:使用固定速率调度,如果任务执行时间超过period,则可能会跳过后续执行以保持固定速率。
  • 取消:您可以使用返回的ScheduledFuture对象的cancel方法来取消预定任务。但是,取消的成功取决于任务的状态(尚未开始、正在运行等)。
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPoolExample {

    public static void main(String[] args) throws InterruptedException {

        // Create a ScheduledThreadPoolExecutor with 2 threads
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

        // Schedule a task with a 2-second delay
        Runnable task1 = () -> System.out.println("Executing task 1 after a delay");
        scheduler.schedule(task1, 2, TimeUnit.SECONDS);

        // Schedule a task to run every 5 seconds with a fixed rate
        Runnable task2 = () -> System.out.println("Executing task 2 at fixed rate");
        scheduler.scheduleAtFixedRate(task2, 1, 5, TimeUnit.SECONDS);

        // Schedule a task to run every 3 seconds with a fixed delay
        Runnable task3 = () -> System.out.println("Executing task 3 with fixed delay");
        scheduler.scheduleWithFixedDelay(task3, 0, 3, TimeUnit.SECONDS);

        // Wait for some time to allow tasks to be executed
        Thread.sleep(15000);

        // Shutdown the scheduler
        scheduler.shutdown();
    }
}


优雅地关闭 ExecutorService

要有效地关闭 ExecutorService,您可以按照以下步骤进行:

  1. 调用 shutdown() 方法来启动关闭过程。此方法允许先前提交的任务在终止之前执行,但阻止提交新任务。
  2. 如果要强制 ExecutorService 立即终止,则调用 shutdownNow() 方法。该方法尝试停止所有正在执行的任务,停止处理等待任务,并返回一组等待执行但从未启动的任务列表。
  3. 通过调用 awaitTermination() 方法来等待终止。该方法阻塞,直到所有任务在关闭请求后完成执行,或者超时发生,或者当前线程被中断,以先发生者为准。

以下是一个示例:

ExecutorService executor = Executors.newFixedThreadPool(10);

// Execute tasks using the executor

// Shutdown the executor
executor.shutdown();

try {
    // Wait for all tasks to complete or timeout after a certain period
    if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
        // If the timeout occurs, force shutdown
        executor.shutdownNow();
        // Optionally, wait for the tasks to be forcefully terminated
        if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
            // Log a message indicating that some tasks failed to terminate
        }
    }
} catch (InterruptedException ex) {
    // Log interruption exception
    executor.shutdownNow();
    // Preserve interrupt status
    Thread.currentThread().interrupt();
}


总之,ExecutorService 是一个多功能的框架,可帮助开发人员编写高效、可扩展和易于维护的并发代码。

推荐阅读: 程序员经验分享

本文链接: 深入了解Java Executor框架