Ch5. 태스크 간 의존성
5.1 기본 의존성 유형
- 다양한 태스크 의존성 패턴이 있음
- 태스크 의존성을 명시적으로 지정 시 장점: 여러 태스크에서 (암묵적인) 순서를 명확하게 정의 가능
- 업스트림 태스크/다운스트림 태스크 : A (업스트림)→B (다운스트림태스크)
5.1.1. 선형 의존성 유형
>> 로 의존성 만듦
이때 기본 값으로 설정 시, 모든 오류는 Airflow에 의해 다운스트림 태스크로 전달되어 실행을 지연시킴
5.1.2. 팬인/팬아웃(Fan-in/Fan-out 의존성)
#️⃣팬아웃(Fan-out):
여러개의 입력 태스크 연결 수 제한) - 업스트림 DAG의 시작을 나타내는 더미 태스크 추가하면 암묵적인 팬아웃 설명하는데 도움됨
#️⃣팬인 구조(Fan-in) :
단일 다운스트림 태스크가 여러 업스트림 태스크에 의존성 갖는 구조 ⇒ 팬인(다 대 일) 의존성
- 이때 앞쪽의 업스트림 태스크는 병렬로 실행됨
5.2 브랜치하기
5.2.1 태스크 내에서 브랜치 (비추천)
e.g. 코드로 if문 분기하여 특정 함수 실행
- 장점: DAG에서 약간의 유연성 허용
- 단점: 특정 DAG 실행 중에 Airflow가 어떤 코드 분기를 사용하고 있는지 확인하기 어려움
5.2.2 DAG 내부에서 브랜치
BranchPythonOperator : 다운스트림 태스크 세트 중 선택할 수 있는 기능 제공
- 작업 결과로 다운스트림 태스크ID 혹은 리스트전달함 → 이로 다운스트림 태스크 결정함
Tip! 브랜치 조건 명확히 하는 법
트리거 규칙으로 브랜치 조건을 바꿀 수 있지만, 서로 다른 브랜치를 결합하는 더미 태스크 추가하는 것이 더 좋다!
- 이유 : DAG의 다른 태스크에 대한 트리거 규칙 변경할 필요 없이, 더미태스크 추가하면 기존의 작업을 그대로 둘 수 있기 때문이다. ⇒ 브랜치 더 독립적으로 유지 가능
5.3 조건부 태스크
특정 조건에 따라 DAG에서 특정 태스크를 건너뛸 수 있는 방법 소개함
5.3.1 태스크 내에서 조건 (비추천)
if문으로 특정 조건 만족 시 특정 태스크 실행
→ 이는 배포 로직 조건이 혼용되고, PythonOperator 이외의 다른 기본 제공 오퍼레이터를 더 이상 사용할 없음.
→ 또한 Airlfow UI로 태스크 결과 추적시 혼란스러움
5.3.2 조건부 태스크 만들기
특정 태스크 자체를 조건부화 하는것
새로 만든 조건부화 태스크 안에서 if문 사용하여 그 케이스가 아니라면 raise AirflowSkipException()를 사용하여 조건과 모든 다운스트림 태스크를 건너뛰도록 만들 수 있음
5.3.3 내장 오퍼레이터 사용하기
가장 최근 실행한 DAG만 실행하는 예를 구현한다면 LastOnlyOperator 클래스를 사용한다.
- 만약 더 복잡한 경우라면 PythonOperator 기반으로 구현하는 것이 더 효율적임
5.4 트리거 규칙
5.4.0 Airflow가 DAG 실행 내에서 작업 실행하는 방법
Airflow가 DAG를 실행 시
- 각 태스크를 지속적으로 확인하여 실행 여부를 확인함
- 태스크 실행 여부가 가능하다면, 바로 스케줄러에 의해 선택된 후 실행 예약함
5.4.1 트리거 규칙
- 트리거 규칙 : Airflow가 태스크가 ‘실행준비’가 되어 있는지 여부 결정하기 위한 필수적인 조건
- 전파 : 업스트림 태스크 결과가 다운 스트림 태스크에도 영향을 미치는 동작 유형
트리거 규칙 | 동작 | 사용사례 |
all_success (Default) | 모든 상위 태스크가 성공적으로 완료되면 트리거됨 | 일반적인 워크플로에 대한 기본 트리거 |
all_failed | 모든 상위태스크가 실패했거나 상위 태스크가 오류로 실패된 경우 트리거됨 | 태스크 그룹에서 하나 이상 실패가 예상되는 상황에서 오류 처리 코드 트리거 |
all_done | 결과 상태 관계없이 모든 부모가 실행완료하면 트리거됨 | 모든 태스크가 완료되었을 때 실행할 청소 코드 실행 (예. 시스템 종료 또는 클러스터 중지) |
one_failed | 하나 이상의 상위 태스크가 실패하자마자 트리거됨. 다른 상위 태스크의 실행 완료 기다리지 않음 | 알림or 롤백과 같은 일부 오류 처리 코드 빠르게 트리거 |
one_success | 한 부모가 성공하자마자 트리거되며 다른 상위 태스크의 실행 완료 기다리지 않음 | 하나의 결과를 사용할 수 있게 되는 즉시 다운스트림 연산/알림을 빠르게 트리거함 |
none_failed | 실패한 상위 태스크가 없지만, 태스크가 성공 or skip한 경우 트리거됨 | Airlfow DAG상 조건부 브랜치 결합 |
none_skipped | 건너뛴 상위태스크가 없지만 태스크가 성공 또는 실패한 경우 트리거됨 | 모든 업스트림 태스크가 실행된 경우, 해당 결과를 모두 무시하고 트리거함 |
dummy | 업스트림 태스크의 상태와 관계없이 트리거됨 | 테스트 시 |
5.5 태스크 간 데이터 공유
5.5.1 XCom 사용하기
XCom 이용하여 태스크 간 데이터를 공유함
- Airflow 컨택스트의 태스크 인스턴스의 xcom_push 매서드와 xcom_pull 사용하여 값을 넣고 가져올 수 있음
- xcom_pull : dag_id및 실행날짜 정의 가능하며, 디폴트로 현재 DAG와 실행 날짜로 설정됨
context["task_instance"].xcom_push(key="example_key", value="값")
...
example_key = context["task_instance"].xcom_pull(task_ids="test_task", key="example_key")
- 일부 오퍼레이터는 XCom을 자동으로 게시하는 기능 제공함
- BashOperator: xcom_push 옵션을 true로 설정시 ,’return_value’ 키 이름으로 return 값이 XCom으로 게시됨
- PythonOperator: 호출 가능한 인수에서 반환된 값을 XCom 값으로 게시함
5.5.2 XCom 사용 시 고려사항 (단점)
- 태스크 간의 묵시적인 의존성 필요함
- 이는 DAG에 표시되지 않으며 태스크 스케줄 시에도 고려되지 않으니, 의존성 있는 작업이 올바른 순서로 실행할 수 있도록 해야됨
- XCom은 오퍼레이터의 원자성을 무너뜨리는 패턴이 될 가능성 있음
- API 토큰 새로 고침하는 작업 수행을 따로 Task로 빼기보다는 API 수행하는 태스크안에서 같이 수행되는것이 좋음
- XCom이 저장하는 모든 값은 직렬화를 지원해야되는 기술적 한계 있음
- 람다, 여러 다중 멀티프로세스 관련 클래스와 같은 파이썬 일부 유형은 저장 불가함
- 또한 저장 크기또한 제한됨 (XCom은 Airflow 메타스토어에 저장됨)
5.5.3 커스텀 XCom 백엔드 사용하기
- Airflow 2에서는 ‘커스텀 XCom 백엔드’를 지정할 수 있게 됨.
- → 이는 XCom을 더 유연하게 활용 가능하게 하며, XCom 값 저장 선택을 다양하게 함.
- 활용 예시: 클라우드 서비스를 위한 커스텀 백엔드 구현 가능함
- 구현 법 : (1) BaseXCom 기본 클래스가 상속되어야 하고, (2) 값을 직렬화/역직렬화하기 위해 두 가지 정적 매서드를 각각 구현해야됨
from airflow.models.xcom impoort BaseXCom
class CustomXComBackend(BaseXCom):
@staticsmethod
def serialize_value(value: Any):
...
@staticsmethod
def deserialize_value(result) -> Any:
...
5.6 Taskflow API로 파이썬 태스크 연결하기
- 기존 방식 단점 (일반 API): 먼저 함수를 정의한 후, PythonOperator를 이용해 Airflow 태스크를 생성해야됨. xcom_push, xcom_pull 사용하여 값 전달해야됨
- Taskflow API : 파이썬 태스크 및 의존성을 정의하기 위한 새로운 데코레이터 기반 API를 추가적으로 제공함.
- 파이선 함수 == 태스크 @task 라는 데코레이터로 변환
- Xcom을 명시적으로 게시 안하고 함수 반환값으로 다음 태스크(의 인수)로 전달 가능함
- Taskflow API 사용하지 않을 때 (단점) : TaskFow API는 PythonOperator를 사용하여 구현되는 파이썬 태스크로 제한됨.
- 다른 Airflow 오퍼레이터와 관련된 태스크는 일반 API를 사용하여 태스크 및 태스크 의존성을 정의해야됨
- 기존 일반 API 방식과 Taskflow API 방식 혼용은 가능하지만, 코드가 복잡해보일 수도 있음
참고 : [책] Apache Airflow 기반의 데이터 파이프라인

'#️⃣ Data Engineering > Airflow' 카테고리의 다른 글
Ch7. Airflow 외부 시스템과 통신하기 (0) | 2023.12.07 |
---|---|
Ch6. Airflow 워크플로 트리거 (0) | 2023.12.04 |
Ch4. Airflow 태스크 템플릿 및 XCom (0) | 2023.11.16 |
Ch2,3. Airflow DAG 구조 및 스케줄링 (0) | 2023.11.15 |
Ch1. Airflow 살펴보기 (0) | 2023.11.14 |
Ch5. 태스크 간 의존성
5.1 기본 의존성 유형
- 다양한 태스크 의존성 패턴이 있음
- 태스크 의존성을 명시적으로 지정 시 장점: 여러 태스크에서 (암묵적인) 순서를 명확하게 정의 가능
- 업스트림 태스크/다운스트림 태스크 : A (업스트림)→B (다운스트림태스크)
5.1.1. 선형 의존성 유형
>> 로 의존성 만듦
이때 기본 값으로 설정 시, 모든 오류는 Airflow에 의해 다운스트림 태스크로 전달되어 실행을 지연시킴
5.1.2. 팬인/팬아웃(Fan-in/Fan-out 의존성)
#️⃣팬아웃(Fan-out):
여러개의 입력 태스크 연결 수 제한) - 업스트림 DAG의 시작을 나타내는 더미 태스크 추가하면 암묵적인 팬아웃 설명하는데 도움됨
#️⃣팬인 구조(Fan-in) :
단일 다운스트림 태스크가 여러 업스트림 태스크에 의존성 갖는 구조 ⇒ 팬인(다 대 일) 의존성
- 이때 앞쪽의 업스트림 태스크는 병렬로 실행됨
5.2 브랜치하기
5.2.1 태스크 내에서 브랜치 (비추천)
e.g. 코드로 if문 분기하여 특정 함수 실행
- 장점: DAG에서 약간의 유연성 허용
- 단점: 특정 DAG 실행 중에 Airflow가 어떤 코드 분기를 사용하고 있는지 확인하기 어려움
5.2.2 DAG 내부에서 브랜치
BranchPythonOperator : 다운스트림 태스크 세트 중 선택할 수 있는 기능 제공
- 작업 결과로 다운스트림 태스크ID 혹은 리스트전달함 → 이로 다운스트림 태스크 결정함
Tip! 브랜치 조건 명확히 하는 법
트리거 규칙으로 브랜치 조건을 바꿀 수 있지만, 서로 다른 브랜치를 결합하는 더미 태스크 추가하는 것이 더 좋다!
- 이유 : DAG의 다른 태스크에 대한 트리거 규칙 변경할 필요 없이, 더미태스크 추가하면 기존의 작업을 그대로 둘 수 있기 때문이다. ⇒ 브랜치 더 독립적으로 유지 가능
5.3 조건부 태스크
특정 조건에 따라 DAG에서 특정 태스크를 건너뛸 수 있는 방법 소개함
5.3.1 태스크 내에서 조건 (비추천)
if문으로 특정 조건 만족 시 특정 태스크 실행
→ 이는 배포 로직 조건이 혼용되고, PythonOperator 이외의 다른 기본 제공 오퍼레이터를 더 이상 사용할 없음.
→ 또한 Airlfow UI로 태스크 결과 추적시 혼란스러움
5.3.2 조건부 태스크 만들기
특정 태스크 자체를 조건부화 하는것
새로 만든 조건부화 태스크 안에서 if문 사용하여 그 케이스가 아니라면 raise AirflowSkipException()를 사용하여 조건과 모든 다운스트림 태스크를 건너뛰도록 만들 수 있음
5.3.3 내장 오퍼레이터 사용하기
가장 최근 실행한 DAG만 실행하는 예를 구현한다면 LastOnlyOperator 클래스를 사용한다.
- 만약 더 복잡한 경우라면 PythonOperator 기반으로 구현하는 것이 더 효율적임
5.4 트리거 규칙
5.4.0 Airflow가 DAG 실행 내에서 작업 실행하는 방법
Airflow가 DAG를 실행 시
- 각 태스크를 지속적으로 확인하여 실행 여부를 확인함
- 태스크 실행 여부가 가능하다면, 바로 스케줄러에 의해 선택된 후 실행 예약함
5.4.1 트리거 규칙
- 트리거 규칙 : Airflow가 태스크가 ‘실행준비’가 되어 있는지 여부 결정하기 위한 필수적인 조건
- 전파 : 업스트림 태스크 결과가 다운 스트림 태스크에도 영향을 미치는 동작 유형
트리거 규칙 | 동작 | 사용사례 |
all_success (Default) | 모든 상위 태스크가 성공적으로 완료되면 트리거됨 | 일반적인 워크플로에 대한 기본 트리거 |
all_failed | 모든 상위태스크가 실패했거나 상위 태스크가 오류로 실패된 경우 트리거됨 | 태스크 그룹에서 하나 이상 실패가 예상되는 상황에서 오류 처리 코드 트리거 |
all_done | 결과 상태 관계없이 모든 부모가 실행완료하면 트리거됨 | 모든 태스크가 완료되었을 때 실행할 청소 코드 실행 (예. 시스템 종료 또는 클러스터 중지) |
one_failed | 하나 이상의 상위 태스크가 실패하자마자 트리거됨. 다른 상위 태스크의 실행 완료 기다리지 않음 | 알림or 롤백과 같은 일부 오류 처리 코드 빠르게 트리거 |
one_success | 한 부모가 성공하자마자 트리거되며 다른 상위 태스크의 실행 완료 기다리지 않음 | 하나의 결과를 사용할 수 있게 되는 즉시 다운스트림 연산/알림을 빠르게 트리거함 |
none_failed | 실패한 상위 태스크가 없지만, 태스크가 성공 or skip한 경우 트리거됨 | Airlfow DAG상 조건부 브랜치 결합 |
none_skipped | 건너뛴 상위태스크가 없지만 태스크가 성공 또는 실패한 경우 트리거됨 | 모든 업스트림 태스크가 실행된 경우, 해당 결과를 모두 무시하고 트리거함 |
dummy | 업스트림 태스크의 상태와 관계없이 트리거됨 | 테스트 시 |
5.5 태스크 간 데이터 공유
5.5.1 XCom 사용하기
XCom 이용하여 태스크 간 데이터를 공유함
- Airflow 컨택스트의 태스크 인스턴스의 xcom_push 매서드와 xcom_pull 사용하여 값을 넣고 가져올 수 있음
- xcom_pull : dag_id및 실행날짜 정의 가능하며, 디폴트로 현재 DAG와 실행 날짜로 설정됨
context["task_instance"].xcom_push(key="example_key", value="값")
...
example_key = context["task_instance"].xcom_pull(task_ids="test_task", key="example_key")
- 일부 오퍼레이터는 XCom을 자동으로 게시하는 기능 제공함
- BashOperator: xcom_push 옵션을 true로 설정시 ,’return_value’ 키 이름으로 return 값이 XCom으로 게시됨
- PythonOperator: 호출 가능한 인수에서 반환된 값을 XCom 값으로 게시함
5.5.2 XCom 사용 시 고려사항 (단점)
- 태스크 간의 묵시적인 의존성 필요함
- 이는 DAG에 표시되지 않으며 태스크 스케줄 시에도 고려되지 않으니, 의존성 있는 작업이 올바른 순서로 실행할 수 있도록 해야됨
- XCom은 오퍼레이터의 원자성을 무너뜨리는 패턴이 될 가능성 있음
- API 토큰 새로 고침하는 작업 수행을 따로 Task로 빼기보다는 API 수행하는 태스크안에서 같이 수행되는것이 좋음
- XCom이 저장하는 모든 값은 직렬화를 지원해야되는 기술적 한계 있음
- 람다, 여러 다중 멀티프로세스 관련 클래스와 같은 파이썬 일부 유형은 저장 불가함
- 또한 저장 크기또한 제한됨 (XCom은 Airflow 메타스토어에 저장됨)
5.5.3 커스텀 XCom 백엔드 사용하기
- Airflow 2에서는 ‘커스텀 XCom 백엔드’를 지정할 수 있게 됨.
- → 이는 XCom을 더 유연하게 활용 가능하게 하며, XCom 값 저장 선택을 다양하게 함.
- 활용 예시: 클라우드 서비스를 위한 커스텀 백엔드 구현 가능함
- 구현 법 : (1) BaseXCom 기본 클래스가 상속되어야 하고, (2) 값을 직렬화/역직렬화하기 위해 두 가지 정적 매서드를 각각 구현해야됨
from airflow.models.xcom impoort BaseXCom
class CustomXComBackend(BaseXCom):
@staticsmethod
def serialize_value(value: Any):
...
@staticsmethod
def deserialize_value(result) -> Any:
...
5.6 Taskflow API로 파이썬 태스크 연결하기
- 기존 방식 단점 (일반 API): 먼저 함수를 정의한 후, PythonOperator를 이용해 Airflow 태스크를 생성해야됨. xcom_push, xcom_pull 사용하여 값 전달해야됨
- Taskflow API : 파이썬 태스크 및 의존성을 정의하기 위한 새로운 데코레이터 기반 API를 추가적으로 제공함.
- 파이선 함수 == 태스크 @task 라는 데코레이터로 변환
- Xcom을 명시적으로 게시 안하고 함수 반환값으로 다음 태스크(의 인수)로 전달 가능함
- Taskflow API 사용하지 않을 때 (단점) : TaskFow API는 PythonOperator를 사용하여 구현되는 파이썬 태스크로 제한됨.
- 다른 Airflow 오퍼레이터와 관련된 태스크는 일반 API를 사용하여 태스크 및 태스크 의존성을 정의해야됨
- 기존 일반 API 방식과 Taskflow API 방식 혼용은 가능하지만, 코드가 복잡해보일 수도 있음
참고 : [책] Apache Airflow 기반의 데이터 파이프라인

'#️⃣ Data Engineering > Airflow' 카테고리의 다른 글
Ch7. Airflow 외부 시스템과 통신하기 (0) | 2023.12.07 |
---|---|
Ch6. Airflow 워크플로 트리거 (0) | 2023.12.04 |
Ch4. Airflow 태스크 템플릿 및 XCom (0) | 2023.11.16 |
Ch2,3. Airflow DAG 구조 및 스케줄링 (0) | 2023.11.15 |
Ch1. Airflow 살펴보기 (0) | 2023.11.14 |