-
spring batch개발/spring 2023. 6. 30. 19:15
spring batch
Spring batch 는 대량의 데이터를 처리하기 위한 배치 처리 프레임워크. ItemReader, ItemWriter, ItemProcessor, Execution Context 와 같은 스프링부트의 구성요소를 이용해서 작업을 쉽게 처리할 수 있다. 처음에는 복잡해보이지만 사용해보면 간편하다.
ItemReader
데이터 소스에서 데이터를 읽어오는 역할을 함. 데이터베이스나 파일, 메시지 큐등에서 다양한 데이터 소스를 읽어올 수 있으며 한번에 하나씩 또는 청크 단위로 읽어오는 것이 가능하다. 예제 코드에서는 final 로 선언된 배열의 데이터를 순차적으로 읽어오도록 작성했다. 더이상 읽을 값이 없는 경우 null을 리턴해서 reader를 종료한다.
public class CustomItemReader implements ItemReader<String> { int index = 0; private final String[] data = {"1", "2 3", "4 5 6", "7 8 9 10"}; @Override public String read() { if (index < data.length) { return data[index++]; } return null; } }
ItemProcessor
ItemProcessor는 ItemReader 에서 읽어온 데이터를 가공하거나 필터링하는 역할을 함. 필요에 따라서는 사용하지 않을 수도 있다. 예제 코드에서는 데이터를 공백으로 split 하는 처리만 했다.
public static class WordCountProcessor implements ItemProcessor<String, WordCount> { @Override public WordCount process(String item) { String[] words = item.trim().split("\\s+"); return new WordCount(words.length); } }
ItemWriter
ItemWriter 는 ItemReader, ItemProcessor 에서 처리된 데이터를 최종적으로 쓰는 역할을 한다. 데이터베이스, 메세지큐, 파일처럼 다양한 곳에 데이터를 저장할 수 있고 청크 단위로 데이터를 한 번에 여러 개씩 쓸 수 있다. 예제 코드에서는 전달 받은 데이터를 출력하는 용도로만 사용했다.
public class CustomItemWriter implements ItemWriter<WordCount> { @Override public void write(List<? extends WordCount> items) { for (WordCount wordCount : items) { System.out.println("Thread name " + Thread.currentThread().getName()); System.out.println("Word Count: " + wordCount.getCount()); } } }
Step
Reader, Processor, Writer가 모여서 하나의 배치 Step 이 된다. 스텝은 단일 쓰레드가 순차적으로 실행하는 작업의 단위라고 볼 수 있다.
@Bean public Step singleStep(ItemReader<String> reader, ItemProcessor<String, WordCount> processor, ItemWriter<WordCount> writer) { return stepBuilderFactory.get("singleStep") .<String, WordCount>chunk(1) .reader(reader) .processor(processor) .writer(writer) .build(); }
chunk
ItemReadeer 함수가 실행되고 ItemProcessor, ItemWriter 객체가 실행되는데 데이터를 한번에 여러개를 전달하고 싶을 때가 있다. 직접 코딩으로 ItemReader에 리턴타입을 배열로 두고 여러 개의 데이터를 넣어주는 방법도 있지만 process 함수도 같이 변경해야하기 때문에 번거롭다.
이럴때는 chunkSize를 설정하면 간단하다. chunkSize를 설정하면 ItemReader 에서 read 함수를 실행하는 횟수를 결정할 수 있기 때문에 코드상으로는 단일의 데이터를 처리하도록 해놓고 실제 동작은 복수의 데이터를 전달하는 것처럼 할 수 있다.
한번에 많은 데이터를 읽어오게 되기 때문에 캐시효과를 이용해 성능 향상을 기대해볼 수 있지만 chunkSize가 커질수록 메모리 사용량도 증가하게 되기 때문에 선택할 때 고려해봐야한다.
master-slave
스프링이 멀티쓰레드를 지원하는 프레임워크인 만큼 스프링 배치의 작업도 멀티 쓰레드로 병렬로 처리할 수 있다. 스프링 배치에서는 Step을 크게 Master Step, Slave Step 으로 나누고 Master 단계는 Slave step을 처리할 쓰레드를 생성하고 작업을 분담처리를 담당하고, Slave Step 에서는 read, process, writer를 이용한 실질적인 정보 처리를 담당하도록 한다.
Slave는 별도의 쓰레드에서 처리하기 때문에 저마다 특수한 ItemReader, ItemWriter, ItemProcessor 를 갖고 있어 안전하다. Partitioner 에서 전달한 정보를 토대로 데이터를 처리한다
Partitioner
Spring Batch 에서 대량의 데이터를 처리할 때 여러 파티션으로 분할하는 역할을 한다. 직접 처리할 데이터를 넘겨줄 수 있고, 처리할 파일 이름 또는 파일의 라인 단위를 넘겨줄 수도 있다. 파티션은 Slave 마다 별도로 주어지게 된다. grid size는 병렬로 실행할 Slave Step의 개수다.
간단히 말하면 쓰레드 별로 처리할 작업물을 분배하는 역할을 한다고 보면 쉽다. 예제 코드에서는 array 단위로 작업을 나누고 값을 전달했다.
public class CustomPartitioner implements Partitioner { private final String[] data = {"Hello World", "1 2 3 4 5", "카리나 윈터", "지젤 닝닝 지효 정연"}; @Override public Map<String, ExecutionContext> partition(int gridSize) { Map<String, ExecutionContext> partitions = new HashMap<>(); int partitionSize = data.length / gridSize; for (int i = 0; i < gridSize; i++) { ExecutionContext context = new ExecutionContext(); int length = (i == (gridSize - 1) ? data.length : (i + 1) * partitionSize) - i * partitionSize; String[] partitionData = new String[length]; System.arraycopy(data, i * partitionSize, partitionData, 0, length); context.put("data", partitionData); partitions.put("partition" + i, context); } return partitions; } }
ExecutionContext
Job 또는 Step 별로 데이터를 공유할 수 있는 객체다. key-value 의 형태로 값을 저장할 수 있어서 사용하기 간편하다. int, string 뿐만 아니라 배열도 저장할 수 있다. Partitioner 에서는 Partition 마다 ExecutionContext 를 생성하고 상응하는 값을 넣는다. 이렇게 생성된 ExecutionContext는 Slave Step의 Reader 에서 사용된다.
ItemReader 코드에서 Partitioner에서 전달해준 배열을 ExecutionContext 객체를 이용해서 가져왔다.
public class CustomItemReader implements ItemReader<String> { int index = 0; private ThreadLocal<StepExecution> stepExecution = new ThreadLocal<>(); @BeforeStep public void beforeStep(StepExecution stepExecution) { this.stepExecution.set(stepExecution); } @Override public String read() { ExecutionContext executionContext = stepExecution.get().getExecutionContext(); String[] partitionData = (String[]) executionContext.get("data"); if (index < partitionData.length) { return partitionData[index++]; } else { return null; } } }
Master-Slave 구조에서는 두개의 Step 이 필요하다. Slave Step은 앞서 단일쓰레드와 동일하고 Master 스텝에서는 앞서 정의한 Partitioner와 gridSize, taskExecutor를 설정한다.
public Step slaveStep(ItemReader<String> reader, ItemProcessor<String, WordCount> processor, ItemWriter<WordCount> writer) { return stepBuilderFactory.get("slaveStep") .<String, WordCount>chunk(1) .reader(reader) .processor(processor) .writer(writer) .build(); } @Bean public Step masterStep(Partitioner partitioner, Step slaveStep) { return stepBuilderFactory.get("masterStep") .partitioner("slaveStep", partitioner) .step(slaveStep) .gridSize(2) .taskExecutor(taskExecutor()) .build(); } @Bean public Job masterSlaveJob(Step masterStep) { return jobBuilderFactory.get("masterSlaveJob") .incrementer(new RunIdIncrementer()) .start(masterStep) .build(); } @Bean @StepScope public Partitioner partitioner() { return new CustomPartitioner(); }
전체 소스코드
'개발 > spring' 카테고리의 다른 글
IoC container and Bean (0) 2021.09.06 @Bean vs @Component (0) 2021.09.06 Node.js vs Spring Boot (5) 2021.03.13 Spring 테이블 칼럼이 아닌 필드 데이터 받아오기 (0) 2021.03.05