乐闻世界logo
搜索文章和话题

Spring Boot 中如何实现异步编程?

3月6日 21:58

Spring Boot 异步编程详解

为什么需要异步编程

  • 提高吞吐量:不阻塞主线程,处理更多请求
  • 改善响应时间:耗时操作后台执行,快速响应用户
  • 资源利用:充分利用 CPU 多核特性
  • 解耦操作:发送消息、记录日志等非核心操作异步化

实现方式一:@Async 注解(最常用)

1. 开启异步支持

java
@Configuration @EnableAsync public class AsyncConfig { /** * 自定义线程池 */ @Bean("taskExecutor") public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 核心线程数 executor.setCorePoolSize(5); // 最大线程数 executor.setMaxPoolSize(20); // 队列容量 executor.setQueueCapacity(100); // 线程名称前缀 executor.setThreadNamePrefix("async-task-"); // 拒绝策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 等待所有任务完成后再关闭线程池 executor.setWaitForTasksToCompleteOnShutdown(true); // 等待时间 executor.setAwaitTerminationSeconds(60); executor.initialize(); return executor; } /** * 邮件发送专用线程池 */ @Bean("mailExecutor") public Executor mailExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(2); executor.setMaxPoolSize(5); executor.setQueueCapacity(50); executor.setThreadNamePrefix("mail-task-"); executor.initialize(); return executor; } }

2. 使用 @Async 注解

java
@Service @Slf4j public class AsyncService { /** * 默认线程池执行 */ @Async public void sendNotification(String userId, String message) { log.info("发送通知给用户: {}, 线程: {}", userId, Thread.currentThread().getName()); // 模拟耗时操作 try { Thread.sleep(2000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } log.info("通知发送完成: {}", userId); } /** * 指定线程池 */ @Async("mailExecutor") public void sendEmail(String to, String subject, String content) { log.info("发送邮件到: {}, 线程: {}", to, Thread.currentThread().getName()); // 邮件发送逻辑 } /** * 带返回值的异步方法 */ @Async public CompletableFuture<String> processDataAsync(String data) { log.info("异步处理数据: {}, 线程: {}", data, Thread.currentThread().getName()); try { Thread.sleep(3000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return CompletableFuture.completedFuture("处理结果: " + data); } /** * 批量异步处理 */ @Async public CompletableFuture<Integer> calculateAsync(int number) { int result = number * number; return CompletableFuture.completedFuture(result); } }

3. 调用异步方法

java
@RestController @RequestMapping("/async") @RequiredArgsConstructor public class AsyncController { private final AsyncService asyncService; @GetMapping("/notify") public Result<Void> sendNotification(@RequestParam String userId) { // 立即返回,不等待异步操作完成 asyncService.sendNotification(userId, "您有一条新消息"); return Result.success("通知发送中"); } @GetMapping("/process") public Result<String> processData(@RequestParam String data) throws Exception { // 调用异步方法并等待结果 CompletableFuture<String> future = asyncService.processDataAsync(data); // 设置超时 String result = future.get(5, TimeUnit.SECONDS); return Result.success(result); } @GetMapping("/batch") public Result<List<Integer>> batchProcess() throws Exception { // 批量异步处理 List<CompletableFuture<Integer>> futures = IntStream.range(1, 11) .mapToObj(asyncService::calculateAsync) .collect(Collectors.toList()); // 等待所有任务完成 CompletableFuture<Void> allFutures = CompletableFuture.allOf( futures.toArray(new CompletableFuture[0]) ); // 获取所有结果 allFutures.join(); List<Integer> results = futures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()); return Result.success(results); } }

实现方式二:CompletableFuture(Java 8+)

java
@Service @Slf4j public class CompletableFutureService { private final ExecutorService executor = Executors.newFixedThreadPool(10); /** * 异步执行并组合结果 */ public CompletableFuture<String> combineAsyncTasks() { CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> { log.info("执行任务1"); sleep(1000); return "结果1"; }, executor); CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> { log.info("执行任务2"); sleep(1500); return "结果2"; }, executor); // 组合两个任务的结果 return task1.thenCombine(task2, (result1, result2) -> { return result1 + " + " + result2; }); } /** * 异步任务链 */ public CompletableFuture<String> taskChain(String input) { return CompletableFuture.supplyAsync(() -> { log.info("步骤1: 处理输入"); return input.toUpperCase(); }, executor) .thenApplyAsync(result -> { log.info("步骤2: 添加前缀"); return "PREFIX_" + result; }, executor) .thenApplyAsync(result -> { log.info("步骤3: 添加后缀"); return result + "_SUFFIX"; }, executor) .exceptionally(ex -> { log.error("任务链异常: {}", ex.getMessage()); return "DEFAULT_VALUE"; }); } /** * 并行处理多个任务 */ public List<String> parallelProcess(List<String> inputs) { List<CompletableFuture<String>> futures = inputs.stream() .map(input -> CompletableFuture.supplyAsync(() -> processSingle(input), executor)) .collect(Collectors.toList()); // 等待所有完成并收集结果 return futures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()); } private String processSingle(String input) { sleep(500); return "Processed: " + input; } /** * 带超时的异步操作 */ public String asyncWithTimeout() { try { return CompletableFuture.supplyAsync(() -> { sleep(5000); // 模拟耗时操作 return "结果"; }, executor) .orTimeout(3, TimeUnit.SECONDS) // Java 9+ .exceptionally(ex -> "超时默认值") .get(); } catch (Exception e) { return "异常: " + e.getMessage(); } } private void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }

实现方式三:Spring WebFlux(响应式编程)

java
@Configuration @EnableWebFlux public class WebFluxConfig { } @RestController @RequestMapping("/reactive") public class ReactiveController { @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<String> streamData() { return Flux.interval(Duration.ofSeconds(1)) .map(seq -> "Data " + seq + " at " + LocalTime.now()) .take(10); } @GetMapping("/mono/{id}") public Mono<ResponseEntity<User>> getUser(@PathVariable Long id) { return Mono.fromCallable(() -> findUserById(id)) .map(ResponseEntity::ok) .defaultIfEmpty(ResponseEntity.notFound().build()); } @PostMapping("/users") public Mono<User> createUser(@RequestBody Mono<User> userMono) { return userMono.flatMap(this::saveUser); } @GetMapping("/users") public Flux<User> getAllUsers() { return Flux.fromIterable(findAllUsers()); } // 模拟数据库操作 private User findUserById(Long id) { // 模拟异步数据库查询 return new User(id, "User" + id); } private Mono<User> saveUser(User user) { return Mono.just(user) .map(u -> { u.setId(System.currentTimeMillis()); return u; }); } private List<User> findAllUsers() { return Arrays.asList( new User(1L, "User1"), new User(2L, "User2"), new User(3L, "User3") ); } } @Data @AllArgsConstructor class User { private Long id; private String name; }

实现方式四:DeferredResult(Servlet 异步)

java
@RestController @RequestMapping("/deferred") @RequiredArgsConstructor public class DeferredResultController { private final AsyncService asyncService; @GetMapping("/task") public DeferredResult<String> asyncTask() { DeferredResult<String> deferredResult = new DeferredResult<>(10000L, "超时"); // 异步处理 new Thread(() -> { try { Thread.sleep(3000); deferredResult.setResult("任务完成"); } catch (InterruptedException e) { deferredResult.setErrorResult("任务中断"); } }).start(); // 超时回调 deferredResult.onTimeout(() -> { System.out.println("任务超时"); }); // 完成回调 deferredResult.onCompletion(() -> { System.out.println("任务完成回调"); }); return deferredResult; } @GetMapping("/callable") public Callable<String> callableTask() { return () -> { Thread.sleep(3000); return "Callable 任务完成"; }; } }

异步事务处理

java
@Service @Slf4j public class AsyncTransactionalService { /** * 异步方法中的事务 * 注意:@Async 和 @Transactional 可以一起使用 */ @Async @Transactional public CompletableFuture<Void> asyncTransactionalOperation() { // 事务操作 log.info("异步事务操作,线程: {}", Thread.currentThread().getName()); return CompletableFuture.completedFuture(null); } /** * 在异步方法中调用事务方法 */ @Async public CompletableFuture<Void> callTransactionalMethod() { // 调用另一个类的 @Transactional 方法 transactionalService.doTransaction(); return CompletableFuture.completedFuture(null); } }

异步异常处理

java
@Configuration @Slf4j public class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler { @Override public void handleUncaughtException(Throwable ex, Method method, Object... params) { log.error("异步方法异常 - 方法: {}, 参数: {}", method.getName(), params, ex); // 可以在这里发送告警、记录日志等 sendAlert(method.getName(), ex.getMessage()); } private void sendAlert(String methodName, String errorMessage) { // 发送告警通知 log.warn("发送告警: 方法 {} 执行失败,错误: {}", methodName, errorMessage); } } // 在配置类中注册 @Configuration @EnableAsync public class AsyncConfig implements AsyncConfigurer { @Override @Bean(name = "taskExecutor") public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(5); executor.setMaxPoolSize(20); executor.setQueueCapacity(100); executor.setThreadNamePrefix("async-"); executor.initialize(); return executor; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return new AsyncExceptionHandler(); } }

线程池监控

java
@Component @Slf4j public class ThreadPoolMonitor { @Autowired @Qualifier("taskExecutor") private ThreadPoolTaskExecutor taskExecutor; @Scheduled(fixedRate = 60000) // 每分钟执行 public void monitor() { ThreadPoolExecutor executor = taskExecutor.getThreadPoolExecutor(); log.info("线程池状态 - 核心线程数: {}, 活跃线程数: {}, 完成任务数: {}, " + "队列任务数: {}, 总任务数: {}", executor.getCorePoolSize(), executor.getActiveCount(), executor.getCompletedTaskCount(), executor.getQueue().size(), executor.getTaskCount()); } }

最佳实践

1. 线程池配置建议

java
@Bean("ioIntensiveExecutor") public Executor ioIntensiveExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // IO 密集型:核心线程数可以设置较大 executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2); executor.setMaxPoolSize(100); executor.setQueueCapacity(1000); executor.setThreadNamePrefix("io-task-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } @Bean("cpuIntensiveExecutor") public Executor cpuIntensiveExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // CPU 密集型:核心线程数 = CPU 核心数 + 1 executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() + 1); executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() + 1); executor.setQueueCapacity(100); executor.setThreadNamePrefix("cpu-task-"); executor.initialize(); return executor; }

2. 避免异步陷阱

java
@Service @Slf4j public class AsyncPitfallService { /** * ❌ 错误:同类中调用 @Async 方法不会异步执行 */ public void wrongAsyncCall() { // 这样调用不会走代理,不会异步执行 this.asyncMethod(); } @Async public void asyncMethod() { // 异步逻辑 } /** * ✅ 正确:通过注入的代理对象调用 */ @Autowired private AsyncPitfallService self; public void correctAsyncCall() { // 通过代理对象调用 self.asyncMethod(); } }

3. 异步方法返回值

java
@Service public class AsyncReturnService { /** * ❌ 不推荐:返回 void,无法知道执行结果 */ @Async public void fireAndForget() { // 执行操作 } /** * ✅ 推荐:返回 CompletableFuture,可以获取结果和异常 */ @Async public CompletableFuture<String> asyncWithResult() { try { // 执行操作 return CompletableFuture.completedFuture("success"); } catch (Exception e) { CompletableFuture<String> future = new CompletableFuture<>(); future.completeExceptionally(e); return future; } } }

总结

方式适用场景优点缺点
@Async简单异步任务简单易用功能有限
CompletableFuture复杂异步编排功能强大、可组合学习曲线
WebFlux高并发响应式非阻塞、高性能编程模型不同
DeferredResultServlet 异步兼容传统 MVC较底层

选择建议:

  • 简单后台任务:@Async
  • 复杂异步流程:CompletableFuture
  • 高并发 API:WebFlux
  • 传统项目升级:DeferredResult
标签:Spring Boot