이전글(링크)에서 Airflow에 대해 알아보았다.
이어서 Airflow를 설치하고 간단한 실습을 해보겠다.
설치는 직접 설치와 클라우드 서비스 이용하는 방법 2가지로 나뉘는데, 직접 설치하는 법(도커)을 중점적으로 정리한다.
01. Airflow 설치 방법
실무에서는 Airflow가 기본으로 서버 3대로 구동되어 비용 만만치 않음
가장 편리하고 좋은 것은 사양 좋은 ec2 인스턴스에 도커를 설치하고 이 위에 airflow를 구동하는 것임 (다만 최소 8gb의 메모리 요함)
개인 실습 용으로는 랩탑에 도커 설치하기
(1) 직접 설치
1. 로컬에 도커로 Airflow 설치
윈도우에서는 6GB, Mac기준 4GB 필요함
2. AWS EC2 리눅스(20.04)서버에 설치
t3.small나 t3a.small 인스턴스안에 Airflow 직접 설치함 (프리티어 아님)
- 서울 기준 t3.small/ t3a.small 인스턴스의 경우 각각 한달 비용이 약 $18, $16
(한 달에 약 2만원 정도 소모됨)
(2) 클라우드 (완전관리형 서비스)
- AWS: MWAA (Managed Workflows for Apache Airflow)
- GCP: Cloud Composer
- Azure: Azure Data Factory의 Airflow DAGs
02. 도커로 직접 설치
공식 사이트에서 제공하는 docker-compose 파일을 이용하여 쉽게 설치 가능하다.
- 설치하고자 하는 파일로 이동
cd airflow-setup
2. yml 파일 다운로드
curl -LfO "https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml"
3. 이미지 다운로드 및 컨테이너 실행
docker-compose -f docker-compose.yaml pull
docker-compose -f docker-compose.yaml up
4. 실행 후 접속하여 확인 (http://localhost:8080)
초기 id와 pw는 airflow이다.
03. Airflow 코드 기본 구조
- DAG 대표하는 객체 생성
- DAG이름
- 실행 주기
- 실행 날짜
- 오너
- DAG 구성하는 태스크 만듦
- task별 적절한 오퍼레이터 선택
- task Id 부여 후 해야할 작업의 세부사항 지정
- 최종적으로 태스크간 실행 순서 결정
04. 예제
1. 모든 테스크에 공통으로 적용될 설정 작성
from datetime import datetime, timedelta
default_args = {
'owner': 'hyemin',
'email': ['hmk9667@gmail.com'],
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
- owner : 소유자
- email : 이메일
- retries : 실패 시 재시도 횟수
- retry_dalay : 재시도 시 시간 간격
- on_failure_callback : task 실패 시 호출할 함수 지정
- on_success_callback : task 성공 시 호출할 함수 지정
2. DAG 객체 작성
from airflow import DAG
dag = DAG(
"dag_v1", # DAG name
start_date=datetime(2020,8,7,hour=0,minute=00),
schedule="0 * * * *",
tags=["example"],
catchup=False,
# common settings
default_args=default_args
)
- DAG 이름
- schedule : 매시 0분에 시작
- * * * * *: 분/시간/일/월/요일
- 한 번만 실행할 것이면 None이나 @once 설정
- 그 외 @hourly, @daily, @weekly, @monthly, @yearly
- tags : 기본으로 example 줌
- catchup : 기본은 True
- 만약 start_date이 실행 기준으로 과거이면, 기본적으로 과거의 안 된일을 catchup(실행)함.
- False로 지정하면, 해당 catchup을 안함
- full refresh하는 일이면 False로 지정하고, incremental update하는 형태라면 True로 지정한다.
- default_args: default_args에 위에서 만든 기본 설정 딕셔너리 설정
3. Bash Operator를 이용해 DAG 속 TASK 작성
TestDAG.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'hyemin',
'start_date': datetime(2023, 5, 27, hour=0, minute=00),
'email': ['hmk9667@gmail.com'],
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
test_dag = DAG(
"dag_v1", # DAG name
schedule="0 9 * * *",
tags=['test'],
catchup=False,
default_args=default_args
)
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=test_dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
dag=test_dag)
t3 = BashOperator(
task_id='ls',
bash_command='ls /tmp',
dag=test_dag)
t1 >> [ t2, t3 ]
- dag_v1 은 총 3개의 테스크로 구성됨
- t1은 현재 시간 출력 (date)
- t2는 5초 대기 후 종료 (sleep 5)
- t3은 서버의 /tmp 디렉토리 내용 출력 (ls /tmp)
- t1이 끝나고 t2와 t3을 병렬로 실행 (t1 >> [ t2, t3])
4. 실행 (Web UI)
파일은 docker-compose up pull과 up을 실행했던 폴더에 dags 폴더 안에 TestDAG.py 이름으로 작성함
Airflow UI인 http://localhost:8080에 접속하여 dag_v1의 왼쪽 토글을 클릭하여 실행한다.
dag_v1을 눌러 작업의 상세 내용도 확인 가능하다.
- Grid : Task 별 진행 상황 왼쪽 그림으로 확인 가능.
- 각 task 별로 Logs 확인 가능
- Graph: 그래프 형식으로 task 순서 확인 가능
- Task Duration : 태스크 별로 걸린 시간 확인
- code : 작성한 DAG에 대한 코드 확인 가능
5. 실행 (터미널로 실행)
0) docker 컨테이너 접속 (쉘 스크립트로 접속)
docker ps
docker exec -it (Airflow 스케줄러 컨테이너id) sh
1) dags 조회
airflow dags list
2) 특정 Dag의 task 확인
airflow tasks list (DAG이름)
3) Dag 실행
airflow tasks test (DAG이름) (Task이름) (날짜)
airflow tasks run (DAG이름) (Task이름) (날짜)
- test : 실행. 결과가 메타 데이터 db에 저장 안 됨
- run : 실행. 결과가 메타 데이터 db에 저장됨
- 날짜는 YYYY-MM-DD
- start_date보다 과거인 경우는 실행이 되지만 오늘 날짜보다 미래인 경우 실행 안됨 (execution_date의 값)
'#️⃣ Data Engineering > Airflow' 카테고리의 다른 글
Airflow 실습_OpenAPI와 Upsert를 이용한 DAG (0) | 2023.06.08 |
---|---|
Airflow 실습_기본코드 정리 및 ETL 코드 개선하기 (0) | 2023.06.07 |
Airflow란? 구성요소 알아보기 (0) | 2023.06.07 |
데이터 파이프라인(ETL) 정의 및 설계 시 고려할 점 + 실습 (0) | 2023.06.05 |
[Airflow] Airflow 설치 (Docker 이용) (0) | 2022.02.06 |