Ch4. Airflow 태스크 템플릿
1. 모든 태스크 콘텍스트 변수 목록 및 출력
def _print_context(**kwargs): # 혹은 (**context) <- 추천
print(kwargs)
print_context = PythonOperator(
tasl_id = "print_context".
python_callable=_print_context.
dag=dag
)
구분 | 키 | 설명 |
주요 설정 및 dag관련 | conf | Airflow 구성 접근 |
dag | 현재 DAG 개체 | |
dag_run | 현재 DagRun 개체 | |
run_id | DagRun의 run_id (일반적으로 접두사+datetime으로 구성된 키) | |
var | Airflow 변수를 처리하기 위한 헬퍼 개체 | |
execution_date 관련 | ds | ‘%Y-%m-%d’ 형식의 execution_date |
ds_nodash | ‘%Y%m%d’ 형식의 execution_date | |
execution_date | 태스크 스케줄 간격의 시작 날짜/시간 | |
next_ds | ‘%Y%m%d’ 형식의 다음 스케줄 간격의 execution_date | |
next_ds_nodash | ‘%Y%m%d’ 형식의 다음 스케줄 간격의execution_date | |
next_execution_date | 태스크의 다음 스케줄 간격의 시작 datetime | |
prev_ds | ‘%Y-%m-%d’ 형식의 이전 스케줄 간격의 execution_date | |
prev_ds_nodash | ‘%Y%m%d’ 형식의 이전 스케줄 간격의 execution_date | |
prev_execution_date | 태스크 이전 스케줄 간격의 시작 datetime | |
prev_execution_date_success | 동일한 태스크의 마지막으로 성공적으로 완료된 실행의 시작 datetime (과거에만 해당) | |
prev_start_date_success | 동일한 태스크의 마지막으로 성공적으로 시작된 날짜와 시간 (과거에만 해당) | |
tomorrow_ds | ds(실행시간)에서 1을 더함 | |
tomorrow_ds_nodash | ds_nodash에서 1일 더함 | |
ts | ISO8601 포맷의 execution_date | |
ts_nodash | %Y%m%dT%H%M%S 형식의 execution_date | |
ts_nodash_with_tz | 시간 정보가 있는 ts_nodash | |
yesterday_ds | ds(실행시간)에서 1을 뺌 | |
yesterday_ds_nodash | ds_nodash에서 1일 뺌 | |
기타 설정 | Inlets | task.inlets의 약어; 데이터 계보에 대한 입력 데이터 소스를 추적하는 기능 |
macros | airflow.macros 모듈 | |
outlets | task.outlets의 약어; 데이터 계보 lineage에 대한 출력 데이터 소스를 추적하는 기능 | |
params | 태스크 콘텍스트에 대한 사용자 제공 변수 | |
task 관련 | task | 현재 오퍼레이터 |
task_instance | 현재 TaskInstance 객체 | |
task_instance_key_str | 현재 TaskInstance 객체의 고유 식별자 ({dag_id}{task_id}{ds_nodash}) | |
templates_dict | 태스크 콘텍스트에 대한 사용자 제공 변수 | |
test_mode | Airflow가 테스트모드에서 실행 중인지 여부 (구성 속성) | |
ti | task_instance와 동일한 현재 TaskInstacne 객체 |
2. BashOperator로 태스크 콘텍스트 활용법
이중 중괄호는 “Jinja 템플릿 문자열” 나타냄
- 템플릿 작성은 코드 작성 시점이 아닌 런타임 시에 값을 할당함
- execution_date은 Pendulum 라이브러리의 datetime객체이며, 파이썬의 datetime과 호환됨
get_data = BashOperator(
task_id = "get_data",
bash_command = (
"curl -o /tmp/wikipageview.gz"
"<https://dumps.wikimedia.org/other/pageviews/>"
"{{execution_date.year}}/"
.. (생략)
"{{ '{:02}'.format(execution_date.hour} }}0000.gz"
)
3. PythonOperator로 태스크 콘텍스트 활용법
- 키워드 인수로 태스크 콘텍스트 받기
- 명시적으로 특정 키워드 인수를 적음. 적지 않은 나머지는 ** 인수에 전달됨
# 1. 키워드 인수로 태스크 콘텍스트 받기
def _print_context(**context):
start = context["executiono_date"]
... (생략)
# 2. 명시적으로 특정 키워드 인수를 적음. 적지 않은 나머지는 ** 인수에 전달됨
def _get_date(execution_date, **context):
year, month, day, hour, *_ = execution_date.timetuple()
print_context = PythonOperator(
tasl_id = "print_context",
python_callable=_print_context,
dag=dag
)
4. PythonOperator에 변수제공
방법1. op_args로 인수 이용
def _get_date(output_path, **context):
...(생략)
get_data = PythonOperator(
task_id="get_date",
python_callable=_get_data",
op_args=["/tmp/wikipageview.gz"],
dag=dag
)
방법2. op_kwargs 인수 사용
- 사용자 정의 키워드 인수는 호출 가능한 함수에 전달되기 “전”에 템플릿화 함
def _get_date(output_path, year, **context):
...(생략)
get_data = PythonOperator(
task_id="get_date",
python_callable=_get_data",
op_args={
"output_path": "/tmp/wikipageview.gz",
"year" : "{{execution_date.year }}"
},
dag=dag
)
5. 템플릿 인수 검사
작업 실행 후 그래프 or 트리보기 선택 → ‘Rendered Template’ 버튼 클릭하여 템플릿 인수 값 검사 가능
⇒ ‘템플릿 보기’란 렌더링 되는 연산자의 모든 속성과 해당하는 값들을 표시함
아래는 CLI로 템플릿 렌더링 하는 명령어
airflow tasks render [dag_id] [task id] [desired execution date]
6. XCom이란
Airflow 태스크는 설정에 따라 “물리적으로 서로 다른 컴퓨터에서 독립적으로 실행”되므로 메모리에서 데이터 공유 불가함 ⇒ 태스크 간의 데이터는 다른 위치에 유지되어야 함
Airflow는 XCom 기본 매커니즘을 사용하여 Airflow 메타스토어에서 선택 가능한 picklable 개체를 저장하고 나중에 읽을 수 있따.
- 이때 크기가 작은 오브젝트일때 XCom을 이용한 피클링이 적합하다
- 좀 더 큰 데이터를 Task간에 전송할 때는 Airflow 외부에 데이터를 유지하는 것이 좋다. (일반적으로 디스크에 파일 생성)
7. PostgresOperator
사용자가 작성한 쿼리를 실행함
PostgresOperator는 Postgres 와 통신하기 위해 **훅(hook)**을 인스턴스화한다.
인스턴스화 된 훅은 (1)연결 생성, (2)Postgres에 쿼리 전송 후 (3)연결에 대한 종료 작업 처리함
이때 오퍼레이터는 사용자의 요청을 훅으로 전달하는 작업만 담당한다.
from airflow.providers.postgres.operators.postgres import PostgresOperator
dag = DAG(..., template_searchpath="/tmp") # sql 파일 탐색 경로
write_to_postgres=PostgresOperator(
task_id="write_to_postgres",
postgres_conn_id="my_postgres", # 연결에 사용할 인증 저보 식별자
sql="postgres_query.sql", # sql 쿼리 및 쿼리를 포함하는 파일 경로
dag=dag
)
- template_searchpath : 각 오퍼레이터는 오퍼레이터에게 파일 경로를 제공하여 특정 확장자 이름으로 파일을 읽고 템플릿화 가능하다.
참고 : [책] Apache Airflow 기반의 데이터 파이프라인
'#️⃣ Data Engineering > Airflow' 카테고리의 다른 글
Ch6. Airflow 워크플로 트리거 (0) | 2023.12.04 |
---|---|
Ch5. Airflow 태스크 간 의존성 (0) | 2023.11.22 |
Ch2,3. Airflow DAG 구조 및 스케줄링 (0) | 2023.11.15 |
Ch1. Airflow 살펴보기 (0) | 2023.11.14 |
Airflow 운영과 대안 (0) | 2023.06.22 |