#️⃣ 1. Airflow 커스텀 컴포넌트
Airflow는 기본 내장 기능 확장 가능하다 → 공통 작업을 커스텀 오퍼레이터/센서/훅으로 개발 가능 ⇒ 이후 파이썬 라이브러리로 구현하여 구조적으로 만들 수 있음
- 커스텀 훅으로 Airflow가 지원하지 않는 시스템과 연동 가능
- 커스텀 오퍼레이터 개발은 Airflow 기본 내장 Operator로 처리할 수 없는 태스크를 수행할 수 있게 함, 코드 재사용 가능
- 커스텀 센서는 특정(외부) 이벤트 발생할 때까지 대기하는 컴포넌트 구현 가능
#️⃣ 2. 실습
예제 : 영화 평점 API에서 데이터를 가져와서 인기 영화 랭킹을 추출하는 실습
- 영화 평점 데이터는 JSON 포맷
- 데이터 API 호출 시 offset, limit, start_date과 end_date 파라미터를 설정하여 필터링 가능함
- 파이썬 코드에서 API 호출시 session을 이용하고 API 결과의 페이지 처리를 구현해야됨
- 페이지 처리는 제너레이터 이용함
- 실제 DAG 동작 : 스케줄 간격마다 평점데이터를 날짜별로 파티션하여 JSON 출력파일로 덤프함
✅2-1. 커스텀 훅 만들기
모든 훅은 추상 클래스인 BaseHook 클래스의 서브클래스로 생성함
아래는 영화 API에 대한 연결부분을 처리하는 Airflow 커스텀 훅 코드 이다.
import requests
from airflow.hooks.base_hook import BaseHook
class MovielensHook(BaseHook):
"""
conn_id : str
ID of the connection to use to connect to the Movielens API. Connection
is expected to include authentication details (login/password) and the
host that is serving the API.
"""
DEFAULT_SCHEMA = "http"
DEFAULT_PORT = 5000
def __init__(self, conn_id, retry=3):
super().__init__()
self._conn_id = conn_id
self._retry = retry
self._session = None # API 세션 캐싱 추가
self._base_url = None
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def get_conn(self):
"""
Returns the connection used by the hook for querying data.
Should in principle not be used directly.
"""
if self._session is None: # 캐싱 확인
# Fetch config for the given connection (host, login, etc).
# 웹 UI로 Connection 정보 등록하면, get_connection 함수로 메타스토어 저장된 커넥션 ID에 대한 연결 세부 정보를 가져온다.
config = self.get_connection(self._conn_id)
if not config.host:
raise ValueError(f"No host specified in connection {self._conn_id}")
schema = config.schema or self.DEFAULT_SCHEMA
port = config.port or self.DEFAULT_PORT
self._base_url = f"{schema}://{config.host}:{port}"
# Build our session instance, which we will use for any
# requests to the API.
self._session = requests.Session()
if config.login:
self._session.auth = (config.login, config.password)
return self._session, self._base_url
def close(self):
"""Closes any active session."""
if self._session:
self._session.close()
self._session = None
self._base_url = None
# API methods:
def get_movies(self):
"""Fetches a list of movies."""
raise NotImplementedError()
def get_users(self):
"""Fetches a list of users."""
raise NotImplementedError()
def get_ratings(self, start_date=None, end_date=None, batch_size=100):
"""
Fetches ratings between the given start/end date.
Parameters
----------
start_date : str
Start date to start fetching ratings from (inclusive). Expected
format is YYYY-MM-DD (equal to Airflow's ds formats).
end_date : str
End date to fetching ratings up to (exclusive). Expected
format is YYYY-MM-DD (equal to Airflow's ds formats).
batch_size : int
Size of the batches (pages) to fetch from the API. Larger values
mean less requests, but more data transferred per request.
"""
yield from self._get_with_pagination(
endpoint="/ratings",
params={"start_date": start_date, "end_date": end_date},
batch_size=batch_size,
)
# 페이지 처리
def _get_with_pagination(self, endpoint, params, batch_size=100):
"""
Fetches records using a get request with given url/params,
taking pagination into account.
"""
session, base_url = self.get_conn()
url = base_url + endpoint
offset = 0
total = None
while total is None or offset < total:
response = session.get(
url, params={**params, **{"offset": offset, "limit": batch_size}}
)
response.raise_for_status()
response_json = response.json()
yield from response_json["result"]
offset += batch_size
total = response_json["total"]
✅2-2. 커스텀 오퍼레이터 만들기
시작/종료 날짜 정의와 데이터 파일 저장에 대한 반복적인 코드들이 있다.
이를 커스텀 오퍼레이터로 만들어 코드 반복 최소화한다.
Airflow 모든 오퍼레이터는 BaseOperator 클래스의 서브 클래스로 만들어야된다.
- BaseOperator는 일반적인 동작을 정의하는 제네릭 인수들을 많이 갖고 있으므로, 인수 전달 시 **kwargs 구문 사용함
init (…) :
- 함수에 추가 키워드 인수 전달 가능
- init 함수 위에 @apply_defaults 데코레이터 : 기본 DAG 인수를 커스텀 오퍼레이터에 전달하기 위한 데코레이터
- 실제로 커스텀 오퍼레이터를 정의할 때 apply_defaults 데코레이터를 항상 포함해야됨
execute(self, context) :
메인메서드. context 하나의 파리미터만 보유; context는 Airflow의 모든 콘텍스트 변수를 담고 있는 dict 객체이다.
template_fields = (”변수명” , … ):
커스텀 오퍼레이터에서 템플릿화할 인스턴스 변수들을 Airflow에 알려줌
self.log.
로깅은 BaseOperator에서 제공하는 logger 사용함
import json
import os
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from custom.hooks import MovielensHook
class MovielensFetchRatingsOperator(BaseOperator):
"""
Operator that fetches ratings from the Movielens API (introduced in Chapter 8).
Parameters
----------
conn_id : str
ID of the connection to use to connect to the Movielens API. Connection
is expected to include authentication details (login/password) and the
host that is serving the API.
output_path : str
Path to write the fetched ratings to.
start_date : str
(Templated) start date to start fetching ratings from (inclusive).
Expected format is YYYY-MM-DD (equal to Airflow's ds formats).
end_date : str
(Templated) end date to fetching ratings up to (exclusive).
Expected format is YYYY-MM-DD (equal to Airflow's ds formats).
batch_size : int
Size of the batches (pages) to fetch from the API. Larger values
mean less requests, but more data transferred per request.
"""
# 템플릿화할 인스턴스 변수 목록
template_fields = ("_start_date", "_end_date", "_output_path")
@apply_defaults # 필수
# 추가 키워드 인수 추가 가능
def __init__(
self,
conn_id,
output_path,
start_date="{{ds}}",
end_date="{{next_ds}}",
batch_size=1000,
**kwargs,
):
super(MovielensFetchRatingsOperator, self).__init__(**kwargs)
self._conn_id = conn_id
self._output_path = output_path
self._start_date = start_date
self._end_date = end_date
self._batch_size = batch_size
# pylint: disable=unused-argument,missing-docstring
def execute(self, context): # 메인 메소드
hook = MovielensHook(self._conn_id) # 위에서 만든 커스텀 훅 인스턴스 생성
try: # 예외 처리 추가
self.log.info( # 로깅은 BaseOperator에서 제공하는 logger 사용함
f"Fetching ratings for {self._start_date} to {self._end_date}"
)
ratings = list(
hook.get_ratings(
start_date=self._start_date,
end_date=self._end_date,
batch_size=self._batch_size,
)
)
self.log.info(f"Fetched {len(ratings)} ratings")
finally:
# Make sure we always close our hook's session.
hook.close() # 훅을 닫아서 리소스 해제함
self.log.info(f"Writing ratings to {self._output_path}")
# Make sure output directory exists.
output_dir = os.path.dirname(self._output_path)
os.makedirs(output_dir, exist_ok=True)
# Write output as JSON.
with open(self._output_path, "w") as file_:
json.dump(ratings, fp=file_)
✅2-3. 커스텀 센서 만들기
센서는 오퍼레이터의 한 유형으로, DAG 안에서 다운스트림 태스크를 실행하기 전에 특정 조건이 충족될 때까지 대기하기 위해 사용된다.
위 예제에서는 DAG를 계속 실행하기 전에 주어진 날짜의 평점 데이터를 사용 가능한지 체크하는 센서를 구현한다.
커스텀 센서는 BaseSensorOperator 클래스를 상속하고, poke 메서드를 구현해야된다.
poke ()
메인 메소드. 리턴값으로 Boolean 반환함. 센서 상태가 True/False임
- False: 센서가 상태를 다시 체크할 때까지 몇 초 정도 대기상태로 들어감. 이는 True나 타임아웃이 될때까지 반복함
- True : 다운스트림 태스크를 실행함
"""Module containing file system sensors."""
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from custom.hooks import MovielensHook
class MovielensRatingsSensor(BaseSensorOperator):
"""
Sensor that waits for the Movielens API to have ratings for a time period.
start_date : str
(Templated) start date of the time period to check for (inclusive).
Expected format is YYYY-MM-DD (equal to Airflow's ds formats).
end_date : str
(Templated) end date of the time period to check for (exclusive).
Expected format is YYYY-MM-DD (equal to Airflow's ds formats).
"""
template_fields = ("_start_date", "_end_date") # 템플릿화할 변수 설정
@apply_defaults # 필수
# 추가 인수 추가
def __init__(self, conn_id, start_date="{{ds}}", end_date="{{next_ds}}", **kwargs):
super().__init__(**kwargs)
self._conn_id = conn_id
self._start_date = start_date
self._end_date = end_date
# pylint: disable=unused-argument,missing-docstring
def poke(self, context): # 메인 구현 함수
hook = MovielensHook(self._conn_id) # 훅 생성
try: # 예외처리
next( # 훅에서 하나 가져오는 것을 시도함 (첫번째 레코드를 next로 가져옴)
hook.get_ratings(
start_date=self._start_date, end_date=self._end_date, batch_size=1
)
)
# If no StopIteration is raised, the request returned at least one record.
# This means that there are records for the given period, which we indicate
# to Airflow by returning True.
self.log.info(
f"Found ratings for {self._start_date} to {self._end_date}, continuing!"
)
return True # next 성공이면 적어도 하나의 레코드가 있으므로 true 반환
except StopIteration: # StopIteration 실패
self.log.info(
f"Didn't find any ratings for {self._start_date} "
f"to {self._end_date}, waiting..."
)
# If StopIteration is raised, we know that the request did not find
# any records. This means that there a no ratings for the time period,
# so we should return False.
return False # False는 레코드 콜렉션 비어있다는 뜻이므로 false 반환
finally:
# Make sure we always close our hook's session.
hook.close() # 훅 close하여 리소스 해제
✅2-4. 컴포넌트 패키지화
컴포넌트 배포하는 더 나은 방법은 파이썬 패키지에 코드 넣는 것이다.
- 장점: DAG와 별도 코드 유지함으로써, 커스텀 코드에 대한 CI/CD 프로세스 구성 가능 &코드 공유 더 쉽고 협업에 도움됨
파이썬 패키지 부트스트랩 작업
- 디렉터리 생성
- /src/폴더명
- 기본 구조 생성 및 setup.py 작성
airflow-movielens
ㄴsetup.py
ㄴ src
ㄴ airflow-movielens
ㄴ __init__.py
ㄴ hooks.py
ㄴ operators.py
ㄴ sensors.py
3. 패키지 배포
a. 깃헙 저장소 설치
b. PyPI에 pip 패키지 피드 사용해서 설치
c. 파일 직접 설치
참고 : [책] Apache Airflow 기반의 데이터 파이프라인
'#️⃣ Data Engineering > Airflow' 카테고리의 다른 글
Ch.11 Airflow로 효율적인 데이터 파이프라인 작성하는 방법 (0) | 2024.01.22 |
---|---|
Ch9. Airflow 무결성 및 단위 테스트, 환경 구성 (0) | 2023.12.19 |
Ch7. Airflow 외부 시스템과 통신하기 (0) | 2023.12.07 |
Ch6. Airflow 워크플로 트리거 (0) | 2023.12.04 |
Ch5. Airflow 태스크 간 의존성 (0) | 2023.11.22 |