Spring @Async 与自定义线程池实战指南
一、概述
@Async是 Spring 提供的异步执行注解,可以将方法的执行从调用线程转移到独立的线程池中,实现非阻塞调用。常用于日志记录、消息通知、数据同步等不需要同步等待结果的场景。
但@Async如果不配合自定义线程池使用,会带来严重的生产隐患。本文详细介绍@Async的使用方式、为什么必须自定义线程池、线程池参数如何设计,以及常见陷阱。
二、示例场景
一个订单系统,支付成功后需要:
- 更新订单状态(同步,必须成功)
- 发送短信通知(异步,失败不影响主流程)
- 记录操作日志(异步,失败不影响主流程)
注:
博客:
https://blog.csdn.net/badao_liumang_qizhi
三、@Async 基本使用
3.1 启用异步支持
@Configuration@EnableAsyncpublicclassAsyncConfig{// 启用 @Async 注解支持}3.2 标记异步方法
@ServicepublicclassNotificationService{@AsyncpublicvoidsendSmsNotification(Stringphone,Stringcontent){// 这个方法会在独立线程中执行,不阻塞调用方smsClient.send(phone,content);}@AsyncpublicvoidsaveOperationLog(StringuserId,Stringaction){OperationLoglog=newOperationLog();log.setUserId(userId);log.setAction(action);log.setCreateTime(newDate());operationLogRepository.save(log);}}3.3 调用方
@ServicepublicclassOrderPaymentService{@ResourceprivateNotificationServicenotificationService;@Transactional(rollbackFor=Exception.class)publicvoidprocessPayment(PaymentDtodto){// 同步:更新订单状态Orderorder=orderRepository.findByOrderNo(dto.getOrderNo());order.setStatus("PAID");orderRepository.save(order);// 异步:发送通知(不阻塞主流程)notificationService.sendSmsNotification(dto.getPhone(),"支付成功");// 异步:记录日志(不阻塞主流程)notificationService.saveOperationLog(dto.getUserId(),"PAYMENT_SUCCESS");}}3.4 带返回值的异步方法
@ServicepublicclassReportService{@AsyncpublicFuture<ReportResult>generateReport(StringreportId){// 耗时操作ReportResultresult=doHeavyComputation(reportId);returnnewAsyncResult<>(result);}// Java 8+ 推荐使用 CompletableFuture@AsyncpublicCompletableFuture<ReportResult>generateReportV2(StringreportId){ReportResultresult=doHeavyComputation(reportId);returnCompletableFuture.completedFuture(result);}}// 调用方获取结果@ServicepublicclassReportController{@ResourceprivateReportServicereportService;publicReportResultgetReport(StringreportId)throwsException{CompletableFuture<ReportResult>future=reportService.generateReportV2(reportId);// 阻塞等待结果(最多等 30 秒)returnfuture.get(30,TimeUnit.SECONDS);}}四、为什么必须自定义线程池
4.1 默认线程池的问题
如果不指定线程池,Spring 默认使用SimpleAsyncTaskExecutor:
// SimpleAsyncTaskExecutor 的行为:// 1. 每次调用都创建一个新线程// 2. 没有线程复用// 3. 没有队列缓冲// 4. 没有最大线程数限制生产事故场景:
高并发下: - 每秒 1000 个请求,每个请求触发 2 个 @Async 调用 - 每秒创建 2000 个新线程 - 线程数持续增长,无上限 - 最终 OOM 或系统线程数耗尽,整个服务崩溃4.2 Spring Boot 2.1+ 的默认行为
Spring Boot 2.1+ 自动配置了ThreadPoolTaskExecutor,但默认参数可能不适合你的场景:
# Spring Boot 默认配置 spring.task.execution.pool.core-size=8 spring.task.execution.pool.max-size=Integer.MAX_VALUE # 无限制! spring.task.execution.pool.queue-capacity=Integer.MAX_VALUE # 无限制! spring.task.execution.pool.keep-alive=60s问题:max-size和queue-capacity都是无限制,高并发下仍可能 OOM。
4.3 自定义线程池的必要性
| 问题 | 默认行为 | 自定义解决 |
|---|---|---|
| 线程数无限增长 | 可能 OOM | 设置 maxPoolSize 上限 |
| 队列无限堆积 | 内存持续增长 | 设置 queueCapacity 上限 |
| 任务被丢弃无感知 | 静默失败 | 配置拒绝策略 |
| 线程无法区分来源 | 排查困难 | 设置线程名前缀 |
| 不同业务互相影响 | 共用一个池 | 按业务隔离线程池 |
五、自定义线程池配置
5.1 基础配置
@Configuration@EnableAsyncpublicclassAsyncConfig{@Bean("logExecutor")publicExecutorlogExecutor(){ThreadPoolTaskExecutorexecutor=newThreadPoolTaskExecutor();executor.setCorePoolSize(2);// 核心线程数executor.setMaxPoolSize(5);// 最大线程数executor.setQueueCapacity(1000);// 队列容量executor.setKeepAliveSeconds(60);// 空闲线程存活时间executor.setThreadNamePrefix("log-async-");// 线程名前缀executor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());executor.initialize();returnexecutor;}@Bean("smsExecutor")publicExecutorsmsExecutor(){ThreadPoolTaskExecutorexecutor=newThreadPoolTaskExecutor();executor.setCorePoolSize(5);executor.setMaxPoolSize(10);executor.setQueueCapacity(500);executor.setKeepAliveSeconds(60);executor.setThreadNamePrefix("sms-async-");executor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());executor.initialize();returnexecutor;}}5.2 指定使用哪个线程池
@ServicepublicclassNotificationService{// 使用日志专用线程池@Async("logExecutor")publicvoidsaveOperationLog(StringuserId,Stringaction){...}// 使用短信专用线程池@Async("smsExecutor")publicvoidsendSmsNotification(Stringphone,Stringcontent){...}}六、线程池参数设计
6.1 核心参数说明
任务提交流程: 1. 线程数 < corePoolSize -> 创建新线程执行 2. 线程数 >= corePoolSize -> 放入队列 3. 队列满 + 线程数 < maxPoolSize -> 创建新线程执行 4. 队列满 + 线程数 >= maxPoolSize -> 执行拒绝策略6.2 参数设计原则
| 场景 | corePoolSize | maxPoolSize | queueCapacity | 说明 |
|---|---|---|---|---|
| IO 密集型(日志、HTTP调用) | CPU核数 * 2 | CPU核数 * 4 | 500~2000 | IO 等待时间长,需要更多线程 |
| CPU 密集型(计算、加密) | CPU核数 | CPU核数 + 1 | 100~500 | 线程过多反而增加上下文切换 |
| 低频任务(定时报表) | 1~2 | 3~5 | 100 | 不需要太多资源 |
| 高频轻量任务(日志记录) | 2~4 | 5~10 | 1000~5000 | 队列大一些缓冲突发流量 |
6.3 获取 CPU 核数
intcpuCores=Runtime.getRuntime().availableProcessors();executor.setCorePoolSize(cpuCores*2);executor.setMaxPoolSize(cpuCores*4);七、拒绝策略
7.1 四种内置策略
| 策略 | 行为 | 适用场景 |
|---|---|---|
| CallerRunsPolicy | 由调用线程执行任务 | 不丢弃任务,但会阻塞调用方 |
| AbortPolicy | 抛出 RejectedExecutionException | 默认策略,需要调用方处理异常 |
| DiscardPolicy | 静默丢弃任务 | 允许丢失的非关键任务 |
| DiscardOldestPolicy | 丢弃队列中最老的任务 | 只关心最新数据的场景 |
7.2 推荐选择
// 日志记录场景:CallerRunsPolicy// 队列满时由调用线程执行,保证日志不丢失,但可能短暂阻塞主线程executor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());// 通知场景:DiscardPolicy 或自定义// 通知丢失可接受,不能阻塞主流程executor.setRejectedExecutionHandler(newThreadPoolExecutor.DiscardPolicy());7.3 自定义拒绝策略
// 记录被拒绝的任务到文件日志,便于后续补偿executor.setRejectedExecutionHandler((runnable,poolExecutor)->{log.error("线程池已满,任务被拒绝: {}",runnable.toString());// 可以写入文件、发送告警等});八、@Async 的常见陷阱
8.1 自调用不生效
@ServicepublicclassOrderService{// 不生效!同类内部调用不走代理publicvoidmethodA(){this.asyncMethod();// 直接调用,不会异步执行}@AsyncpublicvoidasyncMethod(){...}}解决方案:与 @Transactional 相同,拆分到不同 Bean 或注入自身代理。
8.2 异常被吞
@AsyncpublicvoidasyncMethod(){// 如果这里抛异常,调用方完全感知不到// 异常会被线程池吞掉,只在日志中可见thrownewRuntimeException("异步方法异常");}解决方案:
// 方案1:方法内 try-catch@AsyncpublicvoidasyncMethod(){try{// 业务逻辑}catch(Exceptione){log.error("异步任务执行失败",e);}}// 方案2:全局异常处理器@ConfigurationpublicclassAsyncConfigimplementsAsyncConfigurer{@OverridepublicAsyncUncaughtExceptionHandlergetAsyncUncaughtExceptionHandler(){return(throwable,method,params)->{log.error("异步方法 {} 执行异常: {}",method.getName(),throwable.getMessage());};}}8.3 事务不传播
@ServicepublicclassOrderService{@Transactional(rollbackFor=Exception.class)publicvoidprocess(){orderRepository.save(order);// 异步方法在新线程中执行,不在当前事务中!// 如果主事务回滚,异步方法的数据库操作不会回滚asyncService.saveLog(order.getId());}}原因:事务绑定在 ThreadLocal 中,新线程没有事务上下文。
解决:异步方法如果需要事务,必须自己开启独立事务。
8.4 方法必须是 public
// 不生效!private 方法无法被代理@AsyncprivatevoidasyncMethod(){...}// 不生效!static 方法无法被代理@AsyncpublicstaticvoidasyncMethod(){...}8.5 返回值为 void 时无法感知失败
// 调用方无法知道异步方法是否成功@AsyncpublicvoidfireAndForget(){// 如果失败,调用方完全不知道}// 如果需要感知结果,使用 Future/CompletableFuture@AsyncpublicCompletableFuture<Boolean>asyncWithResult(){try{doSomething();returnCompletableFuture.completedFuture(true);}catch(Exceptione){returnCompletableFuture.completedFuture(false);}}九、线程池监控
9.1 暴露线程池指标
@ComponentpublicclassThreadPoolMonitor{@Resource@Qualifier("logExecutor")privateThreadPoolTaskExecutorlogExecutor;// 定时打印线程池状态@Scheduled(fixedRate=60000)publicvoidmonitor(){ThreadPoolExecutorpool=logExecutor.getThreadPoolExecutor();log.info("线程池状态 - 活跃线程:{}, 核心线程:{}, 最大线程:{}, 队列大小:{}, 已完成:{}",pool.getActiveCount(),pool.getCorePoolSize(),pool.getMaximumPoolSize(),pool.getQueue().size(),pool.getCompletedTaskCount());}}9.2 关键监控指标
| 指标 | 含义 | 告警阈值建议 |
|---|---|---|
| activeCount | 当前活跃线程数 | > maxPoolSize * 0.8 |
| queueSize | 队列中等待的任务数 | > queueCapacity * 0.8 |
| completedTaskCount | 已完成任务总数 | 用于计算吞吐量 |
| rejectedCount | 被拒绝的任务数 | > 0 需要告警 |
十、优雅关闭
10.1 问题
应用关闭时,线程池中可能还有未完成的任务。如果直接关闭,任务会丢失。
10.2 配置优雅关闭
@Bean("logExecutor")publicExecutorlogExecutor(){ThreadPoolTaskExecutorexecutor=newThreadPoolTaskExecutor();executor.setCorePoolSize(2);executor.setMaxPoolSize(5);executor.setQueueCapacity(1000);executor.setThreadNamePrefix("log-async-");// 优雅关闭配置executor.setWaitForTasksToCompleteOnShutdown(true);// 等待任务完成executor.setAwaitTerminationSeconds(60);// 最多等待 60 秒executor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());executor.initialize();returnexecutor;}十一、业务隔离
11.1 为什么要隔离
// 反例:所有异步任务共用一个线程池// 如果日志写入大量堆积,会影响短信发送@AsyncpublicvoidsaveLog(){...}@AsyncpublicvoidsendSms(){...}11.2 按业务划分线程池
@Configuration@EnableAsyncpublicclassAsyncConfig{// 日志线程池:低优先级,队列大@Bean("logExecutor")publicExecutorlogExecutor(){ThreadPoolTaskExecutorexecutor=newThreadPoolTaskExecutor();executor.setCorePoolSize(2);executor.setMaxPoolSize(5);executor.setQueueCapacity(2000);executor.setThreadNamePrefix("log-");executor.setRejectedExecutionHandler(newThreadPoolExecutor.DiscardOldestPolicy());executor.initialize();returnexecutor;}// 通知线程池:高优先级,队列小@Bean("notifyExecutor")publicExecutornotifyExecutor(){ThreadPoolTaskExecutorexecutor=newThreadPoolTaskExecutor();executor.setCorePoolSize(5);executor.setMaxPoolSize(10);executor.setQueueCapacity(200);executor.setThreadNamePrefix("notify-");executor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());executor.initialize();returnexecutor;}// 计算线程池:CPU 密集型@Bean("computeExecutor")publicExecutorcomputeExecutor(){intcpuCores=Runtime.getRuntime().availableProcessors();ThreadPoolTaskExecutorexecutor=newThreadPoolTaskExecutor();executor.setCorePoolSize(cpuCores);executor.setMaxPoolSize(cpuCores+1);executor.setQueueCapacity(100);executor.setThreadNamePrefix("compute-");executor.setRejectedExecutionHandler(newThreadPoolExecutor.AbortPolicy());executor.initialize();returnexecutor;}}十二、方案对比总结
| 维度 | 默认线程池 | 自定义单线程池 | 按业务隔离线程池 |
|---|---|---|---|
| 实现复杂度 | 零配置 | 低 | 中 |
| 资源可控性 | 无 | 有 | 精细 |
| 业务隔离 | 无 | 无 | 有 |
| 故障影响范围 | 全局 | 全局 | 局部 |
| 监控粒度 | 无 | 整体 | 按业务 |
| 推荐程度 | 禁止生产使用 | 小项目可用 | 生产推荐 |
十三、最佳实践清单
- 禁止使用默认线程池:生产环境必须自定义线程池
- 设置线程名前缀:方便日志排查和线程 dump 分析
- 按业务隔离线程池:避免不同业务互相影响
- 合理设置队列容量:不要用无界队列(Integer.MAX_VALUE)
- 选择合适的拒绝策略:CallerRunsPolicy 最安全,不丢任务
- 异步方法内必须 try-catch:防止异常被吞
- 配置优雅关闭:
setWaitForTasksToCompleteOnShutdown(true) - 监控线程池状态:活跃线程数、队列大小、拒绝次数
- 注意事务不传播:异步方法需要自己管理事务
- 避免自调用:@Async 方法必须通过代理调用才生效