지난 교육에서는 Bash Operator를 이용하여 간단한 실습을 진행했다.
이번에는 Python Opertaor와 Connection/Variables 그리고 Xcom 등을 이용하여 저번에 작성한 간단한 ETL 프로세스 코드를 개선하는 실습을 진행했다.
≫ 이를 통해 Airflow 구동에 필요한 기본 문법들을 배울 수 있었다.
본 글에서는 코드와 관련된 기본 개념을 정리 후, 실습 내용을 정리한다.
벌써부터 앞으로 진행할 심화된 실습들이 기대된다 🏃♂️🔥
PART1. 기본 코드 및 개념 정리
#️⃣ 01. default_args
모든 테스크에 공통으로 적용될 설정
from datetime import datetime, timedelta
default_args = {
'owner': 'hyemin',
'email': ['이메일주소@gmail.com'],
'retries': 1,
'retry_delay': timedelta(minutes=3),
# on_failure_callback
# on_success_callback
}
- owner : 소유자
- email : 이메일
- retries : 실패 시 재시도 횟수
- retry_dalay : 재시도 시 시간 간격
- on_failure_callback : task 실패 시 호출할 함수 지정
- on_success_callback : task 성공 시 호출할 함수 지정
#️⃣ 02. DAG
DAG 작성
from airflow import DAG
dag = DAG(
"dag_v1", # DAG name
start_date=datetime(2020,8,7,hour=0,minute=00),
schedule="0 * * * *",
tags=["example"],
catchup=False,
# common settings
default_args=default_args
)
- dag_id : DAG 이름
- schedule : 실행 주기
- * * * * *: 분/시간/일/월/요일
- 한 번만 실행할 것이면 None이나 @once 설정
- 그 외 @hourly, @daily, @weekly, @monthly, @yearly
- tags : 기본으로 example 줌
- catchup : (True). 과거의 backfill 여부
- incremental update하는 DAG에서만 중요
- default_args: default_args에 위에서 만든 기본 설정 딕셔너리 설정
- max_active_runs: 한 번에 동시에 실행할 DAG 수
- max_active_tasks: DAG에 속한 task를 동시에 실행할 task 수
⭐max_active_runs와 max_active_tasks의 max limit은 워커 노드의 CPU의 총 개수임
#️⃣ 03. Operator 및 Airflow Decorators
03-1. Bash Operator
Bash쉘에서 명령 실행 시 사용
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=test_dag)
- task_id
- bash_command
공식 문서 : https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/bash.html
예제 (Python Operator)
2개의 task(print_hello, print_goodbye) 단순 문자열 출력해주는 함수
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
dag = DAG(
dag_id='HelloWorld',
start_date=datetime(2022,5,5),
catchup=False,
tags=['example'],
schedule='0 2 * * *')
def print_hello():
print("hello!")
return "hello!"
def print_goodbye():
print("goodbye!")
return "goodbye!"
print_hello = PythonOperator(
task_id = 'print_hello',
#python_callable param points to the function you want to run
python_callable = print_hello,
#dag param points to the DAG that this task is a part of
dag = dag)
print_goodbye = PythonOperator(
task_id = 'print_goodbye',
python_callable = print_goodbye,
dag = dag)
print_hello >> print_goodbye
- 하루에 한 번 2시 0분에 실행
- print_hello 함수 호출 후 print_goodbye 호출함 (print_hello >> print_goodbye)
- 만약 순서 지정 안 하면 두 task가 동시에 실행됨
03-2. Python Operator
함수 단위로 python코드를 사용
from airflow.operators.python import PythonOperator
load_nps = PythonOperator(
dag=dag,
task_id='task_id',
python_callable=python_func,
params={
'table': 'delighted_nps',
'schema': 'raw_data'
},
)
# 실행할 함수
def python_func(**cxt):
table = cxt["params"]["table"]
schema = cxt["params"]["schema"]
ex_date = cxt["execution_date"]
# do what you need to do
...
- python_callable : 호출할 python 함수명 (operator 실행시 python 함수도 호출함)
- params : python 함수 호출 시 넘겨줄 인자
- **ctx: 딕셔너리 형태로, params라는 키 밑에 operator에서 정의한 딕셔너리가 저장됨
공식 문서 : https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html
03-3. Airflow Decorators
python 함수에 @task 어노테이션을 붙여 task임을 명시함
@task
def print_hello():
print("hello!")
return "hello!"
@task
def print_goodbye():
print("goodbye!")
return "goodbye!"
with DAG(
dag_id = 'HelloWorld_v2',
start_date = datetime(2022,5,5),
catchup=False,
tags=['example'],
schedule = '0 2 * * *'
) as dag:
# Assign the tasks to the DAG in order
print_hello() >> print_goodbye()
- @task : 엔트리 함수임을 나타냄
- 위의 예제1과 달리 PythonOperator 작성 안 해도 됨
- 순서는 python 이름으로 지정 가능 ( print_hello() >> print_goodbye())
#️⃣ 04. Connections, Variables
Connections
- PostgreSQL이나 Redshfit와 같은 연결 정보를 Airflow단에서 저장하고 관리함
- 호스트이름, port, access credential 등 연결 정보 저장
Variables
- API 키나 configuration 정보 저장
- 값을 암호화하려면 "access"나 "secret"을 이름에 사용함
위 정보는 WebUI나 Python 코드로 설정 가능하다.
#️⃣ 05. Xcom
태스크(Operator)들간에 데이터를 주고 받기 위한 방식
- 보통 한 Operator의 리턴값을 다른 Operator에서 읽어가는 형태가 됨
- 이 값들은 Airflow 메타 데이터 DB에 저장이 되기에 큰 데이터를 주고받는데는 사용불가
- 데이터가 크면 보통 s3에 저장함
- task를 함수 단위로 여러개 둘 때, 유용하게 사용 가능함
#️⃣ 06. PostgresHook
PostgreSQL 데이터베이스에서 SQL 쿼리를 실행하거나 데이터를 가져올 수 있는 모듈
from airflow.providers.postgres.hooks.postgres import PostgresHook
# from plugins import slack
def get_Redshift_connection(autocommit=True):
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
conn = hook.get_conn()
conn.autocommit = autocommit
return conn.cursor()
- PostgresHook 객체에, Airflow connection에 정의한 연결정보의 id를 지정한다.
#️⃣ 07. Tips
07-1. PostgresHook의 autocommit 파라미터
PostgresHook의 autocommit 파라미터는 Default가 False인데,
해당 경우 BEGIN은 아무런 영향 없다.
07-2. DAG에서 Task 분리는 어느 정도가 좋을까?
Task 너무 많이 만들면, 전체 DAG 실행시간 증가&스케줄러 부하
Task 너무 적게 만들면, 모듈화 안 되어 실패시 재실행 시간 증가
⭐오래걸리는 DAG는 다수의 Task로 나누기⭐
07-3. Variable 관리 vs 코드 관리 둘 중 무엇이 좋은가?
Variable 사용시 코드 푸시 필요성 X
단, 관리/테스트가 안 되어 사고가 날 수 있으니
⭐중요한 SQL등은 코드로 관리하는 것을 추천함⭐
#️⃣ 08. airflow.cfg 의 주요 값 정리
1.DAGs 폴더는 어디에 지정되는가?
- 키 이름 : dags_folder
- 기본값 : /opt/airflow/dags
- 기본적으로 Airflow가 설치된 디렉토리 밑의 dags 폴더
2. DAGs 폴더에 새로운 Dag를 만들면 언제 실제로 Airflow 시스템에서 이를 알게 되나? 이 스캔 주기를 결정해주는 키의 이름이 무엇인가?
- 키 이름 : dag_dir_list_interval
- 기본값 : 300 (초단위_기본값 5분)
3. 이 파일에서 Airflow를 API 형태로 외부에서 조작하고 싶다면 어느 섹션을 변경해야하는가?
- 키 이름 : [api] 섹션의 auth_backend
- 변경값 : airflow.api.auth.backend.basic_auth
- 가장 간단한 방법이 basic_auth임 (id와 pw로 가능)
4. Variable에서 변수의 값이 encrypted가 되려면 변수의 이름에 어떤 단어들이 들어가야 하는데 이 단어들은 무엇일까?
- 단어 목록: password, passwd, secret, authorization, api_key, apikey, access_token
5. 이 환경 설정 파일이 수정되었다면 이를 실제로 반영하기 위해서 해야 하는 일은?
- 액션 : Airflow의 웹 서버와 스케줄러를 다시 시작해야 된다.
- sudo systemctl restart airflow-webserver
- sudo systemctl restart airflow-schduler
- airflow db init은 아님. 이건 백엔드 db가 바뀌었을 때 실행하는 것임
6. Metadata DB의 내용을 암호화하는데 사용되는 키는 무엇인가?
- 키 이름 : fernet_key
7. 타임존
기본은 UTC
- default_timezone : 실제 스케줄 시 참고하는 값들임
- start_date, end_date, schedule이 default_timezone 에 지정된 타임존을 따름
- default_ui_timezone : ui에서 표시할 timezone
- execution_data과 로그시간 : 위의 2개와 상관없이 항상 UTC를 따름 → execution_date 사용할 때는 타임존 고려해서 변환후 사용해야됨
⇒ 그래서 혼동되니, BEST는 모두 다 UTC 기준으로 하는 것이 안 헷갈리고 좋음
8. dag_dir_list_interval
- 위 변수에 설정된 값(분)을 기준으로 ‘dags_folder’ 변수로 지정된 dags 폴더를 주기적으로 스캔함
- 이때 DAG 모듈이 들어있는 모든 파일의 메인함수를 실행함
- ⇒ 개발중인 테스트 코드도 실행될 수 있기 때문에 주의요함
PART2. ETL 코드 개선하기
강의에서는 개선 단계를 1부터 5까지 나눠서 진행했지만,
본 글에서는 개선된 최종코드인 5단계 코드 위주로만 정리한다.
1. 최종 버전 코드 작성하기
코드 전문 보기
from airflow import DAG
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task
from datetime import datetime
from datetime import timedelta
import requests
import logging
def get_Redshift_connection(autocommit=True):
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
conn = hook.get_conn()
conn.autocommit = autocommit
return conn.cursor()
@task
def extract(url):
logging.info(datetime.utcnow())
f = requests.get(url)
return f.text
@task
def transform(text):
lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리
records = []
for l in lines:
(name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
records.append([name, gender])
logging.info("Transform ended")
return records
@task
def load(schema, table, records):
logging.info("load started")
cur = get_Redshift_connection()
#트랜잭션 FULL REFRESH)
try:
cur.execute("BEGIN;")
cur.execute(f"DELETE FROM {schema}.name_gender;")
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
cur.execute(sql)
cur.execute("COMMIT;")
except (Exception, psycopg2.DatabaseError) as error:
print(error)
cur.execute("ROLLBACK;")
logging.info("load done")
with DAG(
dag_id='namegender_v5',
start_date=datetime(2022, 10, 6),
schedule='0 2 * * *',
max_active_runs=1,
catchup=False,
default_args={
'retries': 1,
'retry_delay': timedelta(minutes=3),
# 'on_failure_callback': slack.on_failure_callback,
}
) as dag:
url = Variable.get("csv_url")
schema = 'hmk9667'
table = 'name_gender'
lines = transform(extract(url))
load(schema, table, lines)
2. Airflow_Variables 설정
주의! 따옴표 쓰면 안 된다!
이렇게 설정한 것은 CLI로도 조회 가능하다.
docker exec -it (스케줄러컨테이너 ID) sh
ls -lt
cd dags
airflow dags list
airflwo tasks list (dag명)
airflow variables (하면 뒤에옵션들 확인 가능)
airflow variables list (등록된 variables 목록 확인 가능)
airflow variables get csv_url
3. Airflow_Connections 설정
4. 실행 (WebUI)
스크립트가 모두 에러 없이 잘 뜨는 것 확인 완료
첫번째 실행 때 코드 오류가 있었지만, 2차 시도때에는 수정하여 에러없이 잘 되었다.
'#️⃣ Data Engineering > Airflow' 카테고리의 다른 글
Airflow Backfill_Execution Date과 StartDate (0) | 2023.06.08 |
---|---|
Airflow 실습_OpenAPI와 Upsert를 이용한 DAG (0) | 2023.06.08 |
Airflow 설치(Docker-compose)와 간단한 실습 (0) | 2023.06.07 |
Airflow란? 구성요소 알아보기 (0) | 2023.06.07 |
데이터 파이프라인(ETL) 정의 및 설계 시 고려할 점 + 실습 (0) | 2023.06.05 |