m1ndy5's coding blog

Ditto Project Spring Batch&Scheduler 1편 - 스프링 배치 5.0, 스케줄러 적용하기 본문

Toy Projects/Ditto - Discuss Today's Topic

Ditto Project Spring Batch&Scheduler 1편 - 스프링 배치 5.0, 스케줄러 적용하기

정민됴 2024. 2. 16. 08:02

Ditto가 종목토론방 프로젝트인만큼 각 종목에 해당하는 전 주가데이터를 평일 아침 5시마다 가져오는 작업을 실행해야했다.

이 작업을 실행하기 위해 Spring Batch와 Scheduler를 사용해 구현했다.

Spring Batch?

Spring Batch는 대용량 데이터를 처리하기 위한 프레임워크로, 스프링 프레임워크 기반에서 동작한다.

일반적으로 대량의 데이터를 처리하거나, 주기적이고 반복적인 작업을 실행하는데 사용되며, 이러한 작업을 효율적이고 안정적으로 처리할 수 있는 로깅 및 추적, 트랜잭션 관리, 작업 재시작, 건너뛰기 등등의 기능들을 제공한다.

Scheduler?

배치와 스케줄러는 개념이 다른데, 배치는 논리적 또는 물리적으로 관련된 일련의 데이터를 그룹화하여 일괄 처리하는 방법이고 스케줄러는 주어진 작업을 미리 정의된 시간에 실행할 수 있게 해주는 도구나 소프트웨어를 의미한다.

즉, 배치는 대량의 데이터를 일괄적으로 처리하는 용도일 뿐, 특정 주기마다 자동으로 돌아가게 하려면 스케줄러를 사용하여 배치작업을 돌려주어야한다.

Spring Batch 용어 정리

- Job

배치처리 과정을 하나의 단위로 만들어 놓은 객체

하나 이상의 Step을 포함하며, 스프링 배치 계층에서 가장 상위에 위치함

각 Job은 고유한 이름을 가지며, 이 이름은 실행에 필요한 파라미터와 함께 JobInstance를 구별하는데 사용됨

 

- JobInstance

특정 Job을 실제 실행하는 인스턴스를 의미

ex) "평일 아침 5시에 데이터를 처리"하는 Job이 있으면 2월 15일 목요일, 2월 16일 금요일에 실행되는 새로운 JobInstance가 생성

한번 생성된 JobInstance는 해당 날짜의 데이터를 처리하는 데 사용되며, 실패했을 경우 같은 JobInstance를 다시 실행하여 작업 완료 가능

 

- JobParameters

JobInstance를 생성하고 구별하는 데 사용되는 파라미터

Job이 실행될 때 필요한 파라미터

스프링 배치는 String, Double, Long, Date 이렇게 4가지 타입의 파라미터를 지원

 

- JobExecution

JobInstance의 한 번의 시행 시도

실행 상태, 시작시간, 종료시간, 생성시간 등 JobInstance의 실행에 대한 세부정보를 담고 있음

 

- Step

Job의 하위 단계로서 실제 배치 처리 작업이 이루어지는 단위

한 개 이상의 Step으로 Job이 구성되며, 각 Step은 순차적으로 처리됨

각 Step의 내부에서는 ItemReader, ItemProcessor, ItemWriter를 사용하는 chunk 방식 또는 tasklet 하나를 가질 수 있음

 

- StepExecution

Step의 한 번의 실행을 의미, Step의 실행 상태, 실행 시간 등의 정보를 포함

각 Step의 실행 시도마다 새로운 StepExecution이 생성

 

- ExecutionContext

Step 간 또는 Job 실행 도중 데이터를 공유하는 데 사용되는 저장소

JobExecutionContext와 StepExecutionContext 두 종류가 있음

Job이나 Step이 실패했을 경우, ExecutionContext를 통해 마지막 실행 상태를 재구성하여 재시도 또는 복구 작업 수행

 

- JobRepository

배치 작업에 관련된 모든 정보를 저장하고 관리하는 매커니즘

JobExecution, StepExecution, JobParameters 등을 저장하고 관리

 

- JobLauncher

Job과 JobParameters를 받아 Job을 실행하는 역할

전반적인 Job 생명주기를 관리하고 JobRepository를 통해 실행 상태를 유지

 

- ItemReader

배치 작업에서 처리할 아이템을 읽어오는 역할

 

- ItemProcessor

ItemReader로부터 읽어온 아이템을 처리하는 역할

선택적임

 

- ItemWriter

ItemProcessor에서 처리된 데이터를 최종적으로 기록하는 역할

 

- Tasklet

간단한 단일 작업을 진행할 때 사용

ex) 리소스의 정리 or 시스템 상태 체크 등

execute 메서드는 step의 모든 처리가 끝날 때까지 계속 호출

 

- JobOperator

외부 인터페이스로, Job의 실행과 중지, 재시작 등의 배치 작업 흐름제어를 담당

이 인터페이스를 통해 JobLauncher와 JobRepository에 대한 직접적인 접근 없이도 배치 작업 수행 및 상태 조회 가능

 

- JobExplorer

Job의 실행 이력을 조회하는 데 사용됨

JobRepository에서 제공하는 정보와 유사하지만, JobRepository는 주로 Job의 실행 도중인 상태에 대해 업데이트 및 관리하는 반면, JobExplorer는 주로 읽기 전용 접근에 초점을 맞추고 있음

 

프로젝트 적용

이제 본격적으로 내 프로젝트에 적용을 해보겠다.

일단 나는 외부api로부터 각 종목의 시작가, 고가, 저가, 종가, 거래량을 받아와 저장하는 PricePerDay라는 엔티티를 가지고 있다.

package org.example.domain;

import jakarta.persistence.*;
import lombok.AccessLevel;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;

import java.time.LocalDate;

@Getter
@Entity
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class PricePerDay {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @ManyToOne
    @JoinColumn(name = "company_id")
    private Company company;

    @Column(name = "date")
    private LocalDate date;

    @Column(name = "start_price")
    private Integer startPrice;

    @Column(name = "high_price")
    private Integer highPrice;

    @Column(name = "low_price")
    private Integer lowPrice;

    @Column(name = "last_price")
    private Integer lastPrice;

    @Column(name = "trading_volume")
    private Long tradingVolume;

    @Builder
    public PricePerDay(Company company, LocalDate date, Integer startPrice, Integer highPrice, Integer lowPrice, Integer lastPrice, Long tradingVolume){
        this.company = company;
        this.date = date;
        this.startPrice = startPrice;
        this.highPrice = highPrice;
        this.lowPrice = lowPrice;
        this.lastPrice = lastPrice;
        this.tradingVolume = tradingVolume;
    }

}

 

이 각 엔티티를 받아올 때는 종목코드를 바꿔가면서 api 요청을 보내고 정보들을 받아와 파싱한다.

따라서 ItemReader에서 종목코드를 바꿔가면서 api 요청을 보내고 해당 정보들을 PricePerDay객체로 바꿔서 저장했다.

이 때 사실 객체로 바꾸는 과정은 ItemProcessor에서 진행하는 것이 더 맞긴하지만 크게 상관은 없어서 reader에서 진행했다.

package org.example.config;

import lombok.RequiredArgsConstructor;
import org.example.domain.Company;
import org.example.domain.PricePerDay;
import org.example.repository.CompanyRepository;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.http.*;
import org.springframework.web.client.RestTemplate;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import java.io.StringReader;
import java.net.URI;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

@RequiredArgsConstructor
public class PriceItemReader implements ItemReader<List<PricePerDay>> {
    private final String stockApi = "https://fchart.stock.naver.com/sise.nhn?symbol=";
    private final CompanyRepository companyRepository;
    @Override
    public List<PricePerDay> read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {

        List<Company> companyList = companyRepository.findAll();
        List<PricePerDay> pricePerDayList = new ArrayList<>();

        for (Company company:companyList) {
            String itemCode = company.getItemCode();
            String api = stockApi + itemCode + "&timeframe=day&count=1&requestType=0";
            String result = fnGetAttribute(get(api));
            DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyyMMdd");

            String[] r = result.split("\\|");
            PricePerDay ppd = PricePerDay.builder().company(company)
                        .date(LocalDate.parse(r[0], format))
                        .startPrice(Integer.parseInt(r[1]))
                        .highPrice(Integer.parseInt(r[2]))
                        .lowPrice(Integer.parseInt(r[3]))
                        .lastPrice(Integer.parseInt(r[4]))
                        .tradingVolume(Long.parseLong(r[5])).build();

            pricePerDayList.add(ppd);
        }
        return pricePerDayList;
    }

    public String get(String url){
        RestTemplate restTemplate = new RestTemplate();
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);

        HttpEntity<HttpHeaders> entity = new HttpEntity<>(headers);

        ResponseEntity<String> response = restTemplate.exchange(URI.create(url), HttpMethod.GET, entity, String.class);
        return response.getBody();
    }

    public String fnGetAttribute(String xml){
        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
        DocumentBuilder builder;
        Document doc;
        String rAttribute = null;
        try {
            InputSource is = new InputSource(new StringReader(xml));
            builder = factory.newDocumentBuilder();
            doc = builder.parse(is);

            doc.getDocumentElement().normalize();

            NodeList children = doc.getElementsByTagName("item");

            for(int i = 0; i < children.getLength(); i++){
                Node node = children.item(i);
                if(node.getNodeType() == Node.ELEMENT_NODE){
                    Element ele = (Element) node;
                    rAttribute = ele.getAttribute("data");
                }
            }
        } catch (Exception e){
            throw new RuntimeException("xml 파싱 불가");
        }
        return rAttribute;
    }
}

 

다음으로는 이제 이 3000몇개정도 되는 데이터들을 실제 데이터베이스에 저장해줄 ItemWriter를 작성했다.

package org.example.config;

import lombok.RequiredArgsConstructor;
import org.example.domain.PricePerDay;
import org.example.repository.PricePerDayRepository;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemWriter;

import java.util.List;

@RequiredArgsConstructor
public class PriceItemWriter implements ItemWriter<List<PricePerDay>> {

    private final PricePerDayRepository pricePerDayRepository;

    @Override
    public void write(Chunk<? extends List<PricePerDay>> chunk) throws Exception {
        pricePerDayRepository.saveAll(chunk.getItems().get(0));
    }
}

 

3000몇개,, 많다면 많고 적다면 적은 양이기 때문에 하나씩 불러서 하나씩 저장하는 로직이 썩 마음에 들진 않는다.

추후에 더 최적화할 수 있는 방법을 찾아봐야겠다! (병렬 처리, batchUpdate 등등)

이제 이와 같은 로직을 포함하는 Step을 만들고 이 Step들을 처리하는 Job을 만들어야한다!

package org.example.config;

import lombok.RequiredArgsConstructor;
import org.example.domain.PricePerDay;
import org.example.repository.CompanyRepository;
import org.example.repository.PricePerDayRepository;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import java.util.List;

@Configuration
@RequiredArgsConstructor
public class BatchConfig {

    private final CompanyRepository companyRepository;
    private final PricePerDayRepository pricePerDayRepository;

    @Bean
    public Job todayScheduleJob(JobRepository jobRepository, Step todayScheduleStep){
        return new JobBuilder("today-schedule-job", jobRepository)
                .incrementer(new RunIdIncrementer())
                .start(todayScheduleStep)
                .build();
    }

    @Bean
    public Step todayScheduleStep(JobRepository jobRepository, PlatformTransactionManager transactionManager){
        return new StepBuilder("today-schedule-step", jobRepository)
                .<List<PricePerDay>, List<PricePerDay>>chunk(1, transactionManager)
                .reader(reader())
                .writer(writer())
                .build();
    }

    @Bean
    public PriceItemReader reader(){
        return new PriceItemReader(companyRepository);
    }

    @Bean
    public PriceItemWriter writer(){
        return new PriceItemWriter(pricePerDayRepository);
    }
}

 

위에서 설명한대로 이제 이 Batch작업을 주기적으로 실행해줄 스케줄러를 만들어야한다.

Batch 작업만 만든다고 해서 주기적으로 실행되는 것은 아닌 것에 주의하자!!

package org.example.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;

import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;


@Component
public class BatchScheduler {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job job;

    @Scheduled(cron = "0 0 5 ? * MON-FRI")
    public void runJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
        System.out.println("실행되나?");
        JobParameters parameters = new JobParameters();
        jobLauncher.run(job, parameters);
    }
}

 

실행할 Job을 의존성 주입을 통해 받아온 뒤 Job 실행에 필요한 JobParameter와 함께 실행한다.

package org.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@EnableScheduling
@SpringBootApplication
public class BatchApplication {

	public static void main(String[] args) {
		SpringApplication.run(BatchApplication.class, args);
	}

}

 

이렇게 BatchApplication에 @EnableScheduling을 붙여주고 실행을 시키면

 

각 종목의 주가데이터들이 잘 들어오는 것을 확인할 수 있었다~~!! 행복ㅎㅎㅎㅎ

 

다음으로는 Springbatch와 Jenkins를 사용하여 파이프라인을 구축해보는 실습 정리를 해보겠다!!