Processing large volumes of data on a single thread is slow. Spring Batch's Partitioner exists to split a big task into smaller chunks that can be processed in parallel. I once had an ETL job that processed 10 million records; after introducing a Partitioner, the execution time dropped from 2 hours to 15 minutes.
The Core Concept of Partitioner
What a Partitioner does is simple: it splits one big task into N smaller tasks.
Suppose you need to process 10 million records:
- Without a Partitioner: one thread processes everything, taking 1 hour
- With a Partitioner (8 partitions): 8 threads each process 1.25 million records, taking roughly 7.5 minutes
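The range arithmetic behind this split can be sketched in plain Java, without any Spring classes (`RangeMath` and its `split` helper are illustrative names, not part of Spring Batch). The sketch also sums the ranges to confirm every record is covered:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RangeMath {
    // Returns half-open [start, end) ranges, one per partition.
    static Map<String, long[]> split(long totalRecords, int gridSize) {
        Map<String, long[]> ranges = new LinkedHashMap<>();
        long perPartition = totalRecords / gridSize;
        for (int i = 0; i < gridSize; i++) {
            long start = i * perPartition;
            // The last partition absorbs any remainder from integer division.
            long end = (i == gridSize - 1) ? totalRecords : (i + 1) * perPartition;
            ranges.put("partition" + i, new long[] { start, end });
        }
        return ranges;
    }

    public static void main(String[] args) {
        Map<String, long[]> ranges = split(10_000_000L, 8);
        long covered = ranges.values().stream()
                .mapToLong(r -> r[1] - r[0])
                .sum();
        System.out.println(ranges.size() + " partitions cover " + covered + " records");
    }
}
```

Since 10,000,000 divides evenly by 8, each partition here gets exactly 1,250,000 records.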
Core Components
1. The Partitioner Interface
The Partitioner's job is to define how the data is split. You implement the Partitioner interface:
```java
public class CustomPartitioner implements Partitioner {

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> result = new HashMap<>();
        long totalRecords = 10_000_000;
        long recordsPerPartition = totalRecords / gridSize;

        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            long startIndex = i * recordsPerPartition;
            long endIndex = (i == gridSize - 1)
                    ? totalRecords
                    : (i + 1) * recordsPerPartition;

            context.putLong("startIndex", startIndex);
            context.putLong("endIndex", endIndex);
            context.putInt("partitionId", i);
            result.put("partition" + i, context);
        }
        return result;
    }
}
```
The Partitioner returns a Map whose keys are partition names and whose values are ExecutionContext instances carrying the information each partition needs.
2. Reader and Writer
The reader uses the values in the ExecutionContext to read only its assigned range of data:
```java
@Bean
@StepScope // required so the stepExecutionContext SpEL expressions resolve
public ItemReader<User> userPartitionReader(
        @Value("#{stepExecutionContext['startIndex']}") long startIndex,
        @Value("#{stepExecutionContext['endIndex']}") long endIndex) {

    JdbcPagingItemReader<User> reader = new JdbcPagingItemReader<>();
    reader.setName("userPartitionReader");
    reader.setDataSource(dataSource);
    reader.setFetchSize(1000);

    SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
    queryProvider.setDataSource(dataSource);
    queryProvider.setSelectClause("SELECT *");
    queryProvider.setFromClause("FROM users");
    // Half-open range [startIndex, endIndex): the partitioner above makes each
    // partition's endIndex equal to the next partition's startIndex, so an
    // inclusive BETWEEN would process the boundary rows twice.
    queryProvider.setWhereClause(
            "WHERE id >= " + startIndex + " AND id < " + endIndex);
    queryProvider.setSortKey("id");

    try {
        reader.setQueryProvider(queryProvider.getObject());
    } catch (Exception e) {
        throw new IllegalStateException("Failed to build paging query provider", e);
    }
    reader.setRowMapper(new BeanPropertyRowMapper<>(User.class));
    return reader;
}
```
Note the @Value("#{stepExecutionContext['startIndex']}") expression: that is how the reader receives its partition's information. It only works on a bean declared with @StepScope.
3. PartitionHandler
The PartitionHandler manages the execution of the worker steps. TaskExecutorPartitionHandler is the usual choice:
```java
@Bean
public PartitionHandler partitionHandler(Step workerStep) {
    TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
    handler.setTaskExecutor(taskExecutor());
    handler.setStep(workerStep);
    handler.setGridSize(8);
    return handler;
}

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(8);
    executor.setMaxPoolSize(8);
    executor.setQueueCapacity(100);
    executor.setThreadNamePrefix("batch-partition-");
    executor.initialize();
    return executor;
}
```
Complete Job Configuration
```java
@Configuration
@EnableBatchProcessing
public class PartitionBatchConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    @Bean
    public Step masterStep(PartitionHandler partitionHandler, Partitioner partitioner) {
        return stepBuilderFactory.get("masterStep")
                .partitioner("workerStep", partitioner)
                .partitionHandler(partitionHandler)
                .build();
    }

    @Bean
    public Step workerStep(
            ItemReader<User> itemReader,
            ItemProcessor<User, UserDto> itemProcessor,
            ItemWriter<UserDto> itemWriter) {
        return stepBuilderFactory.get("workerStep")
                .<User, UserDto>chunk(1000)
                .reader(itemReader)
                .processor(itemProcessor)
                .writer(itemWriter)
                .build();
    }

    @Bean
    public Partitioner partitioner() {
        return new CustomPartitioner();
    }

    @Bean
    public PartitionHandler partitionHandler(Step workerStep) {
        TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
        handler.setTaskExecutor(taskExecutor());
        handler.setStep(workerStep);
        handler.setGridSize(8);
        return handler;
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(8);
        executor.setMaxPoolSize(8);
        executor.setQueueCapacity(100);
        executor.initialize();
        return executor;
    }

    @Bean
    public Job partitionJob(Step masterStep) {
        return jobBuilderFactory.get("partitionJob")
                .start(masterStep)
                .build();
    }

    @Bean
    @StepScope
    public ItemReader<User> itemReader(
            @Value("#{stepExecutionContext['startIndex']}") long startIndex,
            @Value("#{stepExecutionContext['endIndex']}") long endIndex) {
        JdbcPagingItemReader<User> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(dataSource);
        reader.setFetchSize(1000);
        // Configure the paging query provider with startIndex/endIndex here,
        // as shown in the reader example earlier.
        return reader;
    }

    @Bean
    public ItemProcessor<User, UserDto> itemProcessor() {
        return user -> {
            UserDto dto = new UserDto();
            dto.setId(user.getId());
            dto.setName(user.getName().toUpperCase());
            return dto;
        };
    }

    @Bean
    public ItemWriter<UserDto> itemWriter(NamedParameterJdbcTemplate template) {
        return items -> {
            String sql = "INSERT INTO user_dto (id, name) VALUES (:id, :name)";
            List<Map<String, Object>> batchArgs = items.stream()
                    .map(item -> {
                        Map<String, Object> map = new HashMap<>();
                        map.put("id", item.getId());
                        map.put("name", item.getName());
                        return map;
                    })
                    .collect(Collectors.toList());
            template.batchUpdate(sql, batchArgs.toArray(new Map[0]));
        };
    }
}
```
Running the Partition Job
```java
@RestController
public class JobController {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job partitionJob;

    @PostMapping("/start-partition-job")
    public void startJob() throws Exception {
        JobParameters parameters = new JobParametersBuilder()
                .addLong("timestamp", System.currentTimeMillis())
                .toJobParameters();

        JobExecution execution = jobLauncher.run(partitionJob, parameters);
        System.out.println("Job execution ID: " + execution.getId());
        System.out.println("Job status: " + execution.getStatus());
    }
}
```
Common Pitfalls
1. ExecutionContext values never reach the reader
Remember to use @StepScope together with @Value("#{stepExecutionContext[...]}"); only then will Spring inject the partition's information.
```java
// Correct: @StepScope makes Spring resolve stepExecutionContext at runtime
@Bean
@StepScope
public ItemReader<User> itemReader(
        @Value("#{stepExecutionContext['startIndex']}") long startIndex) {
    // ...
}

// Wrong: without @StepScope, the SpEL expression cannot be resolved
@Bean
public ItemReader<User> itemReader(
        @Value("#{stepExecutionContext['startIndex']}") long startIndex) {
    // ...
}
```
2. Thread pool too small
If the thread pool only has 2 threads but gridSize is set to 8, the extra partitions just queue up and the parallelism is wasted. Tune the pool according to the CPU core count and the nature of the workload:
```java
int optimalThreadCount = Runtime.getRuntime().availableProcessors();
```
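A common rule of thumb (a rough heuristic, not a Spring Batch API; `PoolSizing` and the variable names are illustrative) is one thread per core for CPU-bound work, and more threads for IO-bound work such as database reads, since those threads spend much of their time waiting:

```java
public class PoolSizing {
    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // CPU-bound: roughly one thread per core keeps every core busy.
        int cpuBoundThreads = cores;
        // IO-bound: threads block on the database, so more can overlap.
        int ioBoundThreads = cores * 2;
        // Print the ratio between the two sizing strategies.
        System.out.println(ioBoundThreads / cpuBoundThreads);
    }
}
```

Measure before settling on a number; the right multiplier depends on how long each chunk spends waiting on IO.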
3. Duplicated or missing data
Partition ranges must not overlap, and they must not leave gaps. Which boundary values are correct depends on whether the query treats the range as inclusive or half-open:
```java
// Inclusive ranges (e.g. BETWEEN): the next partition starts one past
// the previous endIndex
context.put("startIndex", 0);   context.put("endIndex", 100);
context.put("startIndex", 101); context.put("endIndex", 200);

// Half-open ranges [start, end): the next partition starts exactly at
// the previous endIndex
context.put("startIndex", 0);   context.put("endIndex", 100);
context.put("startIndex", 100); context.put("endIndex", 200);
```
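A cheap safeguard is to validate the generated ranges before launching the job. This is a minimal sketch assuming the half-open [start, end) convention used by the partitioner above (`RangeCheck` and `contiguous` are illustrative names):

```java
import java.util.List;

public class RangeCheck {
    // Validates that consecutive half-open ranges [start, end) tile the
    // keyspace with no gaps and no overlaps.
    static boolean contiguous(List<long[]> ranges) {
        for (int i = 1; i < ranges.size(); i++) {
            // Each range must start exactly where the previous one ended.
            if (ranges.get(i)[0] != ranges.get(i - 1)[1]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<long[]> good = List.of(new long[]{0, 100}, new long[]{100, 200});
        List<long[]> bad  = List.of(new long[]{0, 100}, new long[]{101, 200});
        System.out.println(contiguous(good) + " " + contiguous(bad));
    }
}
```

Such a check could run inside the Partitioner itself, failing fast before any worker step starts.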
Monitoring and Logging
Add some logging to monitor each partition's execution:
```java
@Bean
public StepExecutionListener partitionStepListener() {
    return new StepExecutionListener() {
        @Override
        public void beforeStep(StepExecution stepExecution) {
            long startIndex = stepExecution.getExecutionContext().getLong("startIndex");
            long endIndex = stepExecution.getExecutionContext().getLong("endIndex");
            logger.info("Partition {} starting: {} to {}",
                    stepExecution.getStepName(), startIndex, endIndex);
        }

        @Override
        public ExitStatus afterStep(StepExecution stepExecution) {
            logger.info("Partition {} finished. Read: {}, Written: {}",
                    stepExecution.getStepName(),
                    stepExecution.getReadCount(),
                    stepExecution.getWriteCount());
            return stepExecution.getExitStatus();
        }
    };
}
```
Key Takeaways
- Partitioner: defines how to split the data, returning a Map<String, ExecutionContext>
- PartitionHandler: manages the worker steps' execution; TaskExecutorPartitionHandler is the usual choice
- @StepScope + @Value("#{stepExecutionContext[...]}"): how a step component receives its partition's information
- gridSize: the number of partitions, usually equal to the thread count
- Watch the boundary points between partitions to avoid duplicated or missing records
- Size the thread pool sensibly
A Partitioner can dramatically improve performance on large datasets. Next time you have millions of records to process, reach for it.
Your encouragement will be converted into my motivation to keep working overtime tomorrow (really). ❤️