☀️Ch2. Airflow DAG 구조
- Airflow 워크플로는 DAG 로 표시함
- 태스크를 나누는 방법에는 정답이나 오답은 없다. 다만 몇 사지 고려사항에 맞게 적용하기
- 태스크 실행 순서 정의 (= 태스크 의존성 설정) : >> 로 표시함
- “오퍼레이터와 태스크 차이점”
- 오퍼레이터(Operator) : 단일 작업 수행 역할
- 태스크(Task)는 오퍼레이터 상태를 관리하고 사용자에게 상태 변경을 표시하는 Airflow 내장 컴포넌트
- TASK 코드 작성 시, 변수 이름을 실행할 함수 명과 task_id를 동일하게 하면 가독성 좋음
- Airflow 웹서버 웹 브라우저에서 실패한 태스크 옵션
- Clear: 태스크를 초기화하고 이를 재실행함 (reset)
- OK: 실패한 태스크와 연속된 태스크가 초기화됨
☀️Ch3. Airflow 스케줄링
Execution_date(실행날짜)는 스케줄 간격의 실행 시작 시간 (DAG 시작 시간 X)
- execution_date: 해당 "스케줄 간격”의 시작 시간
- next_execution_date: 스케줄 간격의 종료 시간
- previous_execution_date: 과거 스케줄 간격의 시간
위 두 매개변수(next_execution_date, previous_execution_date)는 스케줄 간격 이후의 DAG 실행을 통해서만 정의되므로, UI나 CLI로 수동으로 실행 시 매개변수 값이 정의 되지 않아 사용할 수 없음
데이터 파티셔닝
“파티셔닝(Partitioning)” 데이터 세트를 관리하기 쉬운 작은 단위로 나누어, 데이터를 저장 및 처리하는 전략
”파티션(Partition)”은 데이터 세트의 작은 부분
def _cal_stats(**context):
"""Calculates event statistics"
input_path = context["templates_dict"]["input_path"]
output_path = context["templates_dict"]["output_path"]
... (생략)
cal_stats = PythonOperator(
task_id="cal_stats",
python_callable=_cal_stats,
templates_dict={
"input_path" = "/data/events/{{ds}}.json",
"output_path" = "/date/events/{{ds}}.csv"
},
dag=dag,
}
간격 기반 스케줄링
증분 데이터 처리 유형을 수행하는데 적합함
(1) 자주 사용되는 스케줄 간격에 대한 Airflow 프리셋
@once | 1회만 실행하도록 스케줄 |
@hourly | 매 정각 1회 실행 |
@daily | 매일 자정에 1회 실행 |
@weekly | 매주 일요일 자정에 1회 실행 |
@monthly | 매월 1일 자정에 1회 실행 |
@yearly | 매년 1월 1일 자정에 1회 실행 |
(2) cron 기반 스케줄 간격
* * * * *
분 시간 일 월 요일
0~59 0~23 1~31 1~12 0(일)~6(토)
- * : 신경쓰지 않음
- - : 값의 범위
- , : 값의 리스트
0 * * * * | 매 시간 |
0 0 * * * | 매일 자정 |
0 0 * * 0 | 매주 일요일 자정 |
0 0 1 * * | 매월 1일 자정 |
45 23 * * SAT | 매주 토요일 23시 45분 |
0 0 * * MON, WED, FRI | 월,수,금 매일 자정 |
0 0 * * MON-FRI | 월~금 매일 자정 |
0 0,12 * * * | 매일 정오와 자정에 실행 |
(3) 빈도 기반 스케줄 간격 설정
timedelta로 특정 시간 간격으로 설정함
dag=DAG(
dag_id="test",
schedule_interval=**dt.timedelta(days=3),**
...
)
- days
- minutes
- hours
특정 날짜 지정 및 진자 템플릿 활용
"start_date={{execution_date.strftime('%Y-%m-%d')}}"
"&end_date={{next_execution_date.strftime('%Y-%m-%d')}}"
혹은
"start_date={{ds}}"
"&end_date={{next_ds}"
- next_ds: 스케줄 간격의 종료시간 (YYYY-MM-DD 형태)
- next_ds_nodash: 스케줄 간격의 종료시간 YYYYMMDD 형태
- prev_ds: 과거 스케줄 간격의 시간 (YYYY-MM-DD 형태)
- prev_ds_nodash: 과거 스케줄 간격의 시간(YYYYMMDD형태)
백필 설정
백필이란, 과거 시작 날짜부터 과거 간격을 정의한 것. 이를 통해 DAG의 과거 기록을 실행할 수 있음
- DAG의 catchup매개변수를 true가 디폴트 값으로, 과거의 스케줄 간격을 포함하여 백필 실행함
태스크 디자인 모범사례
1. 원자성 (Atomicity)
모든 것이 완료되거나 완료되지 않도록 보장해야됨.
Airflow의 태스크는 성공적으로 수행하여 적절한 결과 생성 or 시스템에 영향 주지 않고 실패해야됨
- 원자성 유지하기 위해 일련의 과정을 다수의 태스크로 분리할 수 있음
- 다만, 두 태스크 사이에 강한 의존성이 있을 경우에는 하나의 일관된 태스크 단위 형성이 더 나음 (e.g. api 호출 전 로그인이 필요한 경우)
2. 멱등성 (Idempotency)
동일한 입력으로 동일한 태스크 여러번 호출해도 결과가 동일해야됨.
(즉, 입력에 변화가 없으면 다시 실행해도 전체 결과 변경이 없어야 됨)
멱등성은 일관성과 장애 처리를 보장함
- 멱등성 태스크 : {{ds}} 명으로 각 json파일 존재하여 해당 파일에 이벤트 추가
- 비멱등성 태스크 : 단일 json파일을 사용하고 해당 파일에 이벤트 추가
책 [Apache Airflow기반의 데이터 파이프라인] 을 읽으며 정리한 내용을 기록했습니다.
'#️⃣ Data Engineering > Airflow' 카테고리의 다른 글
Ch5. Airflow 태스크 간 의존성 (0) | 2023.11.22 |
---|---|
Ch4. Airflow 태스크 템플릿 및 XCom (0) | 2023.11.16 |
Ch1. Airflow 살펴보기 (0) | 2023.11.14 |
Airflow 운영과 대안 (0) | 2023.06.22 |
Airflow 실습_OLTP에서 OLAP으로 데이터 적재하기 (0) | 2023.06.09 |