震撼!通过双重异步,Excel 10万行数据导入从191秒优化到2秒!
在现代的企业级应用开发中,海量数据的处理效率和并发性能优化是一个非常重要的课题。无论是大规模数据导入、文件解析,还是在分布式系统中处理高并发任务,如何提升系统的处理速度、合理利用计算资源、减少线程上下文切换的开销,这些都是开发者必须面对的问题。在这一背景下,线程池技术以及异步编程逐渐成为提升系统性能的利器。
本文将深入探讨如何通过合理设计线程池和利用异步编程模型,有效优化大规模数据的处理性能。我们将结合 Spring Boot 框架中的 @Async 注解、自定义线程池、以及通过使用 EasyExcel 进行大数据量的 Excel 解析和异步写入数据库的场景,详细说明如何通过分而治之的策略,减少系统的响应时间、提高并发处理能力。同时,还将分析如何基于 CPU 和 IO 密集型任务的特性,来合理设置线程池的核心线程数、最大线程数等参数,以便在实际项目中能够充分发挥硬件资源的性能。
通常我是这样做的:
-
使用POI读取需要导入的Excel文件;
-
将文件名作为表名,列标题作为列名,并将数据拼接成SQL语句;
-
通过JDBC或Mybatis插入到数据库。
在操作中,如果文件数量多且数据量大,处理过程可能会非常缓慢。
访问后,感觉程序没有响应,但实际上,它正在读取并插入数据,只是速度很慢。
读取包含10万行的Excel文件竟然耗时191秒!
我以为程序卡住了!
private void readXls(String filePath, String filename) throws Exception {@SuppressWarnings("resource")XSSFWorkbook xssfWorkbook = new XSSFWorkbook(new FileInputStream(filePath));// 读取第一个工作表XSSFSheet sheet = xssfWorkbook.getSheetAt(0);// 获取总行数int maxRow = sheet.getLastRowNum();StringBuilder insertBuilder = new StringBuilder();insertBuilder.append("insert into ").append(filename).append(" ( UUID,");XSSFRow row = sheet.getRow(0);for (int i = 0; i < row.getPhysicalNumberOfCells(); i++) {insertBuilder.append(row.getCell(i)).append(",");}insertBuilder.deleteCharAt(insertBuilder.length() - 1);insertBuilder.append(" ) values ( ");StringBuilder stringBuilder = new StringBuilder();for (int i = 1; i <= maxRow; i++) {XSSFRow xssfRow = sheet.getRow(i);String id = "";String name = "";for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) {if (j == 0) {id = xssfRow.getCell(j) + "";} else if (j == 1) {name = xssfRow.getCell(j) + "";}}boolean flag = isExisted(id, name);if (!flag) {stringBuilder.append(insertBuilder);stringBuilder.append('\'').append(uuid()).append('\'').append(",");for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) {stringBuilder.append('\'').append(value).append('\'').append(",");}stringBuilder.deleteCharAt(stringBuilder.length() - 1);stringBuilder.append(" )").append("\n");}}List<String> collect = Arrays.stream(stringBuilder.toString().split("\n")).collect(Collectors.toList());int sum = JdbcUtil.executeDML(collect);
}private static boolean isExisted(String id, String name) {String sql = "select count(1) as num from " + static_TABLE + " where ID = '" + id + "' and NAME = '" + name + "'";String num = JdbcUtil.executeSelect(sql, "num");return Integer.valueOf(num) > 0;
}private static String uuid() {return UUID.randomUUID().toString().replace("-", "");
}
如何优化?
优化1:首先,查询所有数据,将其缓存到map中,然后在插入前做决策。这样可以大大提高速度。
优化2:如果单个Excel文件太大,可以考虑使用异步和多线程,分批读取多行并插入数据库。
优化3:如果文件太多,可以为每个Excel文件使用一个异步进程,实现双重异步读取和插入。
使用双重异步处理后,从191秒优化到了2秒,你能相信吗?
以下是异步读取Excel文件和批量读取大Excel文件的关键代码。
异步读取缓存的Excel Controller类
@RequestMapping(value = "/readExcelCacheAsync", method = RequestMethod.POST)
@ResponseBody
public String readExcelCacheAsync() {String path = "G:\\Test\\data\\";try {// 读取Excel之前,缓存所有数据USER_INFO_SET = getUserInfo();File file = new File(path);String[] xlsxArr = file.list();for (int i = 0; i < xlsxArr.length; i++) {File fileTemp = new File(path + "\\" + xlsxArr[i]);String filename = fileTemp.getName().replace(".xlsx", "");readExcelCacheAsyncService.readXls(path + filename + ".xlsx", filename);}} catch (Exception e) {logger.error("|#ReadDBCsv|#Exception: ", e);return "error";}return "success";
}
批量读取超大Excel文件
@Async("async-executor")
public void readXls(String filePath, String filename) throws Exception {@SuppressWarnings("resource")XSSFWorkbook xssfWorkbook = new XSSFWorkbook(new FileInputStream(filePath));// 读取第一个工作表XSSFSheet sheet = xssfWorkbook.getSheetAt(0);// 总行数int maxRow = sheet.getLastRowNum();logger.info(filename + ".xlsx,共 " + maxRow + " 行数据!");StringBuilder insertBuilder = new StringBuilder();insertBuilder.append("insert into ").append(filename).append(" ( UUID,");XSSFRow row = sheet.getRow(0);for (int i = 0; i < row.getPhysicalNumberOfCells(); i++) {insertBuilder.append(row.getCell(i)).append(",");}insertBuilder.deleteCharAt(insertBuilder.length() - 1);insertBuilder.append(" ) values ( ");int times = maxRow / STEP + 1;for (int time = 0; time < times; time++) {int start = STEP * time + 1;int end = STEP * time + STEP;if (time == times - 1) {end = maxRow;}if (end + 1 - start > 0) {readExcelDataAsyncService.readXlsCacheAsyncMybatis(sheet, row, start, end, insertBuilder);}}
}
异步批量插入数据库
@Async("async-executor")
public void readXlsCacheAsync(XSSFSheet sheet, XSSFRow row, int start, int end, StringBuilder insertBuilder) {StringBuilder stringBuilder = new StringBuilder();for (int i = start; i <= end; i++) {XSSFRow xssfRow = sheet.getRow(i);String id = "";String name = "";for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) {if (j == 0) {id = xssfRow.getCell(j) + "";} else if (j == 1) {name = xssfRow.getCell(j) + "";}}// 在读取Excel之前,先缓存所有数据,然后做决策boolean flag = isExisted(id, name);if (!flag) {stringBuilder.append(insertBuilder);stringBuilder.append('\'').append(uuid()).append('\'').append(",");for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) {stringBuilder.append('\'').append(value).append('\'').append(",");}stringBuilder.deleteCharAt(stringBuilder.length() - 1);stringBuilder.append(" )").append("\n");}}List<String> collect = Arrays.stream(stringBuilder.toString().split("\n")).collect(Collectors.toList());if (collect != null && collect.size() > 0) {int sum = JdbcUtil.executeDML(collect);}
}private boolean isExisted(String id, String name) {return ReadExcelCacheAsyncController.USER_INFO_SET.contains(id + "," + name);
}
异步线程池工具类
@Async 的目的是异步处理任务。
-
在方法上添加 @Async 表明该方法是异步的。
-
在类上添加 @Async 表示该类中的所有方法都是异步的。
-
使用此注解的类必须由 Spring 管理。
-
必须在启动类或配置类中添加 @EnableAsync 注解,@Async 才能生效。
在使用 @Async 时,如果不指定线程池的名称,即不自定义线程池,默认会使用一个线程池。这个默认线程池是 Spring 的 SimpleAsyncTaskExecutor。
默认线程池的默认配置如下:
-
默认核心线程数:8。
-
最大线程数:Integer.MAX_VALUE。
-
队列类型:LinkedBlockingQueue。
-
容量:Integer.MAX_VALUE。
-
空闲线程保留时间:60秒。
-
线程池拒绝策略:AbortPolicy。
从最大线程数可以看出,在并发情况下,线程会无限制地创建。
你也可以通过 yml 文件重新配置:
spring:task:execution:pool:max-size: 10core-size: 5keep-alive: 3squeue-capacity: 1000thread-name-prefix: my-executor
你也可以自定义线程池。以下是使用 @Async 自定义线程池的简单代码实现:
@EnableAsync // 支持异步操作
@Configuration
public class AsyncTaskConfig {/*** 来自 com.google.guava 的线程池* @return*/@Bean("my-executor")public Executor firstExecutor() {ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("my-executor").build();// 获取 CPU 处理器数量int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2;ThreadPoolExecutor threadPool = new ThreadPoolExecutor(curSystemThreads, 100,200, TimeUnit.SECONDS,new LinkedBlockingQueue<>(), threadFactory);threadPool.allowsCoreThreadTimeOut();return threadPool;}/*** Spring 的线程池* @return*/@Bean("async-executor")public Executor asyncExecutor() {ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();// 核心线程数taskExecutor.setCorePoolSize(24);// 线程池维护的最大线程数,超出核心线程数的线程仅当缓冲队列满时才会创建taskExecutor.setMaxPoolSize(200);// 缓冲队列taskExecutor.setQueueCapacity(50);// 超出核心线程数的线程空闲时间,超时后将被销毁taskExecutor.setKeepAliveSeconds(200);// 异步方法内部线程名taskExecutor.setThreadNamePrefix("async-executor-");/*** 当线程池的任务缓存队列已满,且线程池中的线程数量已达到最大值时,如果还有任务到来,将采用任务拒绝策略。* 通常有以下四种策略:* ThreadPoolExecutor.AbortPolicy:抛弃任务并抛出 RejectedExecutionException 异常。* ThreadPoolExecutor.DiscardPolicy:抛弃任务,但不抛出异常。* ThreadPoolExecutor.DiscardOldestPolicy:抛弃队列最前面的任务,然后尝试执行当前任务(重复此过程)。* ThreadPoolExecutor.CallerRunsPolicy:重试添加当前任务,自动调用执行方法,直到成功。*/taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());taskExecutor.initialize();return taskExecutor;}
}
异步失效的原因
-
被 @Async 注解的方法不是 public 的;
-
被 @Async 注解的方法的返回值类型只能是 void 或 Future;
-
被 @Async 注解的方法如果是静态的也会失效;
-
未添加 @EnableAsync 注解;
-
调用者和被 @Async 注解的方法不能在同一个类中;
-
对异步方法使用 @Transactional 是无效的,但对异步方法内调用的方法加上 @Transactional 是有效的。
线程池中设置核心线程数的问题
我尚未有时间详细探讨:在线程池中设置 CorePoolSize 和 MaxPoolSize 的最适宜和最高效的数量是多少。
借此机会进行了一些测试。
我记得有个关于 CPU 处理器数量的说法
将 CorePoolSize 设置为 CPU 处理器的数量时,效率最高吗?
// 获取 CPU 处理器数量
int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2;
Runtime.getRuntime().availableProcessors()
会获取 CPU 核心线程数,代表计算资源。
-
对于 CPU 密集型任务,线程池的大小设置为 N,与 CPU 线程数一致,这可以最大限度地减少线程间的上下文切换。但在实际开发中,一般设置为 N+1,以防止线程由于不可预见的情况而阻塞。如果发生阻塞,多出来的线程可以继续执行任务,保证 CPU 的高效利用。
-
对于 IO 密集型任务,线程池的大小设置为 2N。这个数值是根据业务压力测试得出的,或者在不涉及业务时使用推荐值。
实际中,线程池的具体大小需要根据压力测试以及机器的当前状态进行调整。
如果线程池过大,会导致 CPU 持续切换,系统整体性能并不会有显著提高,反而可能会变慢。
我电脑的 CPU 处理器数量为 24。
那么一次读取多少行效率最高呢?
测试中,Excel 文件包含 10 万行数据。10 万 / 24 = 4166,因此我设置为 4200。这是最有效的设置吗?
测试过程中似乎的确如此。
我记得大家习惯性地将核心线程数(CorePoolSize)和最大线程数(MaxPoolSize)设置为相同的数值,通常是 200。
这只是随机选择,还是基于经验的?
测试发现,当 CorePoolSize 和 MaxPoolSize 都设置为 200 时,最初同时开启了 150 个线程工作。
为什么会这样呢?
经过数十次测试后
-
发现核心线程数并没有太大区别;
-
关键是每次读取和存储的行数,不能太多,存储速度会逐渐减慢;
-
也不能太少,如果少于 150 个线程,会导致线程阻塞,反而减慢进程。
IV.使用 EasyExcel 读取并插入数据库
我不会写 EasyExcel 的双异步优化。大家要记住避免掉进低级勤奋的陷阱。
ReadEasyExcelController
@RequestMapping(value = "/readEasyExcel", method = RequestMethod.POST)
@ResponseBody
public String readEasyExcel() {try {String path = "G:\\Test\\data\\";String[] xlsxArr = new File(path).list();for (int i = 0; i < xlsxArr.length; i++) {String filePath = path + xlsxArr[i];File fileTemp = new File(path + xlsxArr[i]);String fileName = fileTemp.getName().replace(".xlsx", "");List<UserInfo> list = new ArrayList<>();EasyExcel.read(filePath, UserInfo.class, new ReadEasyExeclAsyncListener(readEasyExeclService, fileName, batchCount, list)).sheet().doRead();}}catch (Exception e){logger.error("readEasyExcel Exception:",e);return "error";}return "success";
}
ReadEasyExeclAsyncListener
public ReadEasyExeclService readEasyExeclService;
// 表名
public String TABLE_NAME;
// 批量插入阈值
private int BATCH_COUNT;
// 数据收集
private List<UserInfo> LIST;public ReadEasyExeclAsyncListener(ReadEasyExeclService readEasyExeclService, String tableName, int batchCount, List<UserInfo> list) {this.readEasyExeclService = readEasyExeclService;this.TABLE_NAME = tableName;this.BATCH_COUNT = batchCount;this.LIST = list;
}@Override
public void invoke(UserInfo data, AnalysisContext analysisContext) {data.setUuid(uuid());data.setTableName(TABLE_NAME);LIST.add(data);if (LIST.size() >= BATCH_COUNT) {// 批量入库readEasyExeclService.saveDataBatch(LIST);}
}@Override
public void doAfterAllAnalysed(AnalysisContext analysisContext) {if (LIST.size() > 0) {// 最后一批入库readEasyExeclService.saveDataBatch(LIST);}
}public static String uuid() {return UUID.randomUUID().toString().replace("-", "");
}
ReadEasyExeclServiceImpl
@Service
public class ReadEasyExeclServiceImpl implements ReadEasyExeclService {@Resourceprivate ReadEasyExeclMapper readEasyExeclMapper;@Overridepublic void saveDataBatch(List<UserInfo> list) {// Insert into the database via mybatisreadEasyExeclMapper.saveDataBatch(list);// Insert into the database via JDBC// insertByJdbc(list);list.clear();}private void insertByJdbc(List<UserInfo> list){List<String> sqlList = new ArrayList<>();for (UserInfo u : list){StringBuilder sqlBuilder = new StringBuilder();sqlBuilder.append("insert into ").append(u.getTableName()).append(" ( UUID,ID,NAME,AGE,ADDRESS,PHONE,OP_TIME ) values ( ");sqlBuilder.append("'").append(ReadEasyExeclAsyncListener.uuid()).append("',").append("'").append(u.getId()).append("',").append("'").append(u.getName()).append("',").append("'").append(u.getAge()).append("',").append("'").append(u.getAddress()).append("',").append("'").append(u.getPhone()).append("',").append("sysdate )");sqlList.add(sqlBuilder.toString());}JdbcUtil.executeDML(sqlList);}
}
UserInfo
@Data
public class UserInfo {private String tableName;private String uuid;@ExcelProperty(value = "ID")private String id;@ExcelProperty(value = "NAME")private String name;@ExcelProperty(value = "AGE")private String age;@ExcelProperty(value = "ADDRESS")private String address;@ExcelProperty(value = "PHONE")private String phone;
}
结语
在处理高并发、大数据导入等场景时,异步编程和线程池技术提供了一种极具效率的解决方案。通过合理配置线程池的核心线程数、最大线程数、队列长度等参数,能够在确保系统稳定性的前提下,大幅提升并发处理能力。而通过异步编程,我们可以有效避免线程阻塞、减少资源浪费,并让系统在面对大量请求时依然能够保持较高的响应速度。
本文的示例通过 Spring Boot 的 @Async 注解和自定义线程池,在实际的 EasyExcel 大数据导入场景下,验证了这种技术组合的高效性和实用性。此外,通过对 CPU 密集型任务和 IO 密集型任务的深入分析,开发者能够根据自身项目的特点,选择合适的线程池配置策略,最大化资源利用率和性能表现。
在实际应用中,线程池和异步编程不仅适用于大数据导入,还可以推广到包括文件处理、网络请求、日志处理等各类需要并发处理的场景中。因此,掌握并灵活运用这些技术,将为我们的系统性能优化提供坚实的基础,使我们能够应对更复杂、更苛刻的业务需求。