Ch4. Airflow 태스크 템플릿 및 XCom

2023. 11. 16. 14:06· #️⃣ Data Engineering/Airflow
목차
  1. Ch4. Airflow 태스크 템플릿
  2. 1. 모든 태스크 콘텍스트 변수 목록 및 출력
  3. 2. BashOperator로 태스크 콘텍스트 활용법
  4. 3. PythonOperator로 태스크 콘텍스트 활용법
  5. 4. PythonOperator에 변수제공
  6. 5. 템플릿 인수 검사
  7. 6. XCom이란
  8. 7. PostgresOperator

Ch4. Airflow 태스크 템플릿


1. 모든 태스크 콘텍스트 변수 목록 및 출력

def _print_context(**kwargs): # 혹은 (**context) <- 추천
	print(kwargs)

print_context = PythonOperator(
	tasl_id = "print_context".
	python_callable=_print_context.
	dag=dag
)

 

구분 키 설명
주요 설정 및 dag관련 conf Airflow 구성 접근
  dag 현재 DAG 개체
  dag_run 현재 DagRun 개체
  run_id DagRun의 run_id (일반적으로 접두사+datetime으로 구성된 키)
  var Airflow 변수를 처리하기 위한 헬퍼 개체
execution_date 관련 ds ‘%Y-%m-%d’ 형식의 execution_date
  ds_nodash ‘%Y%m%d’ 형식의 execution_date
  execution_date 태스크 스케줄 간격의 시작 날짜/시간
  next_ds ‘%Y%m%d’ 형식의 다음 스케줄 간격의 execution_date
  next_ds_nodash ‘%Y%m%d’ 형식의 다음 스케줄 간격의execution_date
  next_execution_date 태스크의 다음 스케줄 간격의 시작 datetime
  prev_ds ‘%Y-%m-%d’ 형식의 이전 스케줄 간격의 execution_date
  prev_ds_nodash ‘%Y%m%d’ 형식의 이전 스케줄 간격의 execution_date
  prev_execution_date 태스크 이전 스케줄 간격의 시작 datetime
  prev_execution_date_success 동일한 태스크의 마지막으로 성공적으로 완료된 실행의 시작 datetime (과거에만 해당)
  prev_start_date_success 동일한 태스크의 마지막으로 성공적으로 시작된 날짜와 시간 (과거에만 해당)
  tomorrow_ds ds(실행시간)에서 1을 더함
  tomorrow_ds_nodash ds_nodash에서 1일 더함
  ts ISO8601 포맷의 execution_date
  ts_nodash %Y%m%dT%H%M%S 형식의 execution_date
  ts_nodash_with_tz 시간 정보가 있는 ts_nodash
  yesterday_ds ds(실행시간)에서 1을 뺌
  yesterday_ds_nodash ds_nodash에서 1일 뺌
기타 설정 Inlets task.inlets의 약어; 데이터 계보에 대한 입력 데이터 소스를 추적하는 기능
  macros airflow.macros 모듈
  outlets task.outlets의 약어; 데이터 계보 lineage에 대한 출력 데이터 소스를 추적하는 기능
  params 태스크 콘텍스트에 대한 사용자 제공 변수
task 관련 task 현재 오퍼레이터
  task_instance 현재 TaskInstance 객체
  task_instance_key_str 현재 TaskInstance 객체의 고유 식별자 ({dag_id}{task_id}{ds_nodash})
  templates_dict 태스크 콘텍스트에 대한 사용자 제공 변수
  test_mode Airflow가 테스트모드에서 실행 중인지 여부 (구성 속성)
  ti task_instance와 동일한 현재 TaskInstacne 객체

 

 

2. BashOperator로 태스크 콘텍스트 활용법

이중 중괄호는 “Jinja 템플릿 문자열” 나타냄

  • 템플릿 작성은 코드 작성 시점이 아닌 런타임 시에 값을 할당함
  • execution_date은 Pendulum 라이브러리의 datetime객체이며, 파이썬의 datetime과 호환됨
get_data = BashOperator(
	task_id = "get_data",
	bash_command = (
		"curl -o /tmp/wikipageview.gz"
		"<https://dumps.wikimedia.org/other/pageviews/>"
	    "{{execution_date.year}}/"
		.. (생략)
	    "{{ '{:02}'.format(execution_date.hour} }}0000.gz"
	)

 

 

 

3. PythonOperator로 태스크 콘텍스트 활용법

  1. 키워드 인수로 태스크 콘텍스트 받기
  2. 명시적으로 특정 키워드 인수를 적음. 적지 않은 나머지는 ** 인수에 전달됨
# 1. 키워드 인수로 태스크 콘텍스트 받기
def _print_context(**context): 
	start = context["executiono_date"]
	... (생략)

# 2. 명시적으로 특정 키워드 인수를 적음. 적지 않은 나머지는 ** 인수에 전달됨
def _get_date(execution_date, **context):
	year, month, day, hour, *_ = execution_date.timetuple()

print_context = PythonOperator(
	tasl_id = "print_context",
	python_callable=_print_context,
	dag=dag
)

 

 

 

4. PythonOperator에 변수제공

방법1. op_args로 인수 이용

def _get_date(output_path, **context):
	...(생략)

get_data = PythonOperator(
	task_id="get_date",
	python_callable=_get_data",
	op_args=["/tmp/wikipageview.gz"],
	dag=dag
)

방법2. op_kwargs 인수 사용

  • 사용자 정의 키워드 인수는 호출 가능한 함수에 전달되기 “전”에 템플릿화 함
def _get_date(output_path, year, **context):
	...(생략)

get_data = PythonOperator(
	task_id="get_date",
	python_callable=_get_data",
	op_args={
		"output_path": "/tmp/wikipageview.gz",
		"year" : "{{execution_date.year }}"
	},
	dag=dag
)

 

 

5. 템플릿 인수 검사

작업 실행 후 그래프 or 트리보기 선택 → ‘Rendered Template’ 버튼 클릭하여 템플릿 인수 값 검사 가능

⇒ ‘템플릿 보기’란 렌더링 되는 연산자의 모든 속성과 해당하는 값들을 표시함

아래는 CLI로 템플릿 렌더링 하는 명령어

airflow tasks render [dag_id] [task id] [desired execution date]

 

 

 

6. XCom이란

Airflow 태스크는 설정에 따라 “물리적으로 서로 다른 컴퓨터에서 독립적으로 실행”되므로 메모리에서 데이터 공유 불가함 ⇒ 태스크 간의 데이터는 다른 위치에 유지되어야 함

Airflow는 XCom 기본 매커니즘을 사용하여 Airflow 메타스토어에서 선택 가능한 picklable 개체를 저장하고 나중에 읽을 수 있따.

  • 이때 크기가 작은 오브젝트일때 XCom을 이용한 피클링이 적합하다
  • 좀 더 큰 데이터를 Task간에 전송할 때는 Airflow 외부에 데이터를 유지하는 것이 좋다. (일반적으로 디스크에 파일 생성)

 

 

7. PostgresOperator

사용자가 작성한 쿼리를 실행함

PostgresOperator는 Postgres 와 통신하기 위해 **훅(hook)**을 인스턴스화한다.

인스턴스화 된 훅은 (1)연결 생성, (2)Postgres에 쿼리 전송 후 (3)연결에 대한 종료 작업 처리함

이때 오퍼레이터는 사용자의 요청을 훅으로 전달하는 작업만 담당한다.

from airflow.providers.postgres.operators.postgres import PostgresOperator

dag = DAG(..., template_searchpath="/tmp") # sql 파일 탐색 경로

write_to_postgres=PostgresOperator(
	task_id="write_to_postgres",
	postgres_conn_id="my_postgres", # 연결에 사용할 인증 저보 식별자
	sql="postgres_query.sql", # sql 쿼리 및 쿼리를 포함하는 파일 경로
	dag=dag
)
  • template_searchpath : 각 오퍼레이터는 오퍼레이터에게 파일 경로를 제공하여 특정 확장자 이름으로 파일을 읽고 템플릿화 가능하다.

 


참고 : [책] Apache Airflow 기반의 데이터 파이프라인

저작자표시 비영리 동일조건 (새창열림)

'#️⃣ Data Engineering > Airflow' 카테고리의 다른 글

Ch6. Airflow 워크플로 트리거  (0) 2023.12.04
Ch5. Airflow 태스크 간 의존성  (0) 2023.11.22
Ch2,3. Airflow DAG 구조 및 스케줄링  (0) 2023.11.15
Ch1. Airflow 살펴보기  (0) 2023.11.14
Airflow 운영과 대안  (0) 2023.06.22
  1. Ch4. Airflow 태스크 템플릿
  2. 1. 모든 태스크 콘텍스트 변수 목록 및 출력
  3. 2. BashOperator로 태스크 콘텍스트 활용법
  4. 3. PythonOperator로 태스크 콘텍스트 활용법
  5. 4. PythonOperator에 변수제공
  6. 5. 템플릿 인수 검사
  7. 6. XCom이란
  8. 7. PostgresOperator
'#️⃣ Data Engineering/Airflow' 카테고리의 다른 글
  • Ch6. Airflow 워크플로 트리거
  • Ch5. Airflow 태스크 간 의존성
  • Ch2,3. Airflow DAG 구조 및 스케줄링
  • Ch1. Airflow 살펴보기
HyeM207
HyeM207
"Reflections and Growth Through Records" 회고와 기록을 통한 성장으로
HyeM207
HYEM's Storage
HyeM207
  • ALL (115)
    • #️⃣ CS (Computer Science) (5)
      • Database (2)
      • SQL (2)
      • Git (1)
    • #️⃣ Data Engineering (43)
      • Airflow (18)
      • Spark (8)
      • Snowflake (2)
      • BI,DashBoard (4)
      • ELK Stack (2)
      • Hadoop (5)
      • Kafka (4)
    • #️⃣ Cloud&Container (16)
      • AWS (8)
      • GCP (1)
      • Docker (6)
      • Kubernetes (1)
    • #️⃣ Project 및 개발일지 (37)
      • Mini Project (5)
      • 개발일지 (9)
      • Algorithm 문제 풀이 (20)
    • #️⃣ 책 리뷰 (4)
    • #️⃣ 회고글&프로젝트 후기 (10)

공지사항

인기 글

최근 댓글

블로그 메뉴

  • 홈
  • 태그
  • 방명록
hELLO · Designed By 정상우.v4.2.2
HyeM207
Ch4. Airflow 태스크 템플릿 및 XCom
상단으로

티스토리툴바

개인정보

  • 티스토리 홈
  • 포럼
  • 로그인

단축키

내 블로그

내 블로그 - 관리자 홈 전환
Q
Q
새 글 쓰기
W
W

블로그 게시글

글 수정 (권한 있는 경우)
E
E
댓글 영역으로 이동
C
C

모든 영역

이 페이지의 URL 복사
S
S
맨 위로 이동
T
T
티스토리 홈 이동
H
H
단축키 안내
Shift + /
⇧ + /

* 단축키는 한글/영문 대소문자로 이용 가능하며, 티스토리 기본 도메인에서만 동작합니다.