공부/Springboot

Spring-Batch

songhees 2025. 11. 27. 23:51

Spring Batch란?

대용량 데이터를 정해진 규칙에 따라 읽고/처리하고/저장하는 이러한 배치 작업을, 안정적으로 실행/관리/복구할 수 있게 해주는 Spring 기반 프레임워크

  • 배치 애플리케이션 개발을 가능하게 하도록 설계된 가볍고(lightweight) 포괄적인(comprehensive) 배치 프레임워크
  • Batch : 데이터를 일괄적으로 묶어 한 번에 처리하는 방식
  • 스케줄링 프레임워크가 아니다.
  • 스케줄러를 대체하기보다 스케줄러와 함께 동작하도록 의도되었다.

 

Domain


JobLauncher

jobParameter + job 을 실행하는 인터페이스

private final JobLauncher simpleJobLauncher

@Scheduled(cron = "0 5 0 * * ?")
public void cronExpireCtrtJob() throws Exception {
    JobParameters param = new JobParametersBuilder()
            .addString("version", "1")
            .addString(InfConstant.Job.DATE_PARAM, LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME))
            .addLong("time", System.currentTimeMillis())
            .toJobParameters();
    simpleJobLauncher.run(expireCtrtJob(), param);
}

JobLauncher.run()을 호출하면 내부에서 JobRepository가 JobExecution을 생성하고, 그 JobExecution으로 잡을 실행한 뒤 최종적으로 그 JobExecution 객체를 그대로 반환

JobInstance = Job + JobParameter

batch_job_instance = 하나의 Job 정의를 JobParameter 조합 별로 논리적으로 식별하기 위한 단위

  • Job을 여러 번 실행한다면 같은 Job에 대해 각각의 JobInstance가 생성된다.
    • 성공(COMPLETED)까지 끝난 JobInstance는 같은 JobParameters로 다시 시작할 수 없다.
    • 만약에 중단된 Job(FAILED / STOPPED)이 있다면 같은 JobParameters로 재시작 할 수 있다.
  • 각각의 JobInstance를 구분해주는 값이 JobParameter이다.
JobParameters param = new JobParametersBuilder()
		.addLong("time", System.currentTimeMillis())
		.addString("name", "이름")
		.toJobParameters();
jobLauncher.run(**customJob**(), param);

JobLauncher.run 호출 시 내부에서 JobRepository가 JobExecution을 생성하기 전에

  • (jobName, jobParameters) 조합으로 JobInstance 조회
  • 없으면 → JobInstance 새로 생성 (DB BATCH_JOB_INSTANCE INSERT)
  • JobInstance를 참조하는 JobExecution을 생성 (DB BATCH_JOB_EXECUTION INSERT)

JobInstance : JobExecution : JobParameters(세트) = 1 : N : N

JobInstance (jobName + identifying params)
    └─ 1 : N ── JobExecution
                 └─ 1 : 1 ── JobParameters(해당 실행의 파라미터 세트)

Job

Job 안에서 실제 일을 하는 실행 단위

 

job은 배치가 작업할 전체 시나리오

더보기

통계 데이터 집계 배치를 만든다라고 한다면

해당 전체 프로세스를 job이라고 할때

step은 job의 시나리오에 들어있는 개별 단계

step은 그 안에서 데이터를 읽고, 데이터를 계산해서 통계 테이블의 저장하는 각각의 단계라고 할 수 있다.

@Bean
public Job expireCtrtJob() throws Exception {
    return jobBuilderFactory.get("expireCtrtJob")
            .start(expireCtrtStep(null))
            .next(finishCtrtBillStep(null))
            .build();
}

 

JobParameter

parameter 설정

.addT( String key, T parameter, boolean identifying)

  • key : parameter 이름
  • parameter : 파라미터 값
  • identifying : 파라미터가 잡 인스턴스를 식별하는 데 사용되는지 여부를 나타낸다.
    • default 값은 true 로 배치 스키마의 JOB_KEY에 해당 하는 값이 된다.

ex )

  • bizDate=2025-09-22 (identifying=true)
  • traceId=abc-123 (identifying=false, 단순 추적 용)

이 경우 JOB_KEY는 bizDate만 반영되어 생성

같은 날짜로 다시 돌리면(그리고 이전 인스턴스가 COMPLETED라면) 이미 완료된 동일 인스턴스로 간주되어 재실행 불가(JobInstanceAlreadyCompleteException)

반대로 날짜를 바꾸면 새 JobInstance가 된다.

JobParameters params = new JobParametersBuilder()
	.addString("bizDate", "2025-09-22", true) // 식별 파라미터
	.addString("traceId", "abc-123", false) // 비식별(인스턴스 키에 미포함)
	.toJobParameters();

JobParameter 사용

  1. Job을 식별하거나 참조 데이터로 사용한다.
  2. 배치 실행을 위해 필요한 값을 외부에서 전달하고 싶을 때
    import org.springframework.beans.factory.annotation.Value; + spel로 step, job에 jobParameter를 전달

JobScope / StepScope

Bean의 생성 시점을 지정된 Scope가 실행되는 시점으로 지연시킨다.
  • @JobScope: Job 실행(JobExecution) 동안만 살아 있는 빈으로 생성
    같은 Job 안의 여러 Step에서 공유할 수 있고 Job이 끝나면 파기
  • @StepScope: Step 실행(StepExecution) 동안만 살아 있는 빈으로 생성
    해당 Step이 시작될 때 생성되어 끝나면 파기
  1. JobParameter의 Late Binding이 가능
  2. 실행 컨텍스트의 병렬 처리에 안전
    상태를 공유하는 싱글톤 빈이 병렬 실행에서 꼬이는 문제를 막아준다.
    예를 들어
    • 같은 Spring 애플리케이션안에서 Job을 서로 다른 JobParameter로 동시에 실행하면
      서로 다른 JobExecution과 StepExecution으로 실행되지만
    • @JobScope/@StepScope를 안 붙인 Step/Reader/Writer는 기본이 싱글톤 빈이라 인스턴스가 공유될 수 있다.
      따라서
    • JpaPagingItemReader 에서 doReadPage 호출 할 때 사용하는 page, results 상태가 뒤섞일 수 있다.
      몇 페이지를 읽어야 되는지 꼬일 수 있다.
    • → 스코프 기준은 스레드가 아니라 JobExecution/StepExecution(실행 컨텍스트) 이다. ⭐⭐⭐
      멀티스레드에서도 하나의 StepExecution을 공유할 수 있기 때문에…
      쓰레드 1,2,3이 같은 StepExecution을 공유하여 chunck를 나눠 병렬로 처리할 수 있다.
    • 하지만 이것도 중복/누락이 발생 할 수 있기 때문에 Partitioning 으로 범위 분할 병렬 처리하는 것이 좋다.

 

step 예시

@Bean
public Job customJob() {
	return JobBuilderFactory.get("customJob")
        .start(**customStep(null)**)
        .next(deleteSpeOpertStep(null))
        .build();
}

@Bean
@JobScope
public Step customStep(@Value("#{jobParameters['key']}") String key) {
	
}

reader(tasklet) 예시

@Bean
@StepScope
public JdbcCursorReader reader(@Value("#{jobParameters['key']}") String key) {

}

 

incrementer

동일 Job Parameter로 계속 실행하는 방법

  • Incrementer를 구현해서 사용하면, 기존에 전달되는 JobParameters를 변경하지 않으면서 JobParameters를 추가해서 동일한 Job을 변경점 없이 여러번 실행할 수 있게 된다.
  • Incrementer는 build() 메서드가 실행되는 시점에 메타 정보가 넘어가면서 이 Incrementer를 가진 Job으로 생성된다.
  • 그냥 JobParameter 한 개 더 생성된다고 이해하면 됨
  • Job 의 incrementer 옵션에 추가하여 사용
    public Job testTaskChunkJob(){
      return jobBuilderFactory.get(JOBNAME)
              .incrementer(new RunIdIncrementer())
              .start(this.taskStep1()) //tasklet 처리
              .next(this.chunkStep1()) //chunk 처리
              .build();
    }

RunIdIncrementer 의 역할

RunIdIncrementer 는 잡 실행 때마다 run.id (Long) 라는 파라미터를 자동으로 하나씩 증가시켜 준다.

  • 예:
    • 첫 실행: run.id=1
    • 두 번째 실행: run.id=2
    • 세 번째 실행: run.id=3

이렇게 해서 동일한 잡이라도 실행할 때마다 고유한 JobParameters 를 가지게 되어 정상적으로 여러 번 실행할 수 있다.

 

JobExecution

Job 이 실제로 한 번 실행된 이력 = JobInstance를 실제로 한 번 시도한 기록, 상태값

  • BATCH_JOB_EXECUTION(및 관련 메타데이터 테이블)에 대해 JobRepository/JobExplorer 가 CRUD, 조회할 때 사용하는 도메인 클래스
  • JobInstance를 실행 할 때 마다 한 건씩 추가

로직

  1. JobLancher에서 마지막으로 실행된 JobExecution을 조회 (lastExecution)
  2. lastExecution이 존재하는데 job.isRestartable() == false면 Exeception
  3. true인 경우에만 다음 단계를 통해 재시작 가능 여부를 계속 검사한다.
  4. lastExecution 의 StepExecution들의 상태를 비교
    • 오직 Job의 재시도 여부를 판단 하기 위한 Step의 Status 비교 ( Step의 재시도 여부가 아님 )
  5. isRunning 중인지? UNKNOWN 상태인지? ⇒ Exeception
  6. 실행 중인 Job이 없다면 JobRepository에 JobExecution 생성 메소드를 호출
    1. JobRepository에서 jobName, jobParameters로 JobInstance를 조회
      • JobInstance 가 없다면 새 JobInstance를 생성 하고 빈 ExecutionContext로 시작
      • JobInstance 가 있다면 JobInstance로 다시 JobExecution를 조회 ← 재시도 로직
    2. 해당 JobExecution status값이 STARTING, STARTED, STOPPING 면 이미 Job이 실행중이므로 Exeception
      UNKNOWN 이면 상태를 신뢰할 수 없으므로 Exeception
    3. JobExecution의 JobParameters가 있는데 status값이 COMPLETED, ABANDONED면 해당 JobInstance는 이미 완료된 것으로 보고 Exeception
    4. 위와 같은 필터링 후 재시작인 경우 같은 JobParameters와 기존 배치 실행 도중 중간 상태 값(ExecutionContext)으로 새 JobExecution 생성
  • JobInstance
    → Job + (식별용) JobParameters 조합을 표현하는 클래스/엔티티
  • 이 JobInstance가 실행될 때마다
    → JobExecution 이 하나씩 생성
  • 이 JobExecution 들은
    → JobRepository 를 통해 BATCH_JOB_EXECUTION 등 메타데이터 테이블에 저장

 

JobRepository

JobExecution, StepExecution 등 다양한 영속화된 도메인 객체에 대해 기본 CRUD 작업을 수행하는 역할

 

위의 객체들에 대한 meta 정보를 저장을 담당하는 클래스

spring batch에서 사용하는 여러 domain 객체를 CRUD 할 때 필요하다.

Job을 실행할 때 repository에서 JobExecution을 조회하고, 구현체를 repository에 넘겨 저장한다.

Step

Configuring a Step :: Spring Batch

Job을 이루는 작업 단계 Step1, 2, 3…

 

Job과 1:N Step으로 구성

실제 배치 처리를 정의하고 제어하는 데 필요한 모든 정보를 포함

 

Chunk 기반


데이터를 한 번에 하나씩 읽고, 일정 수 만큼 모아서(청크) 트랜잭션 경계 내에서 쓰는 방식

대량의 데이터를 다룰 때 쓰는 방식. 데이터를 청크 단위로 나누어 읽고, 처리하고, 쓰는 구조


구조

읽은 아이템의 수가 커밋 간격(commit interval)에 도달하면, 그 청크 전체를 ItemWriter가 쓰고, 그 후에 트랜잭션을 커밋

ItemProcessor를 구성하여, ItemReader가 읽은 아이템을 ItemWriter로 전달하기 전에 처리할 수 있다.

먼저 ItemReader가 데이터를 읽어 items 리스트에 담고, 그 후 ItemProcessor를 통해 각 아이템을 처리한 뒤에 ItemWriter가 결과를 기록

  • reader: 처리할 아이템을 제공하는 ItemReader
  • writer: ItemReader가 제공한 아이템을 처리하는 ItemWriter

로직

private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;

@Bean
@JobScope
Step validateWeahterStep(JpaPagingItemReader<Weather> weatherReader,
    ItemProcessor<Weather, Weather> weatherFlagProcessor,
    ItemWriter<Weather> weatherFlagWriter) {
    return new StepBuilder("validateWeahterStep", jobRepository)
                .<Weather, Weather>chunk(100, transactionManager)
                .reader(weatherReader)
                .processor(weatherFlagProcessor)
                .writer(weatherFlagWriter)
                .build();
}
  • transactionManager : 데이터를 처리할 때 트랜잭션을 시작하고 커밋하는 Spring의 PlatformTransactionManager 객체
  • jobRepository : 실행 중인 StepExecution 및 ExecutionContext를 커밋 전에 주기적으로 저장하는 역할
  • chunk(100, transactionManager) : 한번에 처리할 아이템 개수를 100개로 설정하고 트랜잭션 관리자를 지정
  • ItemProcessor는 필수가 아니며, 경우에 따라 리더(reader)에서 읽은 아이템을 바로 라이터(writer)로 전달할 수 있다.

로직 흐름

위 예제에서 한 트랜잭션 내에서 100개의 아이템이 처리된다.

  • 처리 시작 시 트랜잭션이 시작된다.
  • 이후 ItemReader에서 read()가 호출될 때마다 카운터가 증가한다.
  • 카운터가 10에 도달하면, 모아둔 아이템 리스트가 ItemWriter로 전달된다.
  • 그리고 트랜잭션이 커밋된다.

 

Tasklet 기반

Step 안에서 한 번에 실행되는 작업 단위를 구현하는 인터페이스

저장 프로시저 호출로만 구성되어 있다면 ItemReader로 호출을 구현하고 프로시저가 끝난 뒤 null을 반환할 수 있다. 하지만 이렇게 하면 호출을 마무리하기 위해 의미 없는(no-op) ItemWriter가 필요하기 때문에 다소 부자연스럽다.

따라서 TaskletStep 을 제공한다.

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new StepBuilder("step1", jobRepository)
    			.tasklet(myTasklet(), transactionManager)
    			.build();
}

tasklet에 TaskletAdapter 혹은 Tasklet 구현체를 넣어야된다.

 

StepExecution

JobExecution 안에서 각 Step 이 실행된 이력

  • batch_step_execution(및 관련 메타데이터 테이블)에 대해 JobRepository/JobExplorer 가 CRUD, 조회할 때 사용하는 도메인 클래스
  • 한번의 step 실행시도 → step을 실행할때 마다 생성
  • JobExecution과 비슷한 기능으로 하나의 JobExecution에서 진행되는 여러 step에 대한 실행 상태를 check
  • 이전 단계 step이 실패해서 다음단계 step을 실행하지 않는다면 다음 단계 step에 대한 execution을 저장하지 않는다.
    • payment → unpaid 순으로 step이 있다면
    • payment에서 status가 failed로 뜬다면 unpaid는 실행되지 않기 때문에 unpaid에 대한 execution은 저장되지 않는다.

 

Reader

Step(Database)에서 배치 처리할 Item(데이터)을 읽어오는 역할

database에서 데이터를 읽어올 경우 전체 배치 로직에서 속도에 영향을 많이 주는 단계이다.

 

[if kakao 2022] Batch Performance를 고려한 최선의 Reader | 카카오페이 기술 블로그

if(kakao)2022 대량의 데이터를 Batch로 읽을 때의 노하우를 공유합니다.

tech.kakaopay.com

 

ItemReader 인터페이스

step에서 아이템을 하나씩 읽어오는 작업을 추상화한 개념

ItemReader는 다양한 입력 소스로부터 데이터를 제공하는 수단

public interface ItemReader<T> {

    T read() throws Exception, UnexpectedInputException, ParseException, 
    NonTransientResourceException;
}

read() : 호출하면 하나의 아이템을 반환하거나 더 이상 아이템이 없으면 null을 반환

 

ItemReader 구현체 목록

  • JdbcCursorItemReader
  • JdbcPagingItemReader
  • ZeroOffsetItemReader
  • HibernateCursorItemReader → Spring batch 5.0 이상에서는 JpaCursorItemReader 사용

JdbcPagingItemReader, JdbcCursorItemReader 에서 Paging, Cursor의 차이점?

  • Paging은 pagination 처럼 끊어서 읽는 방식
    • 커넥션/쿼리가 페이지 단위로 끊어지기 때문에, 한 커넥션을 오래 잡고 있지 않음
    • 재시작이 상대적으로 깔끔
      • 몇 페이지까지 처리했는지를 기준으로 다시 읽을 수 있음
    • 페이지마다 쿼리를 여러 번 실행하니, 전체적으로 커서 방식보다 SQL 호출 수가 많음
  • Cursor은 두루마리 휴지 같이 열려 있으면서 계속 뽑아갈 수 있는 방식
    • 초기화 시점에 커서를 열고, read 메서드가 호출될 때마다 커서를 한 row 앞으로 이동시키며, 처리에 사용할 수 있는 매핑 된 객체를 반환
    • SQL 한 번만 실행해서 ResultSet 을 열어 둠
    • 그 다음 read() 를 부를 때마다 rs.next() 를 호출해서
      • 한 행씩 앞으로만 계속 읽어감 (forward-only)
      • db Connection은 Step이 끝날 때 close된다.

대용량 데이터 조회 시

  • Cursor: 조회 끝날 때까지 커넥션+커서가 계속 살아 있음 → 중간에 끊길 위험이 상대적으로 크고
  • Paging: 페이지마다 쿼리/커넥션을 짧게 쓰고 바로 반납 → 중간 장애/타임아웃 위험이 적다.

 

reader에서 parameter 설정

 

JobParameter 활용 방법 (feat. LocalDate 파라미터 사용하기)

Spring Batch에서는 Spring Environment Variables (환경 변수) 외에 Batch에서만 사용할 수 있는 JobParameter를 지원합니다. Spring Batch Scope와 Job Parameter 기존에 사용하던 방식으로는 불편한 점이나 단점이 많아

jojoldu.tistory.com

 

JdbcCursorItemReader

@StepScope 설정시 함수의 return값은 ItemReader이 아닌 정확한 구현 클래스여야 한다.
@Bean
@StepScope
public JdbcCursorItemReader<VhcleSttusInfoDTO> reader(@Value("#{jobParameters[creatDt]}") String creatDt) {
    String sql;
    List<String> param = new ArrayList();
    param.add(creatDt);
    param.add(creatDt);
    sql = "";

    return new JdbcCursorItemReaderBuilder<VhcleSttusInfoDTO>()
            .fetchSize(chunkSize)
            .dataSource(dataSource)
            .beanRowMapper(VhcleSttusInfoDTO.class)
            .sql(sql)
            .queryArguments(param)
            .name("jdbcCursorItemReader")
            .build();
}
  • fetchSize : step에서 지정된 chuksize와 동일하게 설정해주는 것이 좋다.
    • fetchSize : DB에서 한 번에 가져오는 묶음 크기 ↔ chunk : 한 번에 처리+커밋하는 단위 크기
  • dataSource : sql을 실행시킬 data base에 해당하는 dataSource
  • 데이터를 얻을때
    1. .rowMapper((resultSet, rowNum) -> { … })
      .rowMapper((resultSet, rowNum) -> {
        VhcleSttusInfoDTO item = new VhcleSttusInfoDTO(resultSet.getDate("streDt"), resultSet.getDouble("vhcleVe"));
        log.info("Read item: {}", item.toString());
        return item;
      })
    2. .beanRowMapper(Class.class) ← 추천!
  • queryArguments(List a) : sql에 적용시킬 파라미터
  • name(””) : 해당 Reader 를 식별하는 메타데이터 이름

 

JpaPagingItemReader

 

JPA EntityManager 기반이라 JPA 매핑(@Entity 등)만 필요 bean Mapping 필요 없음

@Bean
public JpaPagingItemReader itemReader() {
    return new JpaPagingItemReaderBuilder<CustomerCredit>()
                    .name("creditReader")
                    .entityManagerFactory(entityManagerFactory())
                    .queryString("select c from CustomerCredit c")
                    .pageSize(1000)
                    .build();
}

 

Processor

Reader로 읽어온 Item을 데이터를 가공/처리하는 역할

배치를 처리하는데 필수 요소는 아님

item을 필터 도중 null로 리턴하면, 그 item은 write로 전달되지 못함( 값이 정확히 있는 item들만 write로 전달됨 )

@Bean
public ItemProcessor<Weather, Weather> itemProcessor() {
    return item -> {
        // 온도 범위에 따라 qcFlag 계산
        // 예: weather.setQcFlag(calc(weather.getTemperature()));
        return item ;
    };
}

<reader에서 읽은 타입, writer에 보낼 타입>

Item Processor

아이템을 처리하는 비지니스 로직을 추상화한 개념

 

Writer

Processor로 가공/처리 된 데이터들(items : List< item >)을 Database에 저장하는 역할

 

ItemWriter 인터페이스

step에서 아이템을 출력하는 작업을 추상화한 개념

ItemWriter는 기능적으로 ItemReader와 비슷하다. 리소스를 찾고, 열고, 닫는 과정은 여전히 필요하지만, ItemWriter는 읽기(read)가 아니라 쓰기(write) 를 수행한다는 점

데이터베이스나 큐의 경우, 이러한 작업은 삽입(insert), 갱신(update), 또는 전송(send)일 수 있다.

public interface ItemWriter<T> {

    void write(Chunk<? extends T> items) throws Exception;

}

write() : 열려 있는 동안, 전달된 아이템 리스트를 기록하려고 시도

아이템들은 청크(chunk) 단위로 묶여 출력하기 때문에 인터페이스는 단일 아이템이 아니라 아이템 리스트를 입력으로 받는다. 반환하기 전에 필요한 경우 flush(버퍼 비우기) 작업을 수행할 수 있다.

 

CompositeItemWriter

하나의 step에서 여러 개의 ItemWriter를 사용하고 싶을 때 사용하는 Writer

@Bean
public CompositeItemWriter<T> wwriter() {
    return new CompositeItemWriterBuilder()
						.delegates(writer1(), writer2())
						.build();
}

JdbcBatchItemWriter

대용량 처리 좋음

@Bean
public ItemWriter<VhcleSttusInfoHist> writer() {
    return new JdbcBatchItemWriterBuilder()
            .dataSource(dataSource)
            .sql("")
			.beanMapped()
            .build();
}
  • property
property parameter chunk type sql 설명
columnMapped 없음 Map<T, E> """
INSERT INTO vhcle_sttus_info_hist ( vhcle_id, status, creat_dt )
VALUES ( ?, ?, ? )
""”
Key,Value 기반으로 Insert SQL의 Values를 매핑
(ex: Map<String, Object>)
beanMapped 없음 Entity """
INSERT INTO vhcle_sttus_info_hist ( vhcle_id, status, creat_dt )
VALUES ( :vhcleId, :status, :creatDt )
""”
Pojo 기반으로 Insert SQL의 Values를 매핑
:vhcleId  item.getVhcleId()
:status  item.getStatus()
이런 식으로 필드/게터 이름과 SQL 파라미터 이름을 자동 매핑

RepositoryItemWriter

@Bean
public RepositoryItemWriter<Weather> weatherWriter(WeatherJpaRepository repo) {
    return new RepositoryItemWriterBuilder<Weather>()
            .repository(repo)
            .methodName("updateQcFlagById")
            .build();
}

WeatherJpaRepository : JpaRepository를 구현하는 클래스 인입 후 해당 클래스 내부의 메소드 명을 넣는다.

  • 엔티티 하나씩 save가 일어나서 영속성 컨텍스트 커짐, 성능 저하에 취약

 

JpaItemWriter

EntityManager에서 바로 엔티티의 상태를 update 해준다.

@Bean
public JpaItemWriter<Weather> weatherWriter(EntityManagerFactory emf) {
    JpaItemWriter<Weather> writer = new JpaItemWriter<>();
    writer.setEntityManagerFactory(emf);  // 필수
    // writer.setUsePersist(true);  // persist 쓸지, merge 쓸지 선택 가능
    return writer;
}
  • 영속성 컨텍스트/1차 캐시에 의한 메모리/성능 이슈가 발생

 

Tasklet

  • Step 내에서 단일 작업을 수행하는데 사용
  • Reader가 없이 저장 프로시저 호출 한 번으로만 구성되어야 하는 경우를 위한 방법
  • 한번에 처리하므로 Chuck 크기만큼 데이터를 나누어 처리하는 방식과 상반되어 대용량에는 적합하지 않는다.

1. TaskletAdapter

(예: MethodInvokingTaskletAdapter)

  • 목적 : 기존의 DAO, 서비스, 헬퍼 클래스 같은 일반 자바 객체를 Tasklet처럼 보이게 어댑트(adapt) 해주는 역할
  • Tasklet 인터페이스를 직접 구현하지 않은 클래스라도, 특정 메서드를 Tasklet의 execute()처럼 실행할 수 있도록 감싸줌
  • 재사용성에 초점을 둔 방식

예를 들어 FooDao라는 DAO에 updateFoo()라는 메서드가 이미 있다면 굳이 Tasklet 구현 클래스를 새로 작성할 필요 없이

@Bean
public MethodInvokingTaskletAdapter myTasklet() {
    MethodInvokingTaskletAdapter adapter = new MethodInvokingTaskletAdapter(); // adapter
    adapter.setTargetObject(fooDao());
    adapter.setTargetMethod("updateFoo");
    return adapter;
}

이렇게 쓰면 updateFoo()가 Tasklet의 execute()처럼 호출

장점: 간단하게 기존 로직을 재사용 가능

단점: Tasklet의 라이프사이클(RepeatStatus, ChunkContext, StepContribution) 같은 활용이 어려움

 

2. Tasklet 직접 구현

(예: FileDeletingTasklet)

  • 목적 : 배치 시나리오에 특화된 새로운 동작을 구현할 때 사용
  • Tasklet 인터페이스를 구현해서 execute() 메서드 안에 원하는 로직을 작성
public class FileDeletingTasklet implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        // 파일 삭제 로직 직접 구현
        return RepeatStatus.FINISHED;
    }
}

장점 : Step의 실행 맥락(StepContribution, ChunkContext)을 활용할 수 있고, 복잡한 로직을 자유롭게 구현 가능

단점 : 새 클래스를 작성해야 하고 코드가 조금 길어짐

 

: 람다식을 이용한 함수형 인터페이스 구현

@Bean
Tasklet regionUpdateTasklet(RetryTemplate regionRetryTemplate) {
    return (contribution, chunkContext) -> {     
        regionRetryTemplate.execute(context -> {
            regionJpaRepository.updateRegionCount();
            return null;
        });
        return RepeatStatus.FINISHED;
    };
}

 

Restart

Job/Step이 끝난 뒤 다시 실행 할 때 설정

in Job

Job이 항상 새로운 JobInstance로 실행되어야 할 때 preventRestart()를 설정

@Bean
public Job footballJob(JobRepository jobRepository) {
    return new JobBuilder("footballJob", jobRepository)
         .preventRestart()
         ...
         .build();
}

in step

@Bean 
@JonScope
Step testStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new StepBuilder("testStep", jobRepository)
            .<String, String>chunk(chunkSize, transactionManager)
            .reader(customItemReader())
            .processor(customItemProcessor())
            .writer(items -> System.out.println("items=" + items))
        		.startLimit(2)
        		.allowStartIfComplete(true)
            .build();
}
  • .allowStartIfComplete(true) : 이미 성공한 Step 일 경우 다시 실행 여부를 설정
    • JOB이 다시 실행되면 Step은 COMPLETED인 STEP은 건너뛴다. 하지만 위 설정을 true로 하면 Step은 항상 다시 실행된다.
    • true : 다시 시작
    • false : default 값으로 재시작 시 건너뛰기 한다.
  • .startLimit(2) : 가능한 최대 실행 횟수 ( 1번 실행하고 성공/실패 상관없이 N-1 번 재실행 할 수 있다. )
    • 인수 보다 더 실행 할 경우 StartLimitExceededException이 발생
    • Step 재실행은 Job 자체를 재실행(restart 또는 rerun)했을 때 그 안의 Step이 다시 실행될 수 있는지 여부
    • default : Integer.MAX_VALUE ( 무제한 )

ItemStream 등록하기 : 배치 처리의 상태(state)를 관리하기 위한 인터페이스

  • 리더/라이터가 “상태를 관리하고 복원”할 수 있게 하는 확장 인터페이스
  • Step이 실패했다가 재시작할 중단점을 저장하기 위한 상태 저장/복원 용도
  • Writer(출력)이 대상에 따라 ItemStream등록
  • Reader은 거의 데이터를 읽어오는 로직이므로 상태 관리할 필요가 없으므로 Writer에 주로 ItemStream을 등록한다.
  1. 기본 규칙
  • Step에 직접 등록된 ItemReader / ItemWriter / ItemProcessor→ 별도로 .stream() 호출할 필요 없음
  • → 이들이 ItemStream을 구현하고 있다면 자동으로 등록됨
.writer(flatFileItemWriter())  // FlatFileItemWriter가 ItemStream 구현

👉 이 경우 자동으로 ItemStream 처리됨

  1. .stream() 이 필요한 경우
  • CompositeItemWriter / CompositeItemReader 같은 경우
    • 자기 자신은 ItemStream을 구현하지 않고, 내부 delegate만 ItemStream인 경우
    • → delegate들을 .stream()으로 직접 등록해줘야 함
.writer(compositeItemWriter())
.stream(fileItemWriter1())  // delegate 1
.stream(fileItemWriter2())  // delegate 2
  • 간접 의존성(delegates, 내부에서 주입된 Stream 등)
    • Step이 직접 알 수 없으니 수동 등록 필요
  1. Spring Batch 4 이후 변화

Spring Batch 4부터는 CompositeItemWriter도 ItemStream을 구현하도록 개선됨

👉 그래서 delegate들이 ItemStream이면 자동으로 처리 가능 → .stream() 거의 필요 없음

 

Rollback

예외가 났을 때 chunck 단위로 트랜잭션 롤백

트랜잭션 = reader/processor/writer가 한 덩어리(chunk)로 같은 트랜잭션을 공유

  • ItemReader 에서 발생한 예외는 skip 이 설정된 경우에 롤백을 일으키지 않는다.
    chunckSize = 3, item = {A, B, C} 일때
    
    - A: 정상 read
    
    - B: Reader에서 파싱 예외(FlatFileParseException) → skip 대상 → **롤백 없이** B는 건너뛰고 C부터
    
    - C: 계속 read, 이후 processor/writer 흐름
  • ItemWriter 는 noRollback에 상관없이 모든 예외가 롤백 됨
    - A: processor + writer 까지 정상 → 아직 트랜잭션 안 끝남 (chunk 전체 단위)
    
    - B: writer에서 예외
        
        → chunk 트랜잭션 롤백 (A까지 쓴 것도 모두 취소)
        
        → B를 skip 대상으로 기록
        
        → 남은 아이템들(A, C)을 다시 읽어서
        
        B를 빼고 [A, C]만 다시 writer에 전달 후 커밋
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(2, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.faultTolerant()
				.noRollback(IgnorableDataException.class)
				.build();
}

.noRollback(ValidationException.class) : 이 예외를 item read 또는 processing 단계에서 롤백을 무시할 수 있는 예외로 표시

  • 예외를 조용히 무시하고 chunck처리를 계속 진행 ⇒ 다음 아이템으로 넘어감
  • 사용하는 경우가 상대적으로 드물다. ⇒ 추적이 안됨
  • 완전히 무시해도 되는 데이터인 경우
    if ( ... ) { throw new IgnorableDataException(...); }
  • 예외를 일으켜 해당 데이터를 제외하고 계속 배치가 진행되도록 함

 

Skip/Retry

job/step에서 exception 발생 시 어떻게 로직을 진행시킬지 결정

retry → 그래도 안 되면 skip 처리 이렇게 하나의 예외에 retry + skip 처리 가능

Skip

오류가 실패로 이어지지 않고 건너뛰기 되도록 해야 되는 시나리오

  • 일부 데이터가 잘못된 형식으로 인한 오류일 경우 Skip하고 다른 데이터를 정상 처리
    깨진 데이터/검증 실패
  • 한 건이라도 잘못되면 안 되는 데이터는 절대 Skip하면 안 되고 바로 실패 시키는 것이 좋다.

해당 방식은 Chunck 단위에서 사용하는 방식

@Bean Step testStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new StepBuilder("testStep", jobRepository)
            .<String, String>chunk(chunkSize, transactionManager)
            .reader(customItemReader())
            .processor(customItemProcessor())
            .writer(items -> System.out.println("items=" + items))
            **.faultTolerant()
            .skip(CustomException.class)
            .noSkip(FileNotFoundException.class)
            .skipLimit(3)**
            .build();
}
  • .faultTolerant() : skip, retry 같은 내결함성 옵션을 쓰기 위해 켜는 오류 허용(fault tolerance) 모드로 전환하는 스위치 역할
  • .skipLimit(3) : 모든 read/process/write 단계에서 발생한 skip을 합쳐서 3번까지 허용
    default : 10
  • .skip(CustomException.class) : 해당 exception 클래스만 스킵 가능하고 그 외 모든 예외는 바로 실패 처리
  • .noSkip(FileNotFoundException.class) : 해당 exception 클래스만 스킵 대상에서 제외 → 치명적인 에러로 즉시 실패 처리 해야됨
    • 두 설정이 없다면 가장 가까운 상위 클래스 기준으로 스킵 가능/불가를 판단
      기본은 스킵 불가
    • skip()과 noSkip() 메서드의 호출 순서는 중요하지 않다.
    • 모든 예외는 통과 하지만 특정 예외 하나만 skip하고 싶지 않을 때 아래 조합을 사용
      • .noSkip(FileNotFoundException.class)
      • .skip(Exception.class)

Retry

  • 데이터 자체는 정상인데 네트워크, 타임아웃 등 환경 문제 일 때
  • Deadlock, 락 타임아웃, 네트워크 끊김 같은 일시적인 장애 ⇒ ****잠시 기다렸다가 다시 시도하면 성공할 수도 있다.
  • Example Exception
    • TransientDataAccessException
      • DeadlockLoserDataAccessException : 데드락
      • CannotAcquireLockException : 락
      • QueryTimeoutException : 쿼리 타임아웃
      • TransientDataAccessResourceException : 네트워크 끊김, 일시적인 커넥션 오류
    • RestClientException : 외부 API 호출 시
    SocketTimeoutException

 

in reader/writer step

  • .retryLimit(3) :
    각 아이템(데이터) 처리 중에 processor/write에서 나는 retry 대상 예외에 대해 최대 3번 재시도 가능하고 그 외 모든 예외는 바로 실패 처리
  • .retry(TransientDataAccessException.class) :
    TransientDataAccessException(및 그 하위 예외 들)은 retry 대상
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(2, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.faultTolerant()
				.retryLimit(3)
				.retry(TransientDataAccessException.class)
				.build();
}

시나리오: Item1, Item2 처리 (chunkSize=2)

  • Item1, Item2 모두 최종적으로 성공했으므로 → writer에게 [Item1, Item2] 전달
  • writer 처리 후 commit

단계 아이템 동작 결과 비고

1 Item1 read/process/write ✅ 성공 문제 없음
2 Item2 read ❌ 예외 발생 Deadlock 예외 가정
3 Item2 (Retry #1) read ❌ 예외 발생 retryLimit 3 중 1회
4 Item2 (Retry #2) read ❌ 예외 발생 retryLimit 3 중 2회
5 Item2 (Retry #3) read ✅ 성공 retryLimit 3 중 3회, 성공

 

in tasklet step

tasklet은 chunck 단위가 아니므로 RetryTemplate 또는 @Retryable을 사용

 

RetryTemplate

RetryOperations 의 가장 단순한 범용 구현 재시도 엔진

RetryOperations : 네트워크 문제로 인해 웹 서비스나 RMI 서비스에 대한 원격 호출이 실패했거나, 데이터베이스 업데이트에서 DeadLockLoserException 이 발생한 경우 잠시 기다린 뒤에 스스로 해결하기 위한 재시도를 자동화하기 위한 전략

@Bean
RetryTemplate regionRetryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();

    Map<Class<? extends Throwable>, Boolean> retryable = new HashMap<>();
    retryable.put(TransientDataAccessException.class, true);

    SimpleRetryPolicy retryPolicy =
            new SimpleRetryPolicy(3, retryable, true);

    ExponentialBackOffPolicy backoff = new ExponentialBackOffPolicy();
    backoff.setInitialInterval(500L);  
    backoff.setMultiplier(2.0);         
    backoff.setMaxInterval(5_000L);     

    retryTemplate.setRetryPolicy(retryPolicy);
    retryTemplate.setBackOffPolicy(backoff);

    return retryTemplate;
}

execute 매개변수

  • RetryCallback : RetryOperations로 재시도할 수 있는 작업을 표현하는 콜백
  • RecoveryCallback : 정책에 따라 재시도 가능한 횟수를 전부 사용하고도 여전히 실패했을 때 마지막에 한 번 호출되는 후 처리용 콜백 ⇒ 복구/보상 로직
  • RetryState : RetryContext가 어떤 아이템/호출에 대한 것이냐를 나타내는 식별자

RetryContext : 현재까지 몇 번째 시도인지, 어떤 예외가 났는지 저장

  • 재시도 대상 식별 키
  • 롤백 필요 여부 등등
  • 몇 번째 시도인지

RetryPolicy :

몇 번까지, 어떤 예외에 한해서, 언제까지 재시도 할 건지 ⇒ 재시도 전략을 캡슐화한 객체, 설정된 정책에 따라 RetryContext의 상태 값을 갱신

RetryTemplate의 execute 메서드가 재시도할지 실패로 처리할지는 RetryPolicy에 의해 결정

  1. RetryTemplate은 현재 정책을 사용해 RetryContext를 생성하고(RetryContext context = retryPolicy.open…)
  2. RetryCallback이 실패하면 RetryTemplate은 RetryPolicy에 호출해(상태는 RetryContext에 저장됨) 그 상태를 갱신하도록 요청
  3. 그런 다음 추가 시도가 가능한지 확인 후 추가 시도가 불가능하다면(예: 한도 도달, 타임아웃 감지) 정책은 소진(exhausted) 상태를 식별
  4. 소진 시
      RecoveryCallback 호출 원래 예외 rethrow  RetryExhaustedException
    stateless ( RetryState X ) - O X
    RecoveryCallback O X X

SimpleRetryPolicy : 는 지정된 예외 타입 목록 중 어떤 것이든 고정된 횟수만큼 재시도를 허용

  • RetryPolicy 구현체
SimpleRetryPolicy policy = new SimpleRetryPolicy(5, 
	Collections.singletonMap(Exception.class, true));

RetryTemplate template = new RetryTemplate();
template.setRetryPolicy(policy);
template.execute(new RetryCallback<MyObject, Exception>() {
    public MyObject doWithRetry(RetryContext context) {
        // 비즈니스 로직
    }
});

Map<Class<? extends Throwable>, Boolean> retryable 
= new HashMap<>();
retryable.put(TransientDataAccessException.class, true);

SimpleRetryPolicy retryPolicy =
        new SimpleRetryPolicy(3, retryable, true);

Throwable : Java에서 예외 계층의 최상위 부모 클래스

 

BackoffPolicy : 실패하면 얼마 동안 기다렸다가 다시 시도할 지를 정하는 전략

  • 일시적(Transient) 실패 후 재시도할 때는, 다시 시도하기 전에 잠시 기다리는 것이 도움이 되는 경우가 많다.
  • 실패가 기다림으로만 해결될 수 있는 문제 때문에 발생하기도 한다. ( DB 락 경합, 네트워크 글리치처럼 잠깐 기다리면 풀리는 케이스 )
  • RetryCallback이 실패하면 RetryTemplate은 BackoffPolicy에 따라 실행을 일시 중지할 수 있다.
public interface BackoffPolicy {

    BackOffContext start(RetryContext context);

    void backOff(BackOffContext backOffContext)
        throws BackOffInterruptedException;

}

지수적으로 증가시키는 백오프를 사용하면 대기 시간이 시도마다 지수적으로 증가해, 분산 서버 환경에서 재시도 타이밍이 서로 어긋나면서 DB 락 경합이나 외부 API Rate Limit 같은 문제를 완화할 수 있다.

Spring Retry는 ExponentialBackoffPolicy 구현체를 제공

ExponentialBackOffPolicy backoff = new ExponentialBackOffPolicy();
backoff.setInitialInterval(500L);  // 첫 번째 실패 이후 대기 시간 (ms)
backoff.setMultiplier(2.0);    // 초기 0.5s, 배수 2배, 최대 5s
backoff.setMaxInterval(5_000L);     // 대기 시간 max 값 (ms)

wait(n) = initialInterval × (multiplier)^(n-1)

 

Listeners

retry를 monitoring하고 싶을 때 Listeners를 설정

template.registerListener(new MethodInvocationRetryListenerSupport() {
      @Override
      protected <T, E extends Throwable> void doClose(RetryContext context,
          MethodInvocationRetryCallback<T, E> callback, Throwable throwable) {
        monitoringTags.put(labelTagName, callback.getLabel());
        Method method = callback.getInvocation()
            .getMethod();
        monitoringTags.put(classTagName,
            method.getDeclaringClass().getSimpleName());
        monitoringTags.put(methodTagName, method.getName());

        // 적절한 태그를 가진 모니터링 카운터 등록
        // ...
      }

      @Override
      protected <T, E extends Throwable> void doOnSuccess(RetryContext context,
              MethodInvocationRetryCallback<T, E> callback, T result) {

          Object[] arguments = callback.getInvocation().getArguments();

          // 주어진 인자들에 대한 결과를 수용할지,
          // 아니면 재시도 정책에 따라 재시도할지 결정
      }
});

in tasklet example

public class RetryingTasklet implements Tasklet {

    private final RetryTemplate retryTemplate;
    private final RemoteClient remote; // 외부 API/DB 호출 등

    public RetryingTasklet(RemoteClient remote) {
        this.remote = remote;

        var retryPolicy = new SimpleRetryPolicy(
            3, // 총 시도 횟수(= 1회 + 재시도 2회)
            Map.of(SocketTimeoutException.class, true), // retry 대상 예외만 true
            true // cause chain 탐색
        );

        var backoff = new ExponentialBackOffPolicy();
        backoff.setInitialInterval(500);
        backoff.setMultiplier(2.0);    // 초기 0.5s, 배수 2배, 최대 5s
        backoff.setMaxInterval(5_000);

        retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(retryPolicy);
        retryTemplate.setBackOffPolicy(backoff);
    }

    @Override
    public RepeatStatus execute(StepContribution c, ChunkContext ctx) throws Exception {
        return retryTemplate.execute(context -> {
            // === retry 대상 로직 ===
            remote.call();   // 실패 시 지정 예외 던짐
            return RepeatStatus.FINISHED;

        }, context -> {
            // === 모든 재시도 소진 후(복구/보상) ===
            c.setExitStatus(ExitStatus.FAILED); // 필요시 커스텀 ExitStatus
            throw new IllegalStateException("tasklet retry exhausted", context.getLastThrowable());
        });
    }
}

 

트랜잭션 처리

Step 과 Service의 transaction 관계는 기본적으로 Step 트랜잭션 안에 서비스가 참여하는 구조이다.

Service @Transactional(propagation = Propagation.REQUIRES_NEW) 이라면 Step, Service가 트랜잭션을 공유하지 않고 Step 트랜잭션은 보류하고 Service는 독립적인 트랜잭션을 새로 만들고 Service 로직이 끝나면 다시 Step 트랜잭션을 이어서 사용
retry 정책에 의해 재시도 될때 Service의 트랜잭션만 재시도한다. Step을 재시도 하는 것 보다 컨트롤이 쉽고 범위가 좁다.

트랜잭션 옵션 : 트랜잭션의 격리 수준, 전파 방식, 타임아웃 설정 제어

  • Propagation.REQUIRED → 기존 트랜잭션이 있으면 참여, 없으면 새로 시작→ 다른 트랜잭션 안에서 실행되면 그걸 그대로 이어받음
  • → 단독으로 실행되면 스스로 트랜잭션을 시작
  • Isolation.DEFAULT → 데이터베이스 기본 격리 수준 사용
  • Timeout(30초) → 트랜잭션이 30초를 초과하면 롤백
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
	attribute.setPropagationBehavior(Propagation.REQUIRED.value()); // 전파 방식
	attribute.setIsolationLevel(Isolation.DEFAULT.value());         // 격리 수준
	attribute.setTimeout(30);                                       // 타임아웃 (초 단위)

	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(2, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.transactionAttribute(attribute) // 트랜잭션 속성 적용
				.build();
}

Listener

Step 내부에서 일어나는 여러 이벤트 지점에 대해 리스너(Listener)를 붙여서 추가 로직을 실행할 수 있다.

 

Intercepting Step Execution :: Spring Batch Reference

ItemReadListener, ItemProcessListener, and ItemWriteListener all provide mechanisms for being notified of errors, but none informs you that a record has actually been skipped. onWriteError, for example, is called even if an item is retried and successful.

docs.spring.io

 

Step 가로채기 : StepListener는 Step 실행 중 특정 시점(전/후/에러 발생 시점)에 개입해서 추가 로직을 실행할 수 있는 확장 포인트

  • ItemReader → 읽기 전/후, 에러 발생 시 (ItemReadListener)
  • ItemProcessor → 처리 전/후, 에러 발생 시 (ItemProcessListener)
  • ItemWriter → 쓰기 전/후, 에러 발생 시 (ItemWriteListener)
  • 청크 단위로 커밋할 때 호출(ChunkListener)
  • 실제로 skip 발생했을 때 호출 (SkipListener)