일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | ||||||
2 | 3 | 4 | 5 | 6 | 7 | 8 |
9 | 10 | 11 | 12 | 13 | 14 | 15 |
16 | 17 | 18 | 19 | 20 | 21 | 22 |
23 | 24 | 25 | 26 | 27 | 28 |
- 취업리부트코스
- infcon 2024
- 빈 조회 2개 이상
- KPT회고
- 인프콘 2024
- 1주일회고
- Python
- jwt
- 커스텀 헤더
- 파이썬
- 디자인 패턴
- 개발자 취업
- 디자인패턴
- 코딩테스트 준비
- @FeignClient
- 항해99
- Spring multimodule
- 빈 충돌
- 프로그래머스
- 개발자부트캠프추천
- JavaScript
- 99클럽
- DesignPattern
- spring batch 5.0
- 프로그래머스 이중우선순위큐
- 단기개발자코스
- 전략패턴 #StrategyPattern #디자인패턴
- jwttoken
- 구글 OAuth login
- TiL
- Today
- Total
m1ndy5's coding blog
Ditto Project Spring Batch&Scheduler 1편 - 스프링 배치 5.0, 스케줄러 적용하기 본문
Ditto Project Spring Batch&Scheduler 1편 - 스프링 배치 5.0, 스케줄러 적용하기
정민됴 2024. 2. 16. 08:02Ditto가 종목토론방 프로젝트인만큼 각 종목에 해당하는 전 주가데이터를 평일 아침 5시마다 가져오는 작업을 실행해야했다.
이 작업을 실행하기 위해 Spring Batch와 Scheduler를 사용해 구현했다.
Spring Batch?
Spring Batch는 대용량 데이터를 처리하기 위한 프레임워크로, 스프링 프레임워크 기반에서 동작한다.
일반적으로 대량의 데이터를 처리하거나, 주기적이고 반복적인 작업을 실행하는데 사용되며, 이러한 작업을 효율적이고 안정적으로 처리할 수 있는 로깅 및 추적, 트랜잭션 관리, 작업 재시작, 건너뛰기 등등의 기능들을 제공한다.
Scheduler?
배치와 스케줄러는 개념이 다른데, 배치는 논리적 또는 물리적으로 관련된 일련의 데이터를 그룹화하여 일괄 처리하는 방법이고 스케줄러는 주어진 작업을 미리 정의된 시간에 실행할 수 있게 해주는 도구나 소프트웨어를 의미한다.
즉, 배치는 대량의 데이터를 일괄적으로 처리하는 용도일 뿐, 특정 주기마다 자동으로 돌아가게 하려면 스케줄러를 사용하여 배치작업을 돌려주어야한다.
Spring Batch 용어 정리
- Job
배치처리 과정을 하나의 단위로 만들어 놓은 객체
하나 이상의 Step을 포함하며, 스프링 배치 계층에서 가장 상위에 위치함
각 Job은 고유한 이름을 가지며, 이 이름은 실행에 필요한 파라미터와 함께 JobInstance를 구별하는데 사용됨
- JobInstance
특정 Job을 실제 실행하는 인스턴스를 의미
ex) "평일 아침 5시에 데이터를 처리"하는 Job이 있으면 2월 15일 목요일, 2월 16일 금요일에 실행되는 새로운 JobInstance가 생성
한번 생성된 JobInstance는 해당 날짜의 데이터를 처리하는 데 사용되며, 실패했을 경우 같은 JobInstance를 다시 실행하여 작업 완료 가능
- JobParameters
JobInstance를 생성하고 구별하는 데 사용되는 파라미터
Job이 실행될 때 필요한 파라미터
스프링 배치는 String, Double, Long, Date 이렇게 4가지 타입의 파라미터를 지원
- JobExecution
JobInstance의 한 번의 시행 시도
실행 상태, 시작시간, 종료시간, 생성시간 등 JobInstance의 실행에 대한 세부정보를 담고 있음
- Step
Job의 하위 단계로서 실제 배치 처리 작업이 이루어지는 단위
한 개 이상의 Step으로 Job이 구성되며, 각 Step은 순차적으로 처리됨
각 Step의 내부에서는 ItemReader, ItemProcessor, ItemWriter를 사용하는 chunk 방식 또는 tasklet 하나를 가질 수 있음
- StepExecution
Step의 한 번의 실행을 의미, Step의 실행 상태, 실행 시간 등의 정보를 포함
각 Step의 실행 시도마다 새로운 StepExecution이 생성
- ExecutionContext
Step 간 또는 Job 실행 도중 데이터를 공유하는 데 사용되는 저장소
JobExecutionContext와 StepExecutionContext 두 종류가 있음
Job이나 Step이 실패했을 경우, ExecutionContext를 통해 마지막 실행 상태를 재구성하여 재시도 또는 복구 작업 수행
- JobRepository
배치 작업에 관련된 모든 정보를 저장하고 관리하는 매커니즘
JobExecution, StepExecution, JobParameters 등을 저장하고 관리
- JobLauncher
Job과 JobParameters를 받아 Job을 실행하는 역할
전반적인 Job 생명주기를 관리하고 JobRepository를 통해 실행 상태를 유지
- ItemReader
배치 작업에서 처리할 아이템을 읽어오는 역할
- ItemProcessor
ItemReader로부터 읽어온 아이템을 처리하는 역할
선택적임
- ItemWriter
ItemProcessor에서 처리된 데이터를 최종적으로 기록하는 역할
- Tasklet
간단한 단일 작업을 진행할 때 사용
ex) 리소스의 정리 or 시스템 상태 체크 등
execute 메서드는 step의 모든 처리가 끝날 때까지 계속 호출
- JobOperator
외부 인터페이스로, Job의 실행과 중지, 재시작 등의 배치 작업 흐름제어를 담당
이 인터페이스를 통해 JobLauncher와 JobRepository에 대한 직접적인 접근 없이도 배치 작업 수행 및 상태 조회 가능
- JobExplorer
Job의 실행 이력을 조회하는 데 사용됨
JobRepository에서 제공하는 정보와 유사하지만, JobRepository는 주로 Job의 실행 도중인 상태에 대해 업데이트 및 관리하는 반면, JobExplorer는 주로 읽기 전용 접근에 초점을 맞추고 있음
프로젝트 적용
이제 본격적으로 내 프로젝트에 적용을 해보겠다.
일단 나는 외부api로부터 각 종목의 시작가, 고가, 저가, 종가, 거래량을 받아와 저장하는 PricePerDay라는 엔티티를 가지고 있다.
package org.example.domain;
import jakarta.persistence.*;
import lombok.AccessLevel;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import java.time.LocalDate;
@Getter
@Entity
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class PricePerDay {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@ManyToOne
@JoinColumn(name = "company_id")
private Company company;
@Column(name = "date")
private LocalDate date;
@Column(name = "start_price")
private Integer startPrice;
@Column(name = "high_price")
private Integer highPrice;
@Column(name = "low_price")
private Integer lowPrice;
@Column(name = "last_price")
private Integer lastPrice;
@Column(name = "trading_volume")
private Long tradingVolume;
@Builder
public PricePerDay(Company company, LocalDate date, Integer startPrice, Integer highPrice, Integer lowPrice, Integer lastPrice, Long tradingVolume){
this.company = company;
this.date = date;
this.startPrice = startPrice;
this.highPrice = highPrice;
this.lowPrice = lowPrice;
this.lastPrice = lastPrice;
this.tradingVolume = tradingVolume;
}
}
이 각 엔티티를 받아올 때는 종목코드를 바꿔가면서 api 요청을 보내고 정보들을 받아와 파싱한다.
따라서 ItemReader에서 종목코드를 바꿔가면서 api 요청을 보내고 해당 정보들을 PricePerDay객체로 바꿔서 저장했다.
이 때 사실 객체로 바꾸는 과정은 ItemProcessor에서 진행하는 것이 더 맞긴하지만 크게 상관은 없어서 reader에서 진행했다.
package org.example.config;
import lombok.RequiredArgsConstructor;
import org.example.domain.Company;
import org.example.domain.PricePerDay;
import org.example.repository.CompanyRepository;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.http.*;
import org.springframework.web.client.RestTemplate;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import java.io.StringReader;
import java.net.URI;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
@RequiredArgsConstructor
public class PriceItemReader implements ItemReader<List<PricePerDay>> {
private final String stockApi = "https://fchart.stock.naver.com/sise.nhn?symbol=";
private final CompanyRepository companyRepository;
@Override
public List<PricePerDay> read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
List<Company> companyList = companyRepository.findAll();
List<PricePerDay> pricePerDayList = new ArrayList<>();
for (Company company:companyList) {
String itemCode = company.getItemCode();
String api = stockApi + itemCode + "&timeframe=day&count=1&requestType=0";
String result = fnGetAttribute(get(api));
DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyyMMdd");
String[] r = result.split("\\|");
PricePerDay ppd = PricePerDay.builder().company(company)
.date(LocalDate.parse(r[0], format))
.startPrice(Integer.parseInt(r[1]))
.highPrice(Integer.parseInt(r[2]))
.lowPrice(Integer.parseInt(r[3]))
.lastPrice(Integer.parseInt(r[4]))
.tradingVolume(Long.parseLong(r[5])).build();
pricePerDayList.add(ppd);
}
return pricePerDayList;
}
public String get(String url){
RestTemplate restTemplate = new RestTemplate();
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<HttpHeaders> entity = new HttpEntity<>(headers);
ResponseEntity<String> response = restTemplate.exchange(URI.create(url), HttpMethod.GET, entity, String.class);
return response.getBody();
}
public String fnGetAttribute(String xml){
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
DocumentBuilder builder;
Document doc;
String rAttribute = null;
try {
InputSource is = new InputSource(new StringReader(xml));
builder = factory.newDocumentBuilder();
doc = builder.parse(is);
doc.getDocumentElement().normalize();
NodeList children = doc.getElementsByTagName("item");
for(int i = 0; i < children.getLength(); i++){
Node node = children.item(i);
if(node.getNodeType() == Node.ELEMENT_NODE){
Element ele = (Element) node;
rAttribute = ele.getAttribute("data");
}
}
} catch (Exception e){
throw new RuntimeException("xml 파싱 불가");
}
return rAttribute;
}
}
다음으로는 이제 이 3000몇개정도 되는 데이터들을 실제 데이터베이스에 저장해줄 ItemWriter를 작성했다.
package org.example.config;
import lombok.RequiredArgsConstructor;
import org.example.domain.PricePerDay;
import org.example.repository.PricePerDayRepository;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemWriter;
import java.util.List;
@RequiredArgsConstructor
public class PriceItemWriter implements ItemWriter<List<PricePerDay>> {
private final PricePerDayRepository pricePerDayRepository;
@Override
public void write(Chunk<? extends List<PricePerDay>> chunk) throws Exception {
pricePerDayRepository.saveAll(chunk.getItems().get(0));
}
}
3000몇개,, 많다면 많고 적다면 적은 양이기 때문에 하나씩 불러서 하나씩 저장하는 로직이 썩 마음에 들진 않는다.
추후에 더 최적화할 수 있는 방법을 찾아봐야겠다! (병렬 처리, batchUpdate 등등)
이제 이와 같은 로직을 포함하는 Step을 만들고 이 Step들을 처리하는 Job을 만들어야한다!
package org.example.config;
import lombok.RequiredArgsConstructor;
import org.example.domain.PricePerDay;
import org.example.repository.CompanyRepository;
import org.example.repository.PricePerDayRepository;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import java.util.List;
@Configuration
@RequiredArgsConstructor
public class BatchConfig {
private final CompanyRepository companyRepository;
private final PricePerDayRepository pricePerDayRepository;
@Bean
public Job todayScheduleJob(JobRepository jobRepository, Step todayScheduleStep){
return new JobBuilder("today-schedule-job", jobRepository)
.incrementer(new RunIdIncrementer())
.start(todayScheduleStep)
.build();
}
@Bean
public Step todayScheduleStep(JobRepository jobRepository, PlatformTransactionManager transactionManager){
return new StepBuilder("today-schedule-step", jobRepository)
.<List<PricePerDay>, List<PricePerDay>>chunk(1, transactionManager)
.reader(reader())
.writer(writer())
.build();
}
@Bean
public PriceItemReader reader(){
return new PriceItemReader(companyRepository);
}
@Bean
public PriceItemWriter writer(){
return new PriceItemWriter(pricePerDayRepository);
}
}
위에서 설명한대로 이제 이 Batch작업을 주기적으로 실행해줄 스케줄러를 만들어야한다.
Batch 작업만 만든다고 해서 주기적으로 실행되는 것은 아닌 것에 주의하자!!
package org.example.config;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class BatchScheduler {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job job;
@Scheduled(cron = "0 0 5 ? * MON-FRI")
public void runJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
System.out.println("실행되나?");
JobParameters parameters = new JobParameters();
jobLauncher.run(job, parameters);
}
}
실행할 Job을 의존성 주입을 통해 받아온 뒤 Job 실행에 필요한 JobParameter와 함께 실행한다.
package org.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@EnableScheduling
@SpringBootApplication
public class BatchApplication {
public static void main(String[] args) {
SpringApplication.run(BatchApplication.class, args);
}
}
이렇게 BatchApplication에 @EnableScheduling을 붙여주고 실행을 시키면
각 종목의 주가데이터들이 잘 들어오는 것을 확인할 수 있었다~~!! 행복ㅎㅎㅎㅎ
다음으로는 Springbatch와 Jenkins를 사용하여 파이프라인을 구축해보는 실습 정리를 해보겠다!!
'Toy Projects > Ditto - Discuss Today's Topic' 카테고리의 다른 글
Ditto 프로젝트 동시성 문제 해결하기 1편 - 낙관적 락 VS 비관적 락 (1) | 2024.02.24 |
---|---|
Ditto Project Spring Batch&Scheduling 2편 - Docker, Jenkins 사용하여 Spring Batch Job 실행하기 (1) | 2024.02.17 |
Ditto 프로젝트 멀티 모듈 도입 3편 - Gateway 도입하기 (2) | 2024.02.06 |
Ditto 프로젝트 멀티 모듈 도입 2편 - Entity 연관관계 끊기 (0) | 2024.02.06 |
Ditto 프로젝트 멀티 모듈 도입 1편 - 모듈 나누기에 대한 고민 (1) | 2024.02.02 |