Tech

Machine Learning pipeline for Financial AI

Machine Learning pipeline for Financial AI
Joon ik Lee
Nov 4, 2022

에이셀의 머신러닝과 데이터 엔지니어링 Workflow를 안정적으로 관리 하기 위한 management tool로 Airflow를 도입, 전체 파이프라인 관리 및 모니터링 디버깅을 고도화 하기 위한 과정을 간단하게 설명합니다.

Background of Financial AI

에이셀의 자본시장 투자에 필요한 AI 구성에서 ML/Data workflow는 기본적으로 AWS Cloud 상에 container 기반으로 수행이 됩니다. 대용량 데이터 처리가 기본적으로 필요하여 ECS(AWS Elastic Container Service)를 사용하며, ML 수행은 SageMaker에서 제공하는 container 기반의 job을 사용하며 추론을 위한 서비스도 container 기반의 Inference Endpoint를 사용합니다.

기본적으로 개발 초기 단계부터 container 기반으로 프로세스들이 개발 되어 workflow 단계들이 일부 합쳐져 있는 문제는 있었으나, 각각의 container 수행들이 queue 형태의 table로 연결되어 수행 시점에 대한 dependency는 없는 상황이었습니다. CI/CD에 대한 부분은 Github Actions를 통해 소스 코드 배포와 동시에 AWS Container Registry 에 image화 하여 등록하고 수행 시 변경 된 image를 기반으로 container를 생성하게 됩니다.

하지만 queue 형태의 table을 사용하여 파이프라인을 구성 한다는 것은 다양하고 복잡한 형태로 구성이 어렵습니다. 그리고 지속적으로 늘어날 파이프라인에 대한 관리 및 모니터링도 필요 하기때문에 손쉬운 구성과 관리를 위한 tool의 적용이 필요하였습니다.

이미 많은 종류의 ETL 혹은 orchestration tool이 나와 있으며 용도에 따라 선택지가 다양합니다. 대표적인 서비스들에 대해 간단히 정리해보면,

  • Kubeflow

Argo 기반의 Kubernetes + ML 형태의 서비스입니다. ML workflow에 적합한 구성으로 Jupyter Notebook을 통한 workflow 관리가 가능하며, 다양한 ML과 관련된 Operator(MXNet Operator, PyTorch Operator, XGBoost Operator 등) 와 Prometheus를 통한 로깅 및 모니터링 기능도 함께 제공하고 있습니다. AWS 에서도 EKS(Elastic Kubernetes Serivce)상에 Kubeflow가 정식 배포 되었습니다.

AWS features for kubeflow

  • Apache Airflow

Airbnb 에서 만든 task workflow management tool 입니다. Python 코드 기반으로 파이프라인을 구성하며 다양한 executor를 통해 standalone으로 구성하거나 Kubernetes 상에 pod 형태로 cluster 구성도 가능한 tool 입니다. ML에 특화되거나 ETL에 특화되지 않고 Python으로 연결 가능한 모든 서비스에 대한 Orchestration이 가능한 tool 입니다.

  • Dagster

Data driven application을 Python code 기반으로 쉽게 개발 및 배포 할 수 있도록 만든다는 모토를 가진 tool 입니다. 데이터를 기반으로 streaming 처리나 모니터링이 가능하며 Airflow2.0에 추가된 TaskFlow API와 유사한 개발 형태를 가지고 있습니다. Streaming 처리나 입력 데이터를 임의로 넣어서 결과를 받을 수 있는 등 데이터 처리에 강점을 보이고 있는 서비스 입니다.

그 외에도 public cloud 별로 특화된 ETL management tool(AWS step function, AZURE data factory), MLOps 관리 서비스(AWS SageMaker, AZURE Machine Learning) 등도 존재합니다.

에이셀은 일단 AWS를 사용하고 있으며 ML이나 데이터 특정 영역이 아닌 전체 workflow를 관리 하면서 AWS resource에 대한 모니터링이나 EC2 중지/삭제와 같은 task들도 통합해서 관리할 서비스가 필요하여 Airflow를 선택 했습니다. 또한, ML과 관련된 부분은 SageMaker의 기능을 활용하고, 파이프라인 구성 역시 Airflow에서 관리 하는 것으로 결정했습니다.

Machine Learning Architecture for Aicel Financial AI

사용하는 플랫폼은 ECS, SageMaker, RDS, DocumentDB, Opensearch (Elasticsearch), Github Actions, Airflow 입니다. ML/Data workflow를 구성하기 위한 주요 서비스인 ECS, SageMaker, Airflow에 대해서 간단히 확인하고 넘어 가겠습니다.

1. Elastic Container Service (ECS)

Amazon Elastic Container Service(Amazon ECS)는 확장성이 뛰어나고 빠른 Container 관리 서비스입니다. 이를 사용하여 cluster에서 Container를 실행, 중지 및 관리할 수 있습니다. Amazon ECS에서 Container는 서비스 내 개별 task나 여러 task를 실행하는 데 사용하는 task definition에 정의됩니다. task definition은 cluster에 지정된 수의 task를 동시에 실행하고 유지하는 데 사용하는 구성입니다. AWS Fargate를 사용하면 기본 인프라를 관리할 필요 없이 Container를 배포하고 관리할 수 있습니다.

  • AWS 에서 가장 오래 되고 간단하게 container를 배포, 확장 할 수 있는 서비스
  • Kubernetes 기반의 EKS에 비해 간단하게 container 실행이 가능하여 데이터 처리와 관련된 작업을 빠르게 구현 가능
  • Fargate를 통해 serverless 형태로 인프라 관리에 대한 부분도 제거 가능
  • 기본적으로 task definition을 ECR에 등록된 container 와 특정 command, container size 등을 정하고 event bridge 스케쥴에 따라 수행하는 형태
  • Elastic Container Registry(ECR) 을 통해 public docker image가 아닌 AWS Private VPC 안에서 사용 가능한 Repository 제공
  • Container image는 Github Actions를 사용하여 소스 코드 반영과 동시에 ECR에 image를 업데이트 해주는 방식으로 사용

Aicel Financial AI airflow architecture

2. SageMaker

Amazon SageMaker는 데이터 과학자 및 개발자가 모든 규모의 기계 학습 모델을 간편하게 빌드, 학습 및 배포할 수 있도록 하는 완전 관리형 서비스입니다.

  • 학습 및 배포 그리고 모델 관리까지 기본적인 MLOps의 흐름과 관련된 안정적인 서비스를 제공
  • SageMaker studio, notebook 등 개발을 위한 환경 제공
  • 실제 작업을 수행 하기 위한 container 기반의 job 제공(preprocessing job, training job, hyperparameter tuning job, batch transform job)
  • 여러 job들은 container 기반으로 AWS에 customize된 estimator를 제공 하며 결과물들에 대한 template 에 맞추어 관련 job의 데이터들을 저장 및 관리 할 수 있음
  • 실시간 추론를 위한 REST-API Endpoint 서비스 제공

Aicel Financial AI Sagemaker

3. Airflow

Apache Airflow는 배치 지향 workflow를 개발, 예약 및 모니터링 하기 위한 오픈 소스 플랫폼입니다. Airflow의 확장 가능한 Python 프레임워크를 사용하면 거의 모든 기술과 연결되는 workflow를 구축할 수 있습니다. 웹 인터페이스는 workflow 상태를 관리하는 데 도움이 됩니다. Airflow는 랩톱의 단일 프로세스에서 가장 큰 workflow도 지원하는 분산 설정에 이르기까지 다양한 방식으로 배포할 수 있습니다.

  • 데이터 온보딩 및 NLP 처리를 위한 전체 workflow에 대해 Python 기반의 코드로 관리 가능
  • ETL의 기능에 집중한 tool도 있지만, Kafka와 Flink 를 통해 추후 streaming 처리로 전환할 것을 고려 하였을 때 해당 기능의 필요성은 높지 않음
  • Airflow는 기본적으로 scheduler와 Web Server 그리고 실제 task를 수행하는 worker 노드들로 구성되어 있으며, executor의 선택에 따라 단일 랩톱에서 부터 cluster 구성과 Kubernetes와 같은 container 환경에서도 구축 가능
  • Airflow는 ETL tool에서 제공하는 connector의 개념은 존재 하지 않고 Operator라 하는 customize된 라이브러리를 통해 다양한 대상에 대한 task 구현 가능
  • AWS(다수 public cloud 포함) 상에 container 배포와 실행 모니터링, SageMaker 학습 수행 모델 관리 등을 내부적으로 boto3를 이용하여 구현되어 있으며, 필요에 따라 customizing이 가능할 뿐만 아니라 관련 Operator가 없더라도 비교적 간단하게 구현 가능
  • MLOps를 위한 ML 과정의 결과물에 대한 관리가 이루어 지지 않지만 해당 내용들은 SageMaker에서 관리 되는 형태를 사용
  • 리소스에 대한 부분은 실제 worker 들을 확장하여 확보할 수도 있지만 대용량 작업을 위해 ECS Fargate, EKS 와 같은 외부 리소스를 활용하는 형태로 온전히 workflow orchestration tool로서의 역할만 수행 가능
  • 작업을 관리하는 scheduler 서버 이중화가 가능하며 meta 데이터와 작업 전달을 위한 broker로 message 서비스를 사용하여 MSA 구조에 가까운 형태로 안정적인 운영 가능

Evolution of workflow

1. Initial ML Data workflow

Aicel Financial AI workflow
Start workflow

  • Data workflow는 ECS 기반으로 구성하여 K8s 환경에 비해 간편하게 task를 수행할 container들을 대량으로 수행 가능
  • Data workflow는 ECS에 연동된 Eventbridge의 cron event를 통해서 특정 주기로 수행 설정을 하여 각 task들이 dependency를 갖지 않으며 내부적으로 queue 역할을 하는 table을 정의하여 사용
  • ML workflow는 최종 모델 서빙을 SageMaker Inference Endpoint(REST-API) 형태로 제공하기 까지의 단계를 Data Scientist가 EC2나 SageMaker notebook을 통해 필요한 데이터를 업로드 하고 학습, 배포까지 진행

ML/Data 관련 초기 개발 과정에서는 기본적으로 ECS와 SageMaker 기반으로 구성을 시작하였습니다. 지속적으로 개발이 진행 중이었으며 workflow는 추가적으로 늘어나야 하는 상황이었습니다. Github Actions를 활용하여 개별 파이프라인을 구성하였으나, 각각의 파이프라인에 대한 관리와 파이프라인 간의 dependency를 구성하기 위해 전체 workflow를 관리할 tool이 필요한 상황이었습니다.

AWS에서 나온 MWAA(Managed Workflows for Apache) 서비스가 있었으나, 최종 목표는 Kubernetes Executor를 사용하여 EKS(Elastic Kubernetes Service)로 전환하여 지속적으로 늘어나는 workflow를 업무 단위 별로 여러 Airflow를 수행하는 것이 목표이기 때문에 MWAA를 사용하지 않고 직접 EC2 상에 설치 하는 형태로 진행 하였습니다.

2. Data workflow with Airflow

Aicel Data workflow with Airflow
Data workflow with Airflow

  • Data workflow에 대해 먼저 Airflow를 적용하여 데이터 온보딩 부분을 ECS를 사용하여 대량 수행하는 부분과 공공 데이터 수집과 같은 meta성 데이터 수집 부분은 Airflow worker 상에서 Python Operator로 직접 수집 하는 형태로 진행
  • Airflow는 2.0.1 버전을 사용 하였으며 ECS Operator를 사용하여 handling 하였습니다. ECS Operator는 container 실행을 요청함과 동시에 해당 container가 정상 종료 혹은 실패하는 것에 대한 모니터링도 함께 수행하며 container에서 발생하는 stdout, strerr 도 Airflow 쪽의 로그에서 확인이 가능하여 적용 후 다수의 container 에 대한 관리 및 디버깅이 수월해 졌습니다.

Data On-boarding with ECS
Data On-boarding with ECS

데이터 온보딩과 관련하여 현재 배치 수행 형태에서 Airflow의 로깅 기능을 통해 가능한 수행 주기를 판단할 수 있었습니다.

airflow — Task Duration
airflow — Task Duration

현재 데이터 온보딩 주기는 10분 단위로 수행 하며 NLP의 경우 5분 주기로 수행하여 처리할 대상 유무를 Branch python operator를 통해 판단하여 skip 여부를 판단합니다. 또한, 데이터 온보딩 역시 야간 시간대는 30분 주기로 수집하는 것으로 구성되어 있습니다.

Branch Operator를 통한 skip
Branch Operator를 통한 skip

3. ML workflow with Airflow

ML workflow with airflow
ML workflow with airflow

  • ML workflow는 자동화를 위해서 기존 방식에 Model Artifact와 Endpoint Configuration에 대한 부분과 candidate Sets 생성을 위한 부분 추가
  • 이전 방식은 training 단계와 model 배포 단계가 하나의 프로세스로 진행이 되어 Endpoint에 대한 model 관리나 이전 모델로 복구해야 하는 경우에 대해 자동화 하기가 어려운 부분 존재
  • Training job을 통해 model을 생성하고 evaluation을 통해 해당 model에 대한 배포 여부를 판단하여 배포 시에는 model 및 배포 image 와 리소스 사이즈 등을 포함한 Endpoint Configuration을 먼저 생성한 후 기존의 Endpoint에 새로 생성된 configuration으로 update하는 방식으로, 중단 없이 진행 가능하도록 구성
  • Airflow에도 SageMaker Operator가 존재 하지만 해당 Operator로는 위의 프로세스들을 적용할 수가 없어 Python Operator로 AWS Boto3 Python SDK를 통해 진행
  • Data workflow 에서 수집하는 meta data의 경우 Ticker Mapping을 위한 후보군(candidate sets)을 구성하는데, 이 부분에서 text search를 위해 기존 RDS가 아닌 OpenSearch로 Knowledge Graph 형태로 구성
  • 온보딩 데이터에 대한 NLP 처리 중 clustering 추가. clustering을 통해 유사한 데이터에 대해서 유사도가 높은 데이터들을 하나의 cluster로 묶어 주는 작업 진행

ML workflow DAG-1 — Model Training
ML workflow DAG-1 — Model Training

일단 학습 과정에서 생성되는 Model Artifact를 통해 QA형태의 Endpoint를 배포하여 기존 데이터와 신규 데이터에 대한 추론 결과를 Data Scientist가 확인하고 배포 여부를 판단합니다.

ML workflow DAG-1 — Model Deploy and NLP processing
ML workflow DAG-1 — Model Deploy and NLP processing

배포를 하는 시점에는 신규로 구성된 Mapping 대상에 대한 NLP 처리 작업을 진행 합니다.

ML workflow에 대한 자세한 내용은 Ticker Mapping System 글을 참고 하시기 바랍니다.

4. Current ML/Data workflow

AS-IS Aicel workflow
AS-IS Aicel workflow

  • Ticker Mapping 을 위해 기존에 OpenSearch 기반의 text search 부분을 AWS Neptune으로 Knowledge Graph 구성하여 성능 향상
  • 필요에 따른 파이프라인을 안정적이며 간단하게 추가 가능
  • 대부분의 프로세스가 수행되는 리소스가 Airflow의 worker 노드에서 수행 되는 것이 아닌 ECS, SageMaker 등 외부 리소스에서 container 기반으로 수행 됨에 따라 Airflow 노드에 대한 부하나 안정성 이슈가 매우 적게 발생
  • Workflow 뿐만 아니라 관리나 모니터링을 위한 DAG도 추가 구성하여 Slack Operator를 통해 이슈 사항이나 데이터 온보딩 모니터링 진행

slack monitoring dags
slack monitoring dags

Airflow를 통해 안정적이고 간단하게 파이프라인을 구성하고 모니터링 할 수 있는 환경이 구축되었습니다.

Aicel ML/DATA workflow
Aicel ML/DATA workflow

Conclusion and Future work

  • Airflow를 통한 workflow는 관리와 디버깅에 유용
    실제 관리 측면에서 파이프라인의 수행에 대한 모니터링, 수행 시간 확인, 로깅 뿐 아니라 다수의 파이프라인에 대한 확인 뿐만 아니라 개발한 부분과 다수의 container를 사용할 경우 디버깅과 관련한 부분에서 훌륭하게 활용 되었습니다.
  • Workflow tool 들을 관리하는 management tool을 통한 확장
    Airflow는 에이셀 workflow 플랫폼의 첫 번째 선택지였을 뿐입니다. 이후 workflow들을 그룹화 하고 해당 workflow에 최적화된 tool을 적용하고 손쉽게 구동 할 수 있는 모든 tool들을 관리할 수 있는 환경으로 발전시키고자 합니다. 특정 tool에 개발환경을 맞춰 가는 것은 향후 다양한 상황에 대처하기 어려울 수 있다고 판단합니다.
  • Kubernetes 환경으로 이관
    위에 여러 workflow tool들을 적용 하기 위한 필수 선택이 container 환경일 것이고 그 환경 중에 Kubernetes 환경은 모든 public cloud 업체에서 제공하는 가장 표준에 가까운 환경이라고 생각합니다. 표준이 되는 환경을 구성한 뒤, 해당 파이프라인에 최적화된 workflow tool들을 구동하려고 합니다.
up to nav button