When you process a large volume of data on a single thread, it is slow. Spring Batch's Partitioner exists to split a big task into smaller chunks that run in parallel. I once had an ETL job processing 10 million records; after adding a Partitioner, the run time dropped from 2 hours to 15 minutes.

The core idea of the Partitioner

What a Partitioner does is simple: it splits one large task into N smaller ones.

Suppose you need to process 10 million records:

  • Without a Partitioner: one thread processes everything, taking about 1 hour
  • With a Partitioner (8 partitions): 8 threads each process 1.25 million records, taking roughly 7.5 minutes assuming near-linear scaling

Core components

1. The Partitioner interface

The Partitioner's job is to define how the work is split. You implement the Partitioner interface:

```java
public class CustomPartitioner implements Partitioner {

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // gridSize is the number of partitions, usually equal to the thread pool size
        Map<String, ExecutionContext> result = new HashMap<>();

        // Suppose we need to process 10 million records, split into gridSize partitions
        long totalRecords = 10_000_000;
        long recordsPerPartition = totalRecords / gridSize;

        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();

            // Half-open range [startIndex, endIndex); the last partition absorbs the remainder
            long startIndex = i * recordsPerPartition;
            long endIndex = (i == gridSize - 1) ? totalRecords : (i + 1) * recordsPerPartition;

            context.putLong("startIndex", startIndex);
            context.putLong("endIndex", endIndex);
            context.putInt("partitionId", i);

            result.put("partition" + i, context);
        }

        return result;
    }
}
```

The Partitioner returns a Map whose keys are partition names and whose values are ExecutionContexts carrying the information each partition needs.
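The boundary arithmetic can be checked in isolation, without any Spring classes. A minimal sketch; `splitRange` is a hypothetical helper that mirrors the Partitioner's loop above:

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionMath {

    // Mirrors the Partitioner's loop: half-open ranges [start, end),
    // with the last partition absorbing any remainder.
    static List<long[]> splitRange(long totalRecords, int gridSize) {
        long perPartition = totalRecords / gridSize;
        List<long[]> ranges = new ArrayList<>();
        for (int i = 0; i < gridSize; i++) {
            long start = i * perPartition;
            long end = (i == gridSize - 1) ? totalRecords : (i + 1) * perPartition;
            ranges.add(new long[] { start, end });
        }
        return ranges;
    }

    public static void main(String[] args) {
        for (long[] r : splitRange(10_000_000, 8)) {
            System.out.println(r[0] + " .. " + r[1]);
        }
    }
}
```

Each partition's start equals the previous partition's end, so the ranges are contiguous by construction.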

2. Reader and Writer

The Reader uses the values in the ExecutionContext to read only its assigned range:

```java
@Bean
@StepScope // required so the stepExecutionContext values are injected per partition
public ItemReader<User> userPartitionReader(
        @Value("#{stepExecutionContext['startIndex']}") long startIndex,
        @Value("#{stepExecutionContext['endIndex']}") long endIndex) {

    JdbcPagingItemReader<User> reader = new JdbcPagingItemReader<>();
    reader.setDataSource(dataSource);
    reader.setFetchSize(1000);

    SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
    queryProvider.setDataSource(dataSource);
    queryProvider.setSelectClause("SELECT *");
    queryProvider.setFromClause("FROM users");
    // Half-open range to match the Partitioner (>= start, < end), so boundary ids
    // are never processed twice. Named parameters avoid SQL injection.
    queryProvider.setWhereClause("WHERE id >= :startIndex AND id < :endIndex");
    queryProvider.setSortKey("id");

    Map<String, Object> parameterValues = new HashMap<>();
    parameterValues.put("startIndex", startIndex);
    parameterValues.put("endIndex", endIndex);
    reader.setParameterValues(parameterValues);

    try {
        reader.setQueryProvider(queryProvider.getObject());
    } catch (Exception e) {
        throw new RuntimeException(e);
    }

    reader.setRowMapper(new BeanPropertyRowMapper<>(User.class));

    return reader;
}
```

Note the @Value("#{stepExecutionContext['startIndex']}") expression: that is how each worker receives its partition's information.

3. PartitionHandler

The PartitionHandler manages execution of the worker steps. The usual choice is TaskExecutorPartitionHandler:

```java
@Bean
public PartitionHandler partitionHandler(Step workerStep) {
    TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
    handler.setTaskExecutor(taskExecutor());
    handler.setStep(workerStep);
    handler.setGridSize(8); // use 8 partitions

    return handler;
}

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(8);
    executor.setMaxPoolSize(8);
    executor.setQueueCapacity(100);
    executor.setThreadNamePrefix("batch-partition-");
    executor.initialize();

    return executor;
}
```

Complete Job configuration

```java
@Configuration
@EnableBatchProcessing
public class PartitionBatchConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    // Master step - performs the partitioning
    @Bean
    public Step masterStep(PartitionHandler partitionHandler, Partitioner partitioner) {
        return stepBuilderFactory.get("masterStep")
                .partitioner("workerStep", partitioner)
                .partitionHandler(partitionHandler)
                .build();
    }

    // Worker step - does the actual processing
    @Bean
    public Step workerStep(
            ItemReader<User> itemReader,
            ItemProcessor<User, UserDto> itemProcessor,
            ItemWriter<UserDto> itemWriter) {

        return stepBuilderFactory.get("workerStep")
                .<User, UserDto>chunk(1000)
                .reader(itemReader)
                .processor(itemProcessor)
                .writer(itemWriter)
                .build();
    }

    @Bean
    public Partitioner partitioner() {
        return new CustomPartitioner();
    }

    @Bean
    public PartitionHandler partitionHandler(Step workerStep) {
        TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
        handler.setTaskExecutor(taskExecutor());
        handler.setStep(workerStep);
        handler.setGridSize(8);

        return handler;
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(8);
        executor.setMaxPoolSize(8);
        executor.setQueueCapacity(100);
        executor.initialize();

        return executor;
    }

    @Bean
    public Job partitionJob(Step masterStep) {
        return jobBuilderFactory.get("partitionJob")
                .start(masterStep)
                .build();
    }

    // Reader - one instance per partition
    @Bean
    @StepScope
    public ItemReader<User> itemReader(
            @Value("#{stepExecutionContext['startIndex']}") long startIndex,
            @Value("#{stepExecutionContext['endIndex']}") long endIndex) {

        JdbcPagingItemReader<User> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(dataSource);
        reader.setFetchSize(1000);

        // Configure the query provider to read [startIndex, endIndex), as shown earlier
        // ...

        return reader;
    }

    @Bean
    public ItemProcessor<User, UserDto> itemProcessor() {
        return user -> {
            UserDto dto = new UserDto();
            dto.setId(user.getId());
            dto.setName(user.getName().toUpperCase());
            return dto;
        };
    }

    @Bean
    public ItemWriter<UserDto> itemWriter(
            NamedParameterJdbcTemplate template) {

        return items -> {
            String sql = "INSERT INTO user_dto (id, name) VALUES (:id, :name)";

            List<Map<String, Object>> batchArgs = items.stream()
                    .map(item -> {
                        Map<String, Object> map = new HashMap<>();
                        map.put("id", item.getId());
                        map.put("name", item.getName());
                        return map;
                    })
                    .collect(Collectors.toList());

            template.batchUpdate(sql,
                    batchArgs.toArray(new Map[0]));
        };
    }
}
```

Running the partition Job

```java
@RestController
public class JobController {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job partitionJob;

    @PostMapping("/start-partition-job")
    public void startJob() throws Exception {
        JobParameters parameters = new JobParametersBuilder()
                .addLong("timestamp", System.currentTimeMillis())
                .toJobParameters();

        JobExecution execution = jobLauncher.run(partitionJob, parameters);
        System.out.println("Job execution ID: " + execution.getId());
        System.out.println("Job status: " + execution.getStatus());
    }
}
```

Common pitfalls

1. ExecutionContext values not reaching the reader

Remember to use @StepScope together with @Value("#{stepExecutionContext[...]}"); without the step scope, Spring cannot inject each partition's values.

```java
// Correct
@Bean
@StepScope
public ItemReader<User> itemReader(
        @Value("#{stepExecutionContext['startIndex']}") long startIndex) {
    // ...
}

// Wrong: without @StepScope the expression cannot be resolved per partition
@Bean
public ItemReader<User> itemReader(
        @Value("#{stepExecutionContext['startIndex']}") long startIndex) {
    // ...
}
```

2. Thread pool sized too small

If the pool has only 2 threads but gridSize is set to 8, the extra partitions just queue up. Size the pool according to the CPU core count and the nature of the workload:

```java
int optimalThreadCount = Runtime.getRuntime().availableProcessors();
// or adjust for memory and I/O capacity
```
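For I/O-bound batch steps (e.g. waiting on the database), a common heuristic is to scale the core count by the ratio of wait time to compute time. A hedged sketch; the 50 ms / 5 ms figures below are made-up placeholders you would replace with measurements from your own job:

```java
public class PoolSizing {

    // Heuristic: threads ≈ cores * (1 + waitTime / computeTime).
    // Pure CPU-bound work (waitMillis = 0) degenerates to one thread per core.
    static int ioAwareThreadCount(int cores, double waitMillis, double computeMillis) {
        return (int) (cores * (1 + waitMillis / computeMillis));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // Example: each item spends ~50 ms waiting on the DB and ~5 ms on CPU
        System.out.println(ioAwareThreadCount(cores, 50, 5) + " threads");
    }
}
```

Treat the result as a starting point and verify with load testing; database connection pool limits often cap the useful thread count first.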

3. Duplicated or missed records

Partition ranges must not overlap, and must not leave gaps:

```java
// Wrong: leaves a gap (with half-open ranges, [0, 100) is followed by [101, 200))
context.put("startIndex", 0);
context.put("endIndex", 100);

context.put("startIndex", 101); // record 100 is skipped
context.put("endIndex", 200);

// Correct: contiguous half-open ranges
context.put("startIndex", 0);
context.put("endIndex", 100);

context.put("startIndex", 100); // note: 100, not 101, because the end index is exclusive
context.put("endIndex", 200);
```
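A cheap safety net is to validate the ranges the Partitioner produces before launching the job. A minimal sketch with a hypothetical `validateContiguous` helper, assuming the half-open [start, end) convention used above:

```java
import java.util.List;

public class RangeCheck {

    // Throws if half-open ranges [start, end) are empty, overlap, or leave a gap.
    static void validateContiguous(List<long[]> ranges) {
        for (int i = 0; i < ranges.size(); i++) {
            long[] r = ranges.get(i);
            if (r[0] >= r[1]) {
                throw new IllegalStateException("empty range at partition " + i);
            }
            if (i > 0 && ranges.get(i - 1)[1] != r[0]) {
                throw new IllegalStateException("gap or overlap before partition " + i);
            }
        }
    }

    public static void main(String[] args) {
        // Contiguous: passes silently
        validateContiguous(List.of(new long[] { 0, 100 }, new long[] { 100, 200 }));

        // Gap at 100: throws
        try {
            validateContiguous(List.of(new long[] { 0, 100 }, new long[] { 101, 200 }));
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

Failing fast here is far cheaper than discovering missing rows after a two-hour run.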

Monitoring and logging

You can add some logging to monitor each partition's execution:

```java
@Bean
public StepExecutionListener partitionStepListener() {
    return new StepExecutionListener() {
        @Override
        public void beforeStep(StepExecution stepExecution) {
            long startIndex = stepExecution.getExecutionContext().getLong("startIndex");
            long endIndex = stepExecution.getExecutionContext().getLong("endIndex");

            // assumes an SLF4J logger field on the enclosing config class
            logger.info("Partition {} starting: {} to {}",
                    stepExecution.getStepName(), startIndex, endIndex);
        }

        @Override
        public ExitStatus afterStep(StepExecution stepExecution) {
            logger.info("Partition {} finished. Read: {}, Written: {}",
                    stepExecution.getStepName(),
                    stepExecution.getReadCount(),
                    stepExecution.getWriteCount());

            return stepExecution.getExitStatus();
        }
    };
}
```

Key takeaways

  • Partitioner: defines how to split the data; returns a Map<String, ExecutionContext>
  • PartitionHandler: manages worker step execution; TaskExecutorPartitionHandler is the usual choice
  • @StepScope + @Value("#{stepExecutionContext[...]}"): how a worker gets its partition's values
  • gridSize: the number of partitions, usually equal to the thread count
  • Watch the range boundaries to make sure nothing is duplicated or missed
  • Size the thread pool sensibly

A Partitioner can dramatically speed up large-volume processing. Next time you have millions of records to chew through, remember to reach for it.