-
Ch6. Airflow 워크플로 트리거
-
워크플로 트리거 방법
-
6.1 센서를 사용한 폴링 조건
-
Sensor: Airflow 오퍼레이터의 특수타입(서브클래스)
-
Poke(포크) / Poking
-
FileSensor
-
PythonSensor
-
센서 처리
-
센서 데드록 (Sensor Deadlock 상태)
-
센서 Mode : poke와 reschedule
-
6.2 다른 DAG 트리거
-
TriggerDagRunOperator
-
TriggerDagRunOperator를 활용한 태스크 분리
-
ExternalTaskSensor
-
ExternalTaskSensor의 인수: execution_delta
-
번외. ExternalTaskSensor의 인수: execution_date_fn 함수
-
6.3 REST/CLI로 워크플로 시작
-
Airflow CLI로 DAG 트리거
-
Airflow REST API로 DAG 트리거
Ch6. Airflow 워크플로 트리거
워크플로 트리거 방법
- 스케줄 간격 ← ch2
- 특정 태스크 수행 후 트리거 ← ch6
6.1 센서를 사용한 폴링 조건
Sensor: Airflow 오퍼레이터의 특수타입(서브클래스)
- 센서는 특정 조건이 true인지 지속적으로 확인하고 true라면 성공함
Poke(포크) / Poking
Poking은 센서를 실행하고 상태를 확인하기 위해 Airflow에서 사용하는 이름
e.g. 센서는 대략 1분에 한 번씩 주어진 파일이 있는지 포크(Poke)한다.
FileSensor
- FileSensor : 파일 존재 여부에 따라 true/false반환.
- False시 해당 센서는 지정된 시간 (기본 60초) 대기 후 다시 시도함
- 글로빙(globbing)을 사용하여 파일/디렉토리 이름과 패턴을 일치시킴
from airflow.sensors.filesystem import FileSensor
wait = FileSensor(
task_id="wait_for_supermarket_1", filepath="/data/supermarket1/data.csv", dag=dag
)
PythonSensor
- PythonSensor: 파이썬 callable을 지원하며, PythonSensor callable은 성공적으로 조건 충족 여부에 따라 true/false의 Boolean 값을 반환하는 것으로 제한됨
- 두 가지 조건 이상을 확인할 때 가독성이 좋음
from airflow.sensors.python import PythonSensor
def _wait_for_supermarket(supermarket_id_):
supermarket_path = Path("/data/" + supermarket_id_)
data_files = supermarket_path.glob("data-*.csv")
success_file = supermarket_path / "_SUCCESS"
return data_files and success_file.exists()
wait_for_supermarket_1 = PythonSensor(
task_id="wait_for_supermarket_1",
python_callable=_wait_for_supermarket,
op_kwargs={"supermarket_id": "supermarket1"},
dag=dag,
)
센서 처리
- timeout : 센서의 최대 실행 허용 시간(초)을 지정하는 인수
- 실행시간이 타임아웃 설정값 초과 시 실패 반환
- 기본값 : 7일
- Dag의 Concurrency : DAG의 최대 동시 태스크 수 설정하여 Sensor의 대시 태스크 수에 제한을 줄 수 있음
센서 데드록 (Sensor Deadlock 상태)
Sensor를 설정한 DAG의 실행할 수 있는 최대 태스크 수에 도달해 차단되는 것. 혹은 Airflow 전역 설정 최대 태스크 제한에 걸려 전체 시스템 정지되는 현상
센서 Mode : poke와 reschedule
센서 클래스는 mode인수가 있어 poke(기본)과 reschedule로 설정 가능하다.
- poke (기본) : 최대 태스크 제한 도달시 새로운 태스크 차단됨
- 즉, 센서 태스크가 실행중인 동안 태스크 슬롯 차지함
- reschedule : 포크 동작을 실행할 때만 슬롯 차지. 대기 시간 동안은 슬롯 차지 X
6.2 다른 DAG 트리거
위에 Sensor를 이용한 트리거는 Task 대상임
지금부터는 DAG를 트리거하는 것을 알아봄
▪️ [참고] 각 DAG 실행의 run_id 필드
schedule__ : 스케줄되어 DAG 실행 시작됨
backfill__ : 백필 태스크에 의해 DAG 실행됨
manual__ : 수동으로 DAG 실행됨을 나타냄
TriggerDagRunOperator
TriggerDagRunOperator를 이용하여 다른 DAG를 트리거함.
- TriggerDagRunOperator가 포함된 DAG에서 태스크를 삭제하면, 이전에 트리거된 해당 DAG 실행을 지우는 대신 새 DAG 실행이 트리거됨
trigger_create_metrics_dag = TriggerDagRunOperator(
task_id=f"trigger_create_metrics_dag_supermarket_{supermarket_id}",
trigger_dag_id="listing_6_04_dag02",
dag=dag1,
)
- task_id : TriggerDagRunOperator의 고유 task id
- trigger_dag_id : 해당 dag 다음에 실행할 (trigger할) dag id
- dag: 현재 TriggerDagRunOperator가 속한 Dag
예제 DAG 코드 (전문)
예제 DAG 코드 (전문)
from pathlib import Path
import airflow.utils.dates
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.python import PythonSensor
dag1 = DAG(
dag_id="listing_6_04_dag01",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="0 16 * * *",
)
dag2 = DAG(
dag_id="listing_6_04_dag02",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval=None,
)
def _wait_for_supermarket(supermarket_id_):
supermarket_path = Path("/data/" + supermarket_id_)
data_files = supermarket_path.glob("data-*.csv")
success_file = supermarket_path / "_SUCCESS"
return data_files and success_file.exists()
for supermarket_id in range(1, 5):
wait = PythonSensor(
task_id=f"wait_for_supermarket_{supermarket_id}",
python_callable=_wait_for_supermarket,
op_kwargs={"supermarket_id_": f"supermarket{supermarket_id}"},
dag=dag1,
)
copy = DummyOperator(task_id=f"copy_to_raw_supermarket_{supermarket_id}", dag=dag1)
process = DummyOperator(task_id=f"process_supermarket_{supermarket_id}", dag=dag1)
trigger_create_metrics_dag = TriggerDagRunOperator(
task_id=f"trigger_create_metrics_dag_supermarket_{supermarket_id}",
trigger_dag_id="listing_6_04_dag02",
dag=dag1,
)
wait >> copy >> process >> trigger_create_metrics_dag
compute_differences = DummyOperator(task_id="compute_differences", dag=dag2)
update_dashboard = DummyOperator(task_id="update_dashboard", dag=dag2)
notify_new_data = DummyOperator(task_id="notify_new_data", dag=dag2)
compute_differences >> update_dashboard
TriggerDagRunOperator를 활용한 태스크 분리
DAG가 아주 복잡해지는 경우 태스크 명확성을 위해 아래와 같이 TriggerDagRunOperator로 다양한 의존성 구현이 가능함
- 첫번째 DAG를 여러개의 DAG로 분할
DAG 1 → DAG 2 - 각각의 해당 DAG에 대해 TriggerDagRunOperator 태스크를 수행
DAGS 1,2, and 3 (TriggerDagRunOperator 위치함)→ DAG4 - 여러 다운스트림 DAG를 트리거하는 하나의 DAG TriggerDagRunOperator를 사용
DAG1 (TriggerDagRunOperator 위치함) → DAG 2,3 and 4
ExternalTaskSensor
다른 DAG에서 태스크 상태를 포크하는 센서.
즉, 다른 DAG의 태스크를 지정하여 해당 태스크의 상태를 확인하는 것 (의존성 있음)
- e.g. DAG1,2,3이 각각 데이터 추출,변환 및 적재하고, 3개 DAG가 완료된 경우만 DAG4(집계된 지표의 데이터 세트를 계산)를 실행함.
- 이때 DAG4에 ExternalTaskSensor를 두어 세 개의 DAG가 모두 완료된 상태를 확인하는 프락시 역할을 함
wait = ExternalTaskSensor(
task_id="wait_for_process_supermarket",
external_dag_id="figure_6_20_dag_1",
external_task_id="process_supermarket",
execution_delta=datetime.timedelta(hours=6),
dag=dag2,
)
report = DummyOperator(task_id="report", dag=dag2)
wait >> report
ExternalTaskSensor의 인수: execution_delta
ExternalTaskSensor 기본 동작은 자신과 정확히 동일한 실행 날짜를 가진 태스크에 대한 성공만 확인함
즉, 시간이 맞지 않으면 timeout 까지만 기다리다가 종료된다.
→ 다른 태스크를 검색할 수 있도록 오프셋 (offset(간격)) 설정할 수 있음
execution_delta=datetime.timedelta(hours=6),
양수의 timedelta값은 시간을 거슬러 올라가는 것을 의미함
번외. ExternalTaskSensor의 인수: execution_date_fn 함수
해당 인수를 통해 timedelta의 목록을 반환하는 함수를 제공할 수 있음
현재 실행일을 받는 함수 원하는 실행 날짜를 쿼리로 반환함.
!! execution_delta 둘 중 하나 또는 execution_date_fn을 ExternalTaskSensor에 전달할 수 있지만 둘 다 전달은 불가
아래는 매 시 50분 마다 실행하는 가정
execution_date_fn=lambda x: x - timedelta(minutes=50),
아래는 sensing(감지)하는 task의 execution_date에 맞게 1시간 일찍 트리거함
execution_date_fn=lambda x: x - timedelta(hours=1),
airflow.sensors.external_task_sensor — Airflow Documentation
execution_delta (datetime.timedelta) – time difference with the previous execution to look at, the default is the same execution_date as the current task or DAG. For yesterday, use [positive!] datetime.timedelta(days=1). Either execution_delta or executi
airflow.apache.org
6.3 REST/CLI로 워크플로 시작
Airflow 외부에서 워크플로를 시작하는 경우에 사용됨
- CI/CD 파이프라인 일부
- AWS S3버킷에 임의 시간에 저장되는 데이터 확인을 위해 Airflow 센서 대신, AWS Lambda 함수로 DAG 트리거
Airflow CLI로 DAG 트리거
1. 실행 날짜가 현재 날짜 및 시간으로 설정된 dag 트리거
airflow dags trigger dag_id
2. 추가 구성으로 DAG 트리거하기
airflow dags trigger -c '{"supermarket_id": 1}' dag1
airflow dags trigger --conf '{"supermarket_id": 1}' dag1
Airflow REST API로 DAG 트리거
curl \\
-u admin:admin \\
-x POST \\
"<https://localhost:8080/api/v1/dags/pring_dag_run_conf/dagRuns>" \\
-H "Content-Type: apllication/json" \\
-d '{"conf":{}}'
- -u admin:admin : 이는 바람직하지 X. 올바른 방법은 공식 문서 참고
- -d '{"conf":{}}' : 추가 구성이 설정되지 않은 경우에도 엔드포인트에는 데이터 필요함
참고 : [책] Apache Airflow 기반의 데이터 파이프라인

'#️⃣ Data Engineering > Airflow' 카테고리의 다른 글
Ch8. Airflow 커스텀 컴포넌트 빌드 (3) | 2023.12.12 |
---|---|
Ch7. Airflow 외부 시스템과 통신하기 (0) | 2023.12.07 |
Ch5. Airflow 태스크 간 의존성 (0) | 2023.11.22 |
Ch4. Airflow 태스크 템플릿 및 XCom (0) | 2023.11.16 |
Ch2,3. Airflow DAG 구조 및 스케줄링 (0) | 2023.11.15 |
Ch6. Airflow 워크플로 트리거
워크플로 트리거 방법
- 스케줄 간격 ← ch2
- 특정 태스크 수행 후 트리거 ← ch6
6.1 센서를 사용한 폴링 조건
Sensor: Airflow 오퍼레이터의 특수타입(서브클래스)
- 센서는 특정 조건이 true인지 지속적으로 확인하고 true라면 성공함
Poke(포크) / Poking
Poking은 센서를 실행하고 상태를 확인하기 위해 Airflow에서 사용하는 이름
e.g. 센서는 대략 1분에 한 번씩 주어진 파일이 있는지 포크(Poke)한다.
FileSensor
- FileSensor : 파일 존재 여부에 따라 true/false반환.
- False시 해당 센서는 지정된 시간 (기본 60초) 대기 후 다시 시도함
- 글로빙(globbing)을 사용하여 파일/디렉토리 이름과 패턴을 일치시킴
from airflow.sensors.filesystem import FileSensor
wait = FileSensor(
task_id="wait_for_supermarket_1", filepath="/data/supermarket1/data.csv", dag=dag
)
PythonSensor
- PythonSensor: 파이썬 callable을 지원하며, PythonSensor callable은 성공적으로 조건 충족 여부에 따라 true/false의 Boolean 값을 반환하는 것으로 제한됨
- 두 가지 조건 이상을 확인할 때 가독성이 좋음
from airflow.sensors.python import PythonSensor
def _wait_for_supermarket(supermarket_id_):
supermarket_path = Path("/data/" + supermarket_id_)
data_files = supermarket_path.glob("data-*.csv")
success_file = supermarket_path / "_SUCCESS"
return data_files and success_file.exists()
wait_for_supermarket_1 = PythonSensor(
task_id="wait_for_supermarket_1",
python_callable=_wait_for_supermarket,
op_kwargs={"supermarket_id": "supermarket1"},
dag=dag,
)
센서 처리
- timeout : 센서의 최대 실행 허용 시간(초)을 지정하는 인수
- 실행시간이 타임아웃 설정값 초과 시 실패 반환
- 기본값 : 7일
- Dag의 Concurrency : DAG의 최대 동시 태스크 수 설정하여 Sensor의 대시 태스크 수에 제한을 줄 수 있음
센서 데드록 (Sensor Deadlock 상태)
Sensor를 설정한 DAG의 실행할 수 있는 최대 태스크 수에 도달해 차단되는 것. 혹은 Airflow 전역 설정 최대 태스크 제한에 걸려 전체 시스템 정지되는 현상
센서 Mode : poke와 reschedule
센서 클래스는 mode인수가 있어 poke(기본)과 reschedule로 설정 가능하다.
- poke (기본) : 최대 태스크 제한 도달시 새로운 태스크 차단됨
- 즉, 센서 태스크가 실행중인 동안 태스크 슬롯 차지함
- reschedule : 포크 동작을 실행할 때만 슬롯 차지. 대기 시간 동안은 슬롯 차지 X
6.2 다른 DAG 트리거
위에 Sensor를 이용한 트리거는 Task 대상임
지금부터는 DAG를 트리거하는 것을 알아봄
▪️ [참고] 각 DAG 실행의 run_id 필드
schedule__ : 스케줄되어 DAG 실행 시작됨
backfill__ : 백필 태스크에 의해 DAG 실행됨
manual__ : 수동으로 DAG 실행됨을 나타냄
TriggerDagRunOperator
TriggerDagRunOperator를 이용하여 다른 DAG를 트리거함.
- TriggerDagRunOperator가 포함된 DAG에서 태스크를 삭제하면, 이전에 트리거된 해당 DAG 실행을 지우는 대신 새 DAG 실행이 트리거됨
trigger_create_metrics_dag = TriggerDagRunOperator(
task_id=f"trigger_create_metrics_dag_supermarket_{supermarket_id}",
trigger_dag_id="listing_6_04_dag02",
dag=dag1,
)
- task_id : TriggerDagRunOperator의 고유 task id
- trigger_dag_id : 해당 dag 다음에 실행할 (trigger할) dag id
- dag: 현재 TriggerDagRunOperator가 속한 Dag
예제 DAG 코드 (전문)
예제 DAG 코드 (전문)
from pathlib import Path
import airflow.utils.dates
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.python import PythonSensor
dag1 = DAG(
dag_id="listing_6_04_dag01",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="0 16 * * *",
)
dag2 = DAG(
dag_id="listing_6_04_dag02",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval=None,
)
def _wait_for_supermarket(supermarket_id_):
supermarket_path = Path("/data/" + supermarket_id_)
data_files = supermarket_path.glob("data-*.csv")
success_file = supermarket_path / "_SUCCESS"
return data_files and success_file.exists()
for supermarket_id in range(1, 5):
wait = PythonSensor(
task_id=f"wait_for_supermarket_{supermarket_id}",
python_callable=_wait_for_supermarket,
op_kwargs={"supermarket_id_": f"supermarket{supermarket_id}"},
dag=dag1,
)
copy = DummyOperator(task_id=f"copy_to_raw_supermarket_{supermarket_id}", dag=dag1)
process = DummyOperator(task_id=f"process_supermarket_{supermarket_id}", dag=dag1)
trigger_create_metrics_dag = TriggerDagRunOperator(
task_id=f"trigger_create_metrics_dag_supermarket_{supermarket_id}",
trigger_dag_id="listing_6_04_dag02",
dag=dag1,
)
wait >> copy >> process >> trigger_create_metrics_dag
compute_differences = DummyOperator(task_id="compute_differences", dag=dag2)
update_dashboard = DummyOperator(task_id="update_dashboard", dag=dag2)
notify_new_data = DummyOperator(task_id="notify_new_data", dag=dag2)
compute_differences >> update_dashboard
TriggerDagRunOperator를 활용한 태스크 분리
DAG가 아주 복잡해지는 경우 태스크 명확성을 위해 아래와 같이 TriggerDagRunOperator로 다양한 의존성 구현이 가능함
- 첫번째 DAG를 여러개의 DAG로 분할
DAG 1 → DAG 2 - 각각의 해당 DAG에 대해 TriggerDagRunOperator 태스크를 수행
DAGS 1,2, and 3 (TriggerDagRunOperator 위치함)→ DAG4 - 여러 다운스트림 DAG를 트리거하는 하나의 DAG TriggerDagRunOperator를 사용
DAG1 (TriggerDagRunOperator 위치함) → DAG 2,3 and 4
ExternalTaskSensor
다른 DAG에서 태스크 상태를 포크하는 센서.
즉, 다른 DAG의 태스크를 지정하여 해당 태스크의 상태를 확인하는 것 (의존성 있음)
- e.g. DAG1,2,3이 각각 데이터 추출,변환 및 적재하고, 3개 DAG가 완료된 경우만 DAG4(집계된 지표의 데이터 세트를 계산)를 실행함.
- 이때 DAG4에 ExternalTaskSensor를 두어 세 개의 DAG가 모두 완료된 상태를 확인하는 프락시 역할을 함
wait = ExternalTaskSensor(
task_id="wait_for_process_supermarket",
external_dag_id="figure_6_20_dag_1",
external_task_id="process_supermarket",
execution_delta=datetime.timedelta(hours=6),
dag=dag2,
)
report = DummyOperator(task_id="report", dag=dag2)
wait >> report
ExternalTaskSensor의 인수: execution_delta
ExternalTaskSensor 기본 동작은 자신과 정확히 동일한 실행 날짜를 가진 태스크에 대한 성공만 확인함
즉, 시간이 맞지 않으면 timeout 까지만 기다리다가 종료된다.
→ 다른 태스크를 검색할 수 있도록 오프셋 (offset(간격)) 설정할 수 있음
execution_delta=datetime.timedelta(hours=6),
양수의 timedelta값은 시간을 거슬러 올라가는 것을 의미함
번외. ExternalTaskSensor의 인수: execution_date_fn 함수
해당 인수를 통해 timedelta의 목록을 반환하는 함수를 제공할 수 있음
현재 실행일을 받는 함수 원하는 실행 날짜를 쿼리로 반환함.
!! execution_delta 둘 중 하나 또는 execution_date_fn을 ExternalTaskSensor에 전달할 수 있지만 둘 다 전달은 불가
아래는 매 시 50분 마다 실행하는 가정
execution_date_fn=lambda x: x - timedelta(minutes=50),
아래는 sensing(감지)하는 task의 execution_date에 맞게 1시간 일찍 트리거함
execution_date_fn=lambda x: x - timedelta(hours=1),
airflow.sensors.external_task_sensor — Airflow Documentation
execution_delta (datetime.timedelta) – time difference with the previous execution to look at, the default is the same execution_date as the current task or DAG. For yesterday, use [positive!] datetime.timedelta(days=1). Either execution_delta or executi
airflow.apache.org
6.3 REST/CLI로 워크플로 시작
Airflow 외부에서 워크플로를 시작하는 경우에 사용됨
- CI/CD 파이프라인 일부
- AWS S3버킷에 임의 시간에 저장되는 데이터 확인을 위해 Airflow 센서 대신, AWS Lambda 함수로 DAG 트리거
Airflow CLI로 DAG 트리거
1. 실행 날짜가 현재 날짜 및 시간으로 설정된 dag 트리거
airflow dags trigger dag_id
2. 추가 구성으로 DAG 트리거하기
airflow dags trigger -c '{"supermarket_id": 1}' dag1
airflow dags trigger --conf '{"supermarket_id": 1}' dag1
Airflow REST API로 DAG 트리거
curl \\
-u admin:admin \\
-x POST \\
"<https://localhost:8080/api/v1/dags/pring_dag_run_conf/dagRuns>" \\
-H "Content-Type: apllication/json" \\
-d '{"conf":{}}'
- -u admin:admin : 이는 바람직하지 X. 올바른 방법은 공식 문서 참고
- -d '{"conf":{}}' : 추가 구성이 설정되지 않은 경우에도 엔드포인트에는 데이터 필요함
참고 : [책] Apache Airflow 기반의 데이터 파이프라인

'#️⃣ Data Engineering > Airflow' 카테고리의 다른 글
Ch8. Airflow 커스텀 컴포넌트 빌드 (3) | 2023.12.12 |
---|---|
Ch7. Airflow 외부 시스템과 통신하기 (0) | 2023.12.07 |
Ch5. Airflow 태스크 간 의존성 (0) | 2023.11.22 |
Ch4. Airflow 태스크 템플릿 및 XCom (0) | 2023.11.16 |
Ch2,3. Airflow DAG 구조 및 스케줄링 (0) | 2023.11.15 |