Ch6. Airflow 워크플로 트리거

2023. 12. 4. 23:24· #️⃣ Data Engineering/Airflow
목차
  1. Ch6. Airflow 워크플로 트리거
  2. 워크플로 트리거 방법
  3. 6.1 센서를 사용한 폴링 조건
  4. Sensor: Airflow 오퍼레이터의 특수타입(서브클래스)
  5. Poke(포크) / Poking
  6. FileSensor
  7. PythonSensor
  8. 센서 처리
  9. 센서 데드록 (Sensor Deadlock 상태)
  10. 센서 Mode : poke와 reschedule
  11. 6.2 다른 DAG 트리거
  12. TriggerDagRunOperator
  13. TriggerDagRunOperator를 활용한 태스크 분리
  14. ExternalTaskSensor
  15. ExternalTaskSensor의 인수: execution_delta
  16. 번외. ExternalTaskSensor의 인수: execution_date_fn 함수
  17. 6.3 REST/CLI로 워크플로 시작
  18. Airflow CLI로 DAG 트리거
  19. Airflow REST API로 DAG 트리거

Ch6. Airflow 워크플로 트리거

워크플로 트리거 방법

  1. 스케줄 간격 ← ch2
  2. 특정 태스크 수행 후 트리거 ← ch6

6.1 센서를 사용한 폴링 조건


Sensor: Airflow 오퍼레이터의 특수타입(서브클래스)

  • 센서는 특정 조건이 true인지 지속적으로 확인하고 true라면 성공함

Poke(포크) / Poking

Poking은 센서를 실행하고 상태를 확인하기 위해 Airflow에서 사용하는 이름

e.g. 센서는 대략 1분에 한 번씩 주어진 파일이 있는지 포크(Poke)한다.

FileSensor

  • FileSensor : 파일 존재 여부에 따라 true/false반환.
    • False시 해당 센서는 지정된 시간 (기본 60초) 대기 후 다시 시도함
  • 글로빙(globbing)을 사용하여 파일/디렉토리 이름과 패턴을 일치시킴
from airflow.sensors.filesystem import FileSensor

wait = FileSensor(
    task_id="wait_for_supermarket_1", filepath="/data/supermarket1/data.csv", dag=dag
)

PythonSensor

  • PythonSensor: 파이썬 callable을 지원하며, PythonSensor callable은 성공적으로 조건 충족 여부에 따라 true/false의 Boolean 값을 반환하는 것으로 제한됨
    • 두 가지 조건 이상을 확인할 때 가독성이 좋음
from airflow.sensors.python import PythonSensor

def _wait_for_supermarket(supermarket_id_):
    supermarket_path = Path("/data/" + supermarket_id_)
    data_files = supermarket_path.glob("data-*.csv")
    success_file = supermarket_path / "_SUCCESS"
    return data_files and success_file.exists()

wait_for_supermarket_1 = PythonSensor(
    task_id="wait_for_supermarket_1",
    python_callable=_wait_for_supermarket,
    op_kwargs={"supermarket_id": "supermarket1"},
    dag=dag,
)

센서 처리

  • timeout : 센서의 최대 실행 허용 시간(초)을 지정하는 인수
    • 실행시간이 타임아웃 설정값 초과 시 실패 반환
    • 기본값 : 7일
  • Dag의 Concurrency : DAG의 최대 동시 태스크 수 설정하여 Sensor의 대시 태스크 수에 제한을 줄 수 있음

센서 데드록 (Sensor Deadlock 상태)

Sensor를 설정한 DAG의 실행할 수 있는 최대 태스크 수에 도달해 차단되는 것. 혹은 Airflow 전역 설정 최대 태스크 제한에 걸려 전체 시스템 정지되는 현상

센서 Mode : poke와 reschedule

센서 클래스는 mode인수가 있어 poke(기본)과 reschedule로 설정 가능하다.

  • poke (기본) : 최대 태스크 제한 도달시 새로운 태스크 차단됨
    • 즉, 센서 태스크가 실행중인 동안 태스크 슬롯 차지함
  • reschedule : 포크 동작을 실행할 때만 슬롯 차지. 대기 시간 동안은 슬롯 차지 X

6.2 다른 DAG 트리거

위에 Sensor를 이용한 트리거는 Task 대상임

지금부터는 DAG를 트리거하는 것을 알아봄

▪️ [참고] 각 DAG 실행의 run_id 필드
schedule__ : 스케줄되어 DAG 실행 시작됨
backfill__ : 백필 태스크에 의해 DAG 실행됨
manual__ : 수동으로 DAG 실행됨을 나타냄

TriggerDagRunOperator

TriggerDagRunOperator를 이용하여 다른 DAG를 트리거함.

  • TriggerDagRunOperator가 포함된 DAG에서 태스크를 삭제하면, 이전에 트리거된 해당 DAG 실행을 지우는 대신 새 DAG 실행이 트리거됨
trigger_create_metrics_dag = TriggerDagRunOperator(
        task_id=f"trigger_create_metrics_dag_supermarket_{supermarket_id}",
        trigger_dag_id="listing_6_04_dag02",
        dag=dag1,
    )
  • task_id : TriggerDagRunOperator의 고유 task id
  • trigger_dag_id : 해당 dag 다음에 실행할 (trigger할) dag id
  • dag: 현재 TriggerDagRunOperator가 속한 Dag

예제 DAG 코드 (전문)

더보기

예제 DAG 코드 (전문)

from pathlib import Path

import airflow.utils.dates
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.python import PythonSensor

dag1 = DAG(
    dag_id="listing_6_04_dag01",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="0 16 * * *",
)
dag2 = DAG(
    dag_id="listing_6_04_dag02",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval=None,
)


def _wait_for_supermarket(supermarket_id_):
    supermarket_path = Path("/data/" + supermarket_id_)
    data_files = supermarket_path.glob("data-*.csv")
    success_file = supermarket_path / "_SUCCESS"
    return data_files and success_file.exists()


for supermarket_id in range(1, 5):
    wait = PythonSensor(
        task_id=f"wait_for_supermarket_{supermarket_id}",
        python_callable=_wait_for_supermarket,
        op_kwargs={"supermarket_id_": f"supermarket{supermarket_id}"},
        dag=dag1,
    )
    copy = DummyOperator(task_id=f"copy_to_raw_supermarket_{supermarket_id}", dag=dag1)
    process = DummyOperator(task_id=f"process_supermarket_{supermarket_id}", dag=dag1)
    trigger_create_metrics_dag = TriggerDagRunOperator(
        task_id=f"trigger_create_metrics_dag_supermarket_{supermarket_id}",
        trigger_dag_id="listing_6_04_dag02",
        dag=dag1,
    )
    wait >> copy >> process >> trigger_create_metrics_dag

compute_differences = DummyOperator(task_id="compute_differences", dag=dag2)
update_dashboard = DummyOperator(task_id="update_dashboard", dag=dag2)
notify_new_data = DummyOperator(task_id="notify_new_data", dag=dag2)
compute_differences >> update_dashboard

TriggerDagRunOperator를 활용한 태스크 분리

DAG가 아주 복잡해지는 경우 태스크 명확성을 위해 아래와 같이 TriggerDagRunOperator로 다양한 의존성 구현이 가능함

  1. 첫번째 DAG를 여러개의 DAG로 분할
    DAG 1 → DAG 2
  2. 각각의 해당 DAG에 대해 TriggerDagRunOperator 태스크를 수행
    DAGS 1,2, and 3 (TriggerDagRunOperator 위치함)→ DAG4
  3. 여러 다운스트림 DAG를 트리거하는 하나의 DAG TriggerDagRunOperator를 사용
    DAG1 (TriggerDagRunOperator 위치함) → DAG 2,3 and 4

ExternalTaskSensor

다른 DAG에서 태스크 상태를 포크하는 센서.

즉, 다른 DAG의 태스크를 지정하여 해당 태스크의 상태를 확인하는 것 (의존성 있음)

  • e.g. DAG1,2,3이 각각 데이터 추출,변환 및 적재하고, 3개 DAG가 완료된 경우만 DAG4(집계된 지표의 데이터 세트를 계산)를 실행함.
  • 이때 DAG4에 ExternalTaskSensor를 두어 세 개의 DAG가 모두 완료된 상태를 확인하는 프락시 역할을 함
wait = ExternalTaskSensor(
    task_id="wait_for_process_supermarket",
    external_dag_id="figure_6_20_dag_1",
    external_task_id="process_supermarket",
    execution_delta=datetime.timedelta(hours=6),
    dag=dag2,
)
report = DummyOperator(task_id="report", dag=dag2)
wait >> report

ExternalTaskSensor의 인수: execution_delta

ExternalTaskSensor 기본 동작은 자신과 정확히 동일한 실행 날짜를 가진 태스크에 대한 성공만 확인함

즉, 시간이 맞지 않으면 timeout 까지만 기다리다가 종료된다.

→ 다른 태스크를 검색할 수 있도록 오프셋 (offset(간격)) 설정할 수 있음

execution_delta=datetime.timedelta(hours=6),

양수의 timedelta값은 시간을 거슬러 올라가는 것을 의미함

번외. ExternalTaskSensor의 인수: execution_date_fn 함수

해당 인수를 통해 timedelta의 목록을 반환하는 함수를 제공할 수 있음

현재 실행일을 받는 함수 원하는 실행 날짜를 쿼리로 반환함. 
!! execution_delta 둘 중 하나 또는 execution_date_fn을 ExternalTaskSensor에 전달할 수 있지만 둘 다 전달은 불가

 

아래는 매 시 50분 마다 실행하는 가정

execution_date_fn=lambda x: x - timedelta(minutes=50),

 

아래는 sensing(감지)하는 task의 execution_date에 맞게 1시간 일찍 트리거함

execution_date_fn=lambda x: x - timedelta(hours=1),

 

 

https://airflow.apache.org/docs/apache-airflow/1.10.3/_api/airflow/sensors/external_task_sensor/index.html 

 

airflow.sensors.external_task_sensor — Airflow Documentation

execution_delta (datetime.timedelta) – time difference with the previous execution to look at, the default is the same execution_date as the current task or DAG. For yesterday, use [positive!] datetime.timedelta(days=1). Either execution_delta or executi

airflow.apache.org

 


6.3 REST/CLI로 워크플로 시작

Airflow 외부에서 워크플로를 시작하는 경우에 사용됨

  • CI/CD 파이프라인 일부
  • AWS S3버킷에 임의 시간에 저장되는 데이터 확인을 위해 Airflow 센서 대신, AWS Lambda 함수로 DAG 트리거

Airflow CLI로 DAG 트리거

1. 실행 날짜가 현재 날짜 및 시간으로 설정된 dag 트리거

airflow dags trigger dag_id

2. 추가 구성으로 DAG 트리거하기

airflow dags trigger -c '{"supermarket_id": 1}' dag1
airflow dags trigger --conf '{"supermarket_id": 1}' dag1

Airflow REST API로 DAG 트리거

curl \\ 
-u admin:admin \\ 
-x POST \\
"<https://localhost:8080/api/v1/dags/pring_dag_run_conf/dagRuns>" \\
-H "Content-Type: apllication/json" \\ 
-d '{"conf":{}}'
  • -u admin:admin : 이는 바람직하지 X. 올바른 방법은 공식 문서 참고
  • -d '{"conf":{}}' : 추가 구성이 설정되지 않은 경우에도 엔드포인트에는 데이터 필요함

 

 


참고 : [책] Apache Airflow 기반의 데이터 파이프라인

 

저작자표시 비영리 동일조건 (새창열림)

'#️⃣ Data Engineering > Airflow' 카테고리의 다른 글

Ch8. Airflow 커스텀 컴포넌트 빌드  (3) 2023.12.12
Ch7. Airflow 외부 시스템과 통신하기  (0) 2023.12.07
Ch5. Airflow 태스크 간 의존성  (0) 2023.11.22
Ch4. Airflow 태스크 템플릿 및 XCom  (0) 2023.11.16
Ch2,3. Airflow DAG 구조 및 스케줄링  (0) 2023.11.15
  1. Ch6. Airflow 워크플로 트리거
  2. 워크플로 트리거 방법
  3. 6.1 센서를 사용한 폴링 조건
  4. Sensor: Airflow 오퍼레이터의 특수타입(서브클래스)
  5. Poke(포크) / Poking
  6. FileSensor
  7. PythonSensor
  8. 센서 처리
  9. 센서 데드록 (Sensor Deadlock 상태)
  10. 센서 Mode : poke와 reschedule
  11. 6.2 다른 DAG 트리거
  12. TriggerDagRunOperator
  13. TriggerDagRunOperator를 활용한 태스크 분리
  14. ExternalTaskSensor
  15. ExternalTaskSensor의 인수: execution_delta
  16. 번외. ExternalTaskSensor의 인수: execution_date_fn 함수
  17. 6.3 REST/CLI로 워크플로 시작
  18. Airflow CLI로 DAG 트리거
  19. Airflow REST API로 DAG 트리거
'#️⃣ Data Engineering/Airflow' 카테고리의 다른 글
  • Ch8. Airflow 커스텀 컴포넌트 빌드
  • Ch7. Airflow 외부 시스템과 통신하기
  • Ch5. Airflow 태스크 간 의존성
  • Ch4. Airflow 태스크 템플릿 및 XCom
HyeM207
HyeM207
"Reflections and Growth Through Records" 회고와 기록을 통한 성장으로
HyeM207
HYEM's Storage
HyeM207
  • ALL (115)
    • #️⃣ CS (Computer Science) (5)
      • Database (2)
      • SQL (2)
      • Git (1)
    • #️⃣ Data Engineering (43)
      • Airflow (18)
      • Spark (8)
      • Snowflake (2)
      • BI,DashBoard (4)
      • ELK Stack (2)
      • Hadoop (5)
      • Kafka (4)
    • #️⃣ Cloud&Container (16)
      • AWS (8)
      • GCP (1)
      • Docker (6)
      • Kubernetes (1)
    • #️⃣ Project 및 개발일지 (37)
      • Mini Project (5)
      • 개발일지 (9)
      • Algorithm 문제 풀이 (20)
    • #️⃣ 책 리뷰 (4)
    • #️⃣ 회고글&프로젝트 후기 (10)

공지사항

인기 글

최근 댓글

블로그 메뉴

  • 홈
  • 태그
  • 방명록
hELLO · Designed By 정상우.v4.2.2
HyeM207
Ch6. Airflow 워크플로 트리거
상단으로

티스토리툴바

개인정보

  • 티스토리 홈
  • 포럼
  • 로그인

단축키

내 블로그

내 블로그 - 관리자 홈 전환
Q
Q
새 글 쓰기
W
W

블로그 게시글

글 수정 (권한 있는 경우)
E
E
댓글 영역으로 이동
C
C

모든 영역

이 페이지의 URL 복사
S
S
맨 위로 이동
T
T
티스토리 홈 이동
H
H
단축키 안내
Shift + /
⇧ + /

* 단축키는 한글/영문 대소문자로 이용 가능하며, 티스토리 기본 도메인에서만 동작합니다.