Spring Batch 애플리케이션 구축기: ERP 시스템의 대용량 자동 처리 구현


들어가며

ERP 시스템을 구축하던 중, 점점 더 많은 자동화 요구사항이 발생하기 시작했다.

대표적인 요구사항들:

  • 매일 새벽 휴면 회원 일괄 처리
  • 매월 초 관리비 자동 부과
  • 세금 데이터 수집 및 동기화
  • 대량의 전자세금계산서 발행

이러한 작업들의 공통점은:

  • 대용량 데이터 처리가 필요
  • 주기적 자동 실행이 필요
  • 실패 시 재처리 가능해야 함
  • 처리 이력 추적이 필요

초기에는 단순 스케줄러로 처리하려 했지만, 곧 한계에 부딪혔다.

// 초기 시도: @Scheduled만으로 처리
@Scheduled(cron = "0 0 2 * * ?")
public void processDormantUsers() {
    List<User> users = userRepository.findAll(); // 전체 조회... OOM 위험
    users.forEach(user -> {
        // 실패하면? 어디서부터 재시작?
        // 처리 이력은? 모니터링은?
    });
}

문제점:

  • 전체 데이터를 메모리에 로드 → OOM(Out Of Memory) 위험
  • 중간에 실패하면 처음부터 다시 시작
  • 어디까지 처리했는지 추적 불가
  • 실행 이력 관리 어려움

자체적으로 배치 시스템을 구축하기엔 너무 복잡했다. 트랜잭션 관리, 재시작 메커니즘, 메타데이터 관리 등을 직접 구현하는 것은 비효율적이었다.

결론: Spring Batch를 도입하기로 결정했다.

이 글은 Spring Batch 기반 배치 애플리케이션을 구축하면서 요구사항 분석부터 설계, 구현까지 전 과정을 기록한 내용이다.

1. 요구사항 분석

1.1. 기능 요구사항

필수 기능:

  • 배치 Job을 데이터베이스로 관리
  • 즉시 실행 (수동 트리거)
  • 스케줄 실행 (Cron 기반)
  • 실행 이력 조회
  • 실패 시 알림

사용자 시나리오:

시나리오 1: 관리자가 즉시 실행

1. 관리 화면에서 "휴면 회원 처리" Job 선택
2. 필요한 파라미터 입력 (예: 기준일자)
3. "즉시 실행" 버튼 클릭
4. 실행 상태 확인

시나리오 2: 스케줄 자동 실행

1. 관리자가 "매일 새벽 2시" 스케줄 등록
2. 시스템이 자동으로 해당 시간에 실행
3. 실패 시 담당자에게 알림 발송
4. 관리자가 실행 이력 확인 후 재실행 여부 결정

1.2. 비기능 요구사항

성능:

  • 대용량 데이터 처리 (수십만 건)
  • 메모리 효율적 처리 (Chunk 단위)
  • 적절한 트랜잭션 경계

안정성:

  • 실패 시 재시작 가능
  • 중복 실행 방지
  • 트랜잭션 롤백 지원

운영성:

  • 실행 이력 추적
  • 실패 알림
  • 동적 스케줄 관리

1.3. 데이터 모델 요구사항

배치 Job 관리:

-- batch_job 테이블
CREATE TABLE batch_job (
    batch_job_seq BIGINT PRIMARY KEY,
    id VARCHAR(100),              -- Job Bean 이름
    name VARCHAR(200),            -- Job 표시 이름
    description TEXT,             -- 설명
    use_flag CHAR(1)              -- 사용 여부
);

스케줄 관리:

-- batch_cron 테이블
CREATE TABLE batch_cron (
    batch_cron_seq BIGINT PRIMARY KEY,
    batch_job_seq BIGINT,         -- FK to batch_job
    expression VARCHAR(100),       -- Cron 표현식
    parameter_data_json TEXT,      -- Job 파라미터 (JSON)
    use_flag CHAR(1),              -- 사용 여부
    valid_start_date DATE,         -- 유효 시작일
    valid_end_date DATE            -- 유효 종료일
);

2. 기술 선택

2.1. 왜 Spring Batch인가?

Spring Batch의 장점:

  • Chunk 지향 처리로 대용량 데이터 처리 최적화
  • 재시작 메커니즘 기본 제공
  • 메타데이터 자동 관리
  • 트랜잭션 관리 자동화
  • 풍부한 리스너와 확장 포인트

2.2. 최종 기술 스택

백엔드:

  • Spring Batch 4.x
  • Spring Boot 2.x
  • MyBatis (데이터베이스 연동)
  • Spring Task Scheduler (스케줄링)

데이터베이스:

  • Spring Batch 메타 테이블
  • 커스텀 배치 관리 테이블

인프라:

  • 단일 서버 구성 (이중화는 향후 고려)

3. 시스템 설계

3.1. 전체 아키텍처

시스템은 크게 3개 계층으로 설계했다.

┌─────────────────────────────────────────────────────────────────┐
│                    프레젠테이션 계층                                │
├─────────────────────────────────────────────────────────────────┤
│  BatchCommonController (REST API)                               │
│  - 즉시 실행 (동기/비동기)                                           │
│  - 파라미터 조회                                                   │
│  - 스케줄 관리                                                     │
└───────────────────────┬─────────────────────────────────────────┘
                        │
                        ▼
┌─────────────────────────────────────────────────────────────────┐
│                    비즈니스 계층                                   │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────────-─┐    ┌────────────────────────────────┐    │
│  │ BatchCommonService│    │ BatchCronScheduleManager       │    │
│  │ - 실행 조정         │    │ - 동적 스케줄 관리                 │    │
│  └─────────────────-─┘    └────────────────────────────────┘    │
│  ┌─────────────--─────┐    ┌────────────────────────────────┐    │
│  │ JobParameterFactory│    │ JobExecutor                    │    │
│  │ - 파라미터 변환       │    │ - Job 실행 (동기/비동기)           │    │
│  └────────────────────┘    └────────────────────────────────┘    │
└───────────────────────┬──────────────────────────────────────────┘
                        │
                        ▼
┌─────────────────────────────────────────────────────────────────┐
│                    Spring Batch 계층                             │
├─────────────────────────────────────────────────────────────────┤
│  Job (JobBuilderFactory)                                        │
│   │                                                             │
│   ├─> CustomRunIdIncrementer (고유 실행 ID 생성)                   │
│   ├─> BatchJobExecutionListener (실행 전후 처리)                   │
│   │                                                             │
│   └─> Step (StepBuilderFactory)                                 │
│        │                                                        │
│        ├─> Reader (MyBatisPagingItemReader)                     │
│        ├─> Processor (ItemProcessor)                            │
│        └─> Writer (MyBatisBatchItemWriter)                      │
└─────────────────────────────────────────────────────────────────┘

3.2. 핵심 설계 결정 사항

1) Job과 Parameter의 중앙 관리

// BatchJobEnum: Job과 Parameter를 함께 관리
public enum BatchJobEnum {
    SAMPLE_JOB("sampleJob", SampleJobParameter.class),
    DORMANT_USER_JOB("dormantUserJob", null),
    ...
    
    private final String id;  // Bean 이름
    private final Class<? extends BasicJobParameter> parameterClass;
}

장점:

  • Job과 Parameter의 관계를 한 곳에서 관리
  • 타입 안정성 제공
  • IDE의 자동 완성 지원

2) Reflection 기반 동적 파라미터 처리

// BasicJobParameter 인터페이스를 통한 표준화
public interface BasicJobParameter {
}

// @BatchParameter로 메타데이터 제공
public class SampleJobParameter implements BasicJobParameter {
    @BatchParameter(description = "처리 대상 날짜", example = "2024-01-01")
    @Value("#{jobParameters[targetDate]}")
    private Date targetDate;
}

장점:

  • 관리 화면에서 파라미터 정보 자동 조회
  • 파라미터 검증 용이
  • 확장 가능한 구조

3) 동적 스케줄 관리

// 데이터베이스 기반 스케줄 관리
@Component
public class BatchCronScheduleManager {
    // 메모리에서 스케줄 관리
    private final Map<Long, ScheduledFuture<?>> scheduleMap = new HashMap<>();
    
    // 스케줄 추가/수정/삭제를 동적으로 처리
    public void insert(BatchCron batchCron) { ... }
    public void update(BatchCron batchCron) { ... }
    public void delete(Long batchCronSeq) { ... }
}

장점:

  • 서버 재시작 없이 스케줄 변경 가능
  • 유효 기간 관리 가능
  • 사용 여부 토글 가능

4. 핵심 컴포넌트 구현

4.1. CustomRunIdIncrementer

목적: 동일한 파라미터로 Job을 여러 번 실행 가능하게 함

문제 상황:

// Spring Batch 기본 동작
// 동일한 Job + 동일한 Parameter = 중복 실행 방지
JobParameters params = new JobParametersBuilder()
    .addDate("targetDate", new Date())
    .toJobParameters();

jobLauncher.run(job, params);  // 첫 실행: 성공
jobLauncher.run(job, params);  // 두 번째 실행: 실패!
// JobInstanceAlreadyCompleteException 발생

해결:

public class CustomRunIdIncrementer implements JobParametersIncrementer {
    private static final String RUN_ID_KEY = "run.id";
    
    @Override
    public JobParameters getNext(@Nullable JobParameters parameters) {
        return new JobParametersBuilder()
            .addString(RUN_ID_KEY, makeKey())
            .toJobParameters();
    }
    
    private String makeKey() {
        // 시간 + 호스트명으로 고유 키 생성
        return String.format("%s-%s", 
            LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")),
            getHostName()
        );
    }
    
    private String getHostName() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            return "unknown";
        }
    }
}

적용:

@Bean
public Job dormantUserJob() {
    return jobBuilderFactory.get("dormantUserJob")
        .incrementer(new CustomRunIdIncrementer())  // 추가
        .start(dormantUserStep())
        .build();
}

효과:

  • 동일 파라미터로 재실행 가능
  • 각 실행을 고유하게 식별
  • 분산 환경에서도 충돌 방지 (호스트명 포함)

4.2. BatchJobExecutionListener

목적: Job 실행 전후 공통 처리

구현:

@Component
@RequiredArgsConstructor
public class BatchJobExecutionListener implements JobExecutionListener {
    
    private final JobExplorer jobExplorer;
    private final BatchJobExecutionServerContextMapper serverContextMapper;
    private final JobExecutionFailMessageSender messageSender;
    
    @Override
    public void beforeJob(JobExecution jobExecution) {
        // 1. 중복 실행 검증
        if (!canRun(jobExecution)) {
            jobExecution.stop();
            log.warn("Job 중복 실행 방지: {}", jobExecution.getJobInstance().getJobName());
            return;
        }
        
        // 2. 서버 컨텍스트 저장 (어느 서버에서 실행되었는지)
        serverContextMapper.save(
            BatchJobExecutionServerContextSaveRequest.of(
                jobExecution.getJobId()
            )
        );
        
        log.info("Job 시작: {} (ID: {})", 
            jobExecution.getJobInstance().getJobName(),
            jobExecution.getJobId()
        );
    }
    
    @Override
    public void afterJob(JobExecution jobExecution) {
        // 3. 실행 결과 로그
        printLog(jobExecution);
        
        // 4. 실패 시 알림 발송
        if (jobExecution.getStatus() == BatchStatus.FAILED) {
            messageSender.sendMessage(jobExecution);
        }
        
        log.info("Job 종료: {} - 상태: {}", 
            jobExecution.getJobInstance().getJobName(),
            jobExecution.getStatus()
        );
    }
    
    private boolean canRun(JobExecution jobExecution) {
        // 동일 Job + 동일 Parameter가 이미 실행 중인지 확인
        String jobName = jobExecution.getJobInstance().getJobName();
        JobParameters params = jobExecution.getJobParameters();
        
        Set<JobExecution> runningExecutions = jobExplorer.findRunningJobExecutions(jobName);
        
        return runningExecutions.stream()
            .filter(execution -> !execution.getId().equals(jobExecution.getId()))
            .noneMatch(execution -> execution.getJobParameters().equals(params));
    }
    
    private void printLog(JobExecution jobExecution) {
        StringBuilder sb = new StringBuilder("\n");
        sb.append("=================================================\n");
        sb.append("Job: ").append(jobExecution.getJobInstance().getJobName()).append("\n");
        sb.append("Status: ").append(jobExecution.getStatus()).append("\n");
        sb.append("Start: ").append(jobExecution.getStartTime()).append("\n");
        sb.append("End: ").append(jobExecution.getEndTime()).append("\n");
        sb.append("Duration: ").append(Duration.between(
            jobExecution.getStartTime().toInstant(),
            jobExecution.getEndTime().toInstant()
        ).getSeconds()).append("s\n");
        
        jobExecution.getStepExecutions().forEach(step -> {
            sb.append("\nStep: ").append(step.getStepName()).append("\n");
            sb.append("  Read: ").append(step.getReadCount()).append("\n");
            sb.append("  Write: ").append(step.getWriteCount()).append("\n");
            sb.append("  Skip: ").append(step.getSkipCount()).append("\n");
        });
        
        sb.append("=================================================");
        log.info(sb.toString());
    }
}

효과:

  • 중복 실행 방지
  • 실행 서버 추적
  • 상세 실행 로그
  • 실패 시 즉시 알림

4.3. JobParameterFactory

목적: BasicJobParameter를 Spring Batch JobParameters로 변환

구현:

@Component
@RequiredArgsConstructor
public class JobParameterFactory {
    
    private final ObjectMapper objectMapper;
    private final JobExplorer jobExplorer;
    
    // JSON 문자열을 JobParameters로 변환
    public JobParameters getJobParameters(
        Class<? extends BasicJobParameter> parameterClass,
        String parameterDataJson
    ) throws JsonProcessingException {
        
        BasicJobParameter parameter = 
            objectMapper.readValue(parameterDataJson, parameterClass);
        
        return convertJobParameters(parameter);
    }
    
    // Incrementer와 함께 JobParameters 생성
    public JobParameters getJobParametersWithIncrementer(
        Job job,
        BasicJobParameter parameter
    ) {
        return new JobParametersBuilder(jobExplorer)
            .getNextJobParameters(job)  // RunId 자동 증가
            .addJobParameters(convertJobParameters(parameter))
            .toJobParameters();
    }
    
    private JobParameters convertJobParameters(BasicJobParameter parameter) {
        JobParametersBuilder builder = new JobParametersBuilder();
        
        // Reflection으로 필드 순회
        for (Field field : parameter.getClass().getDeclaredFields()) {
            field.setAccessible(true);
            
            // @BatchParameter 확인
            BatchParameter annotation = field.getAnnotation(BatchParameter.class);
            if (annotation == null || !annotation.include()) {
                continue;  // include=false면 제외
            }
            
            try {
                Object value = field.get(parameter);
                if (value == null) continue;
                
                // 타입별 변환
                String fieldName = field.getName();
                if (value instanceof String) {
                    builder.addString(fieldName, (String) value);
                } else if (value instanceof Long) {
                    builder.addLong(fieldName, (Long) value);
                } else if (value instanceof Double) {
                    builder.addDouble(fieldName, (Double) value);
                } else if (value instanceof Date) {
                    builder.addDate(fieldName, (Date) value);
                } else if (value instanceof LocalDateTime) {
                    // LocalDateTime은 Date로 변환
                    builder.addDate(fieldName, 
                        Date.from(((LocalDateTime) value)
                            .atZone(ZoneId.systemDefault())
                            .toInstant())
                    );
                }
            } catch (IllegalAccessException e) {
                log.error("필드 접근 실패: {}", field.getName(), e);
            }
        }
        
        return builder.toJobParameters();
    }
}

사용 예제:

// Parameter 클래스 정의
@Getter
@Setter
public class SampleJobParameter implements BasicJobParameter {
    @BatchParameter(description = "처리 대상 날짜")
    @Value("#{jobParameters[targetDate]}")
    private Date targetDate;
    
    @BatchParameter(include = false)  // 관리 화면에 노출 안 됨
    private String systemField;
}

// 사용
String json = "{\"targetDate\":\"2024-01-01\"}";
JobParameters params = jobParameterFactory.getJobParameters(
    SampleJobParameter.class, 
    json
);

4.4. BatchCronScheduleManager

목적: 동적으로 스케줄을 등록/수정/삭제

구현:

@Component
@RequiredArgsConstructor
public class BatchCronScheduleManager {
    
    private final TaskScheduler taskScheduler;
    private final JobLauncher jobLauncher;
    private final JobParameterFactory jobParameterFactory;
    private final ApplicationContext applicationContext;
    
    // 스케줄 저장소 (메모리)
    private final Map<Long, ScheduledFuture<?>> scheduleMap = new ConcurrentHashMap<>();
    
    // 앱 시작 시 초기화
    public void initialize(List<BatchCron> batchCronList) {
        log.info("배치 스케줄 초기화 시작: {} 건", batchCronList.size());
        batchCronList.forEach(this::insert);
        log.info("배치 스케줄 초기화 완료: {} 건 등록", scheduleMap.size());
    }
    
    // 스케줄 추가
    public void insert(BatchCron batchCron) {
        if (!canInsertSchedule(batchCron)) {
            log.warn("스케줄 등록 불가: {}", batchCron);
            return;
        }
        
        try {
            // 1. Job Bean 조회
            BatchJobEnum jobEnum = BatchJobEnum.findByCode(batchCron.getBatchJob().getId());
            Job job = applicationContext.getBean(jobEnum.getId(), Job.class);
            
            // 2. 파라미터 준비
            BasicJobParameter parameter = null;
            if (jobEnum.getParameterClass() != null) {
                parameter = objectMapper.readValue(
                    batchCron.getParameterDataJson(),
                    jobEnum.getParameterClass()
                );
            }
            
            // 3. 스케줄 등록
            final BasicJobParameter finalParameter = parameter;
            ScheduledFuture<?> scheduledFuture = taskScheduler.schedule(() -> {
                try {
                    JobParameters params = jobParameterFactory
                        .getJobParametersWithIncrementer(job, finalParameter);
                    
                    jobLauncher.run(job, params);
                    
                } catch (Exception e) {
                    log.error("스케줄 Job 실행 실패: {}", job.getName(), e);
                }
            }, new CronTrigger(batchCron.getExpression()));
            
            scheduleMap.put(batchCron.getBatchCronSeq(), scheduledFuture);
            
            log.info("스케줄 등록 완료: {} - {}", 
                batchCron.getBatchJob().getName(), 
                batchCron.getExpression()
            );
            
        } catch (Exception e) {
            log.error("스케줄 등록 실패: {}", batchCron, e);
        }
    }
    
    // 스케줄 수정 (삭제 후 재등록)
    public void update(BatchCron batchCron) {
        delete(batchCron.getBatchCronSeq());
        insert(batchCron);
    }
    
    // 스케줄 삭제
    public void delete(Long batchCronSeq) {
        ScheduledFuture<?> scheduledFuture = scheduleMap.remove(batchCronSeq);
        if (scheduledFuture != null) {
            scheduledFuture.cancel(false);
            log.info("스케줄 삭제 완료: {}", batchCronSeq);
        }
    }
    
    // 유효성 검증
    private boolean canInsertSchedule(BatchCron batchCron) {
        LocalDate now = LocalDate.now();
        
        // 1. 사용 여부 확인
        if (!UseFlag.Y.equals(batchCron.getUseFlag())) {
            return false;
        }
        
        // 2. Cron 표현식 유효성
        if (!CronExpression.isValidExpression(batchCron.getExpression())) {
            return false;
        }
        
        // 3. 유효 기간 확인
        if (now.isBefore(batchCron.getValidStartDate()) || 
            now.isAfter(batchCron.getValidEndDate())) {
            return false;
        }
        
        return true;
    }
}

효과:

  • 서버 재시작 없이 스케줄 변경
  • 유효 기간 관리
  • Cron 표현식 검증

5. 실전 예제: 휴면 회원 처리 배치

5.1. Job 구성

@Configuration
@RequiredArgsConstructor
public class DormantUserJobConfig {
    
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final SqlSessionFactory sqlSessionFactory;
    private final BatchJobExecutionListener batchJobExecutionListener;
    
    private static final String JOB_NAME = "dormantUserJob";
    
    @Value("${app.batch.dormantuser.chunksize:100}")
    private int chunkSize;
    
    @Bean
    public Job dormantUserJob() {
        return jobBuilderFactory.get(JOB_NAME)
            .incrementer(new CustomRunIdIncrementer())
            .listener(batchJobExecutionListener)
            .start(dormantUserStep())
            .build();
    }
    
    @Bean
    public Step dormantUserStep() {
        return stepBuilderFactory.get(JOB_NAME + "_step")
            .<User, UserWriteDto>chunk(chunkSize)
            .reader(oneYearOverLoginUserReader())
            .processor(dormantUserProcessor())
            .writer(updateDormantUserWriter())
            .build();
    }
    
    @Bean
    public MyBatisPagingItemReader<User> oneYearOverLoginUserReader() {
        return new MyBatisPagingItemReaderBuilder<User>()
            .pageSize(chunkSize)
            .sqlSessionFactory(sqlSessionFactory)
            .queryId("com.example.repository.UserMapper.findDormantTargets")
            .build();
    }
    
    @Bean
    public ItemProcessor<User, UserWriteDto> dormantUserProcessor() {
        return user -> {
            user.setStatus(UserStatus.DORMANT);
            user.setPassword("TEMP_PASSWORD");
            return UserWriteDto.from(user);
        };
    }
    
    @Bean
    public MyBatisBatchItemWriter<UserWriteDto> updateDormantUserWriter() {
        return new MyBatisBatchItemWriterBuilder<UserWriteDto>()
            .sqlSessionFactory(sqlSessionFactory)
            .statementId("com.example.repository.UserMapper.updateDormant")
            .build();
    }
}

5.2. 처리 흐름

1. Reader (100명씩 읽기)
   ↓
   SELECT * FROM user
   WHERE last_login_date < DATE_SUB(NOW(), INTERVAL 1 YEAR)
   LIMIT 100 OFFSET ?
   
2. Processor (각 User 처리)
   ↓
   user.setStatus(DORMANT)
   user.setPassword("TEMP")
   
3. Writer (일괄 업데이트)
   ↓
   UPDATE user SET 
     status = 'DORMANT', 
     password = 'TEMP'
   WHERE user_seq IN (?, ?, ...)
   
4. Commit
   
5. 다음 100명으로 반복

장점:

  • 한 번에 100명씩 처리 → 메모리 효율적
  • Chunk 단위 트랜잭션 → 일부 실패해도 나머지는 처리
  • 중간에 실패 시 마지막 Chunk부터 재시작 가능

6. REST API 구현

6.1. 즉시 실행 API

@RestController
@RequestMapping("/common")
@RequiredArgsConstructor
public class BatchCommonController {
    
    private final BatchCommonService batchCommonService;
    
    // 비동기 실행
    @PostMapping("/job/{batchJobId}/execute")
    public ApiResponse<Void> execute(
        @PathVariable String batchJobId,
        @RequestBody BatchJobExecuteRequest request
    ) {
        batchCommonService.executeAsync(batchJobId, request);
        return ApiResponse.ok();
    }
    
    // 동기 실행
    @PostMapping("/job/{batchJobId}/execute/sync")
    public ApiResponse<Void> executeSync(
        @PathVariable String batchJobId,
        @RequestBody BatchJobExecuteRequest request
    ) {
        batchCommonService.executeSync(batchJobId, request);
        return ApiResponse.ok();
    }
    
    // 파라미터 정보 조회
    @GetMapping("/job/{batchJobId}/parameter")
    public ApiResponse<BatchJobParameterInfoResponse> getParameterList(
        @PathVariable String batchJobId
    ) {
        return ApiResponse.ok(
            batchCommonService.getParameterList(batchJobId)
        );
    }
}

6.2. 서비스 구현

@Service
@RequiredArgsConstructor
public class BatchCommonService {
    
    private final JobExecutor jobExecutor;
    private final JobParameterFactory jobParameterFactory;
    private final ApplicationContext applicationContext;
    
    // 비동기 실행
    @Async
    public void executeAsync(String batchJobId, BatchJobExecuteRequest request) {
        execute(batchJobId, request);
    }
    
    // 동기 실행
    public void executeSync(String batchJobId, BatchJobExecuteRequest request) {
        execute(batchJobId, request);
    }
    
    private void execute(String batchJobId, BatchJobExecuteRequest request) {
        try {
            // 1. Job 조회
            BatchJobEnum jobEnum = BatchJobEnum.findByCode(batchJobId);
            Job job = applicationContext.getBean(jobEnum.getId(), Job.class);
            
            // 2. 파라미터 변환
            BasicJobParameter parameter = null;
            if (jobEnum.getParameterClass() != null) {
                String json = objectMapper.writeValueAsString(request.getParameterMap());
                parameter = objectMapper.readValue(json, jobEnum.getParameterClass());
            }
            
            // 3. Job 실행
            JobParameters params = jobParameterFactory
                .getJobParametersWithIncrementer(job, parameter);
            
            jobExecutor.execute(job, params);
            
        } catch (Exception e) {
            log.error("Job 실행 실패: {}", batchJobId, e);
            throw new BatchExecutionException("Job 실행 실패", e);
        }
    }
    
    // 파라미터 정보 조회 (Reflection 활용)
    public BatchJobParameterInfoResponse getParameterList(String batchJobId) {
        BatchJobEnum jobEnum = BatchJobEnum.findByCode(batchJobId);
        Class<? extends BasicJobParameter> parameterClass = jobEnum.getParameterClass();
        
        if (parameterClass == null) {
            return BatchJobParameterInfoResponse.empty();
        }
        
        List<ParameterInfo> parameterList = new ArrayList<>();
        
        for (Field field : parameterClass.getDeclaredFields()) {
            BatchParameter annotation = field.getAnnotation(BatchParameter.class);
            if (annotation == null || !annotation.include()) {
                continue;
            }
            
            parameterList.add(ParameterInfo.builder()
                .name(field.getName())
                .type(field.getType().getSimpleName())
                .description(annotation.description())
                .example(annotation.example())
                .build()
            );
        }
        
        return new BatchJobParameterInfoResponse(parameterList);
    }
}

7. 운영 및 모니터링

7.1. 실행 이력 조회

Spring Batch는 자동으로 메타데이터를 관리한다.

주요 메타 테이블:

-- Job 인스턴스 (고유한 Job + Parameter 조합)
SELECT * FROM BATCH_JOB_INSTANCE;

-- Job 실행 이력
SELECT * FROM BATCH_JOB_EXECUTION
WHERE JOB_INSTANCE_ID = ?
ORDER BY CREATE_TIME DESC;

-- Step 실행 이력
SELECT * FROM BATCH_STEP_EXECUTION
WHERE JOB_EXECUTION_ID = ?;

-- 실행 파라미터
SELECT * FROM BATCH_JOB_EXECUTION_PARAMS
WHERE JOB_EXECUTION_ID = ?;

커스텀 서버 컨텍스트 조회:

-- 어느 서버에서 실행되었는지 추적
SELECT 
    e.JOB_EXECUTION_ID,
    i.JOB_NAME,
    e.START_TIME,
    e.STATUS,
    c.server_name,
    c.ip_address
FROM BATCH_JOB_EXECUTION e
JOIN BATCH_JOB_INSTANCE i ON e.JOB_INSTANCE_ID = i.JOB_INSTANCE_ID
JOIN batch_job_execution_server_context c ON e.JOB_EXECUTION_ID = c.job_execution_id
ORDER BY e.CREATE_TIME DESC;

7.2. 실패 알림

@Component
@RequiredArgsConstructor
public class JobExecutionFailMessageSender {
    
    private final SlackMessengerClient slackClient;
    
    public void sendMessage(JobExecution jobExecution) {
        if (jobExecution.getStatus() != BatchStatus.FAILED) {
            return;
        }
        
        String message = buildFailMessage(jobExecution);
        slackClient.send(message);
    }
    
    private String buildFailMessage(JobExecution jobExecution) {
        StringBuilder sb = new StringBuilder();
        sb.append("배치 실행 실패 알림\n\n");
        sb.append("Job: ").append(jobExecution.getJobInstance().getJobName()).append("\n");
        sb.append("실행 ID: ").append(jobExecution.getJobId()).append("\n");
        sb.append("시작: ").append(jobExecution.getStartTime()).append("\n");
        sb.append("종료: ").append(jobExecution.getEndTime()).append("\n");
        sb.append("\n에러 메시지:\n");
        
        // 에러 메시지 추출
        List<Throwable> exceptions = jobExecution.getAllFailureExceptions();
        exceptions.forEach(e -> sb.append("- ").append(e.getMessage()).append("\n"));
        
        return sb.toString();
    }
}

7.3. 애플리케이션 시작 시 초기화

@Component
@RequiredArgsConstructor
public class AppStartedListener implements ApplicationListener<ApplicationStartedEvent> {
    
    private final BatchMetaDataService metaDataService;
    private final BatchCronMapper batchCronMapper;
    private final BatchCronScheduleManager scheduleManager;
    
    @Override
    public void onApplicationEvent(ApplicationStartedEvent event) {
        // 1. 비정상 종료된 Job 상태 복구
        metaDataService.modifyFailedStatusByJobAndStep();
        log.info("비정상 종료 Job 상태 복구 완료");
        
        // 2. 스케줄 초기화
        List<BatchCron> batchCronList = batchCronMapper.findAll();
        scheduleManager.initialize(batchCronList);
        log.info("배치 스케줄 초기화 완료");
    }
}

8. 개선 효과

8.1. 변경 전 vs 변경 후

항목 변경 전 (@Scheduled) 변경 후 (Spring Batch)
대용량 처리 OOM 위험 Chunk 단위 처리
재시작 처음부터 다시 실패 지점부터 재시작
트랜잭션 수동 관리 자동 관리 (Chunk 단위)
실행 이력 직접 구현 필요 자동 저장
파라미터 관리 하드코딩 동적 관리
스케줄 변경 코드 수정 필요 API로 즉시 변경
모니터링 직접 구현 메타 테이블 제공

8.2. 구체적인 개선 사항

1) 메모리 효율성

변경 전:
- 전체 데이터 메모리 로드
- 10만 건 처리 시 OOM 발생

변경 후:
- Chunk 단위 처리 (100건씩)
- 100만 건 처리 가능

2) 안정성

변경 전:
- 5만 번째에서 실패 → 처음부터 다시

변경 후:
- 5만 번째에서 실패 → 5만 번째부터 재시작

3) 운영 편의성

변경 전:
- 스케줄 변경 → 코드 수정 → 배포

변경 후:
- 관리 화면에서 즉시 변경
- 배포 불필요

9. 한계와 트레이드오프

9.1. 현재 한계

단일 서버 구성:

  • 현재는 단일 서버 기준 설계
  • BatchCronScheduleManager가 메모리에서 스케줄 관리
  • 이중화 환경에서는 스케줄 중복 실행 가능

개선 방안:

// Redis를 활용한 분산 스케줄 관리
@Component
public class DistributedBatchCronScheduleManager {
    
    private final RedissonClient redissonClient;
    
    public void insert(BatchCron batchCron) {
        // 분산 락으로 중복 실행 방지
        RLock lock = redissonClient.getLock("batch:schedule:" + batchCron.getId());
        
        if (lock.tryLock()) {
            try {
                // 스케줄 등록
            } finally {
                lock.unlock();
            }
        }
    }
}

9.2. 트레이드오프

항목 선택한 방식 포기한 것 이유
스케줄 관리 메모리 기반 이중화 지원 단일 서버 구성, 구현 단순
파라미터 관리 Reflection 컴파일 타임 검증 동적 파라미터 조회 필요
실행 방식 JobLauncher 직접 실행 Spring Batch 표준 준수

10. 마무리

10.1. 핵심 요약

Spring Batch 기반 배치 애플리케이션을 구축하면서:

기술적 성과:

  • 대용량 데이터 처리 안정화
  • 재시작 메커니즘으로 안정성 확보
  • 동적 스케줄 관리로 운영 편의성 향상

설계 원칙:

  • Job과 Parameter를 중앙에서 관리
  • Reflection 기반 동적 파라미터 처리
  • 데이터베이스 기반 스케줄 관리

운영 개선:

  • 실행 이력 자동 추적
  • 실패 시 즉시 알림
  • 서버 재시작 없는 스케줄 변경

10.2. 향후 개선 방향

이중화 지원:

  • Redis 기반 분산 스케줄 관리
  • 분산 락을 통한 중복 실행 방지

모니터링 강화:

  • 실행 현황 대시보드
  • 성능 메트릭 수집
  • 알림 채널 다양화 (Slack, Email)

성능 최적화:

  • 파티셔닝을 통한 병렬 처리
  • 멀티 스레드 Step 적용

10.3. 배운 점

Spring Batch의 강점:

  • 대용량 처리를 위한 최적화된 프레임워크
  • 메타데이터 자동 관리
  • 풍부한 확장 포인트

설계의 중요성:

  • 초기 설계가 확장성을 결정
  • 표준 준수가 유지보수성을 높임
  • 운영을 고려한 설계가 필수

“처음부터 완벽할 필요는 없다. 점진적으로 개선하면 된다.”


참고 자료

댓글남기기