⭐핵심 요약
DAG 무결성 테스트와 개별 단위 테스트 설명 중점적임
책에서는 대중적으로 많이 쓰이고 격리 기능 포함된 pytest를 이용하여 설명함
[ 참고 ]
- 보통 단위 > 통합 > 승인 테스트 순서로 진행함
- 테스트는 각각 격리된 환경에서 진행되어야 한다. ⇒ 이를위해 목업을 이용 (pytest-mock)=>
(목업은 특정 작업이나 객체를 모조로 만드는 것)
#️⃣ DAG 무결성 테스트 (with. CI/CD)
모든 DAG의 무결성 (e.g. DAG에 사이클 있는지 여부, DAG의 task id 고유한 경우 등) 검사한다.
이때 CI/CD 파이프라인을 이용하여 변경된 DAG코드를 지속적으로 확인 및 검증 후 프로덕션 환경으로 배포할 수 있게 한다.
과정
- pytest 설치
- 프로젝트 최상단 폴더에 별도에 /tests 디렉토리 생성
- 검사할 코드 그대로 복사한 후 파일명에 test_ 접두사 붙이기
- DAG 무결성 테스트 코드 작성
- CI/CD 를 위한 YAML 파일 작성 (Github Action등)
- Flake8, Pylint, Black등으로 정적 코드 분석 가능
[ 예제 코드 ] DAG 무결성 테스트 코드
"""Test integrity of DAGs."""
import glob
import importlib.util
import os
import pytest
from airflow.models import DAG
# 모든 DAG 파일경로
DAG_PATH = os.path.join(os.path.dirname(__file__), "..", "..", "dags/**.py")
DAG_FILES = glob.glob(DAG_PATH)
@pytest.mark.parametrize("dag_file", DAG_FILES)
def test_dag_integrity(dag_file):
# 파일 로드하기
module_name, _ = os.path.splitext(dag_file)
module_path = os.path.join(DAG_PATH, dag_file)
mod_spec = importlib.util.spec_from_file_location(module_name, module_path)
module = importlib.util.module_from_spec(mod_spec)
mod_spec.loader.exec_module(module)
# 파일에서 발견된 모든 DAG 클래스의 객체
dag_objects = [var for var in vars(module).values() if isinstance(var, DAG)]
assert dag_objects # 검사1
for dag in dag_objects:
dag.test_cycle() # 검사 2
- @pytest.mark.parametrize : 발견된 모든 파이썬 파일에 대해 테스트 진행함
- assert dag_objects : /dags 경로 있는 모든 파이썬 파일에 DAG객체가 하나 이상 포함되어 있는지 확인
- 목적 : 특정 경로에 DAG 파일만 저장되어 있는지 확인하기 위함
- dag.test_cycle() : DAG 객체에 순환 주기 존재 여부 확인
- 만약 이를 직접 검사하려면 assert dag.dag_id.startswith(”dag이름접미사”)로 코드 바꾸면 됨
#️⃣ 개별 단위 태스트
코드 참고 : https://github.com/HyeM207/data-pipelines-with-apache-airflow/tree/main/chapter09/tests/dags
✅ 1. 목업 이용한 테스트 작성법
테스트는 각각 격리된 환경에서 진행되어야 한다. ⇒ 이를위해 목업을 이용 (pytset-mock)
(목업은 특정 작업이나 객체를 모조로 만드는 것)
- mocker 객체로, mocker.patch.object로 객체 속성 패치
- assert로 호출 횟수 검증 및 mock_get으로 속성들 이용하여 동작 검증
[ 예제 코드 ]
from airflow.models import Connection
from airflow.operators.bash import BashOperator
# 목업 구현시, 호출되는 위치에서 목업을 구성해야됨
from airflowbook.operators.movielens_operator import (
MovielensPopularityOperator,
MovielensHook,
)
# mocker : 목업 객체는 런타임시 임의의 값으로 존재
def test_movielenspopularityoperator(mocker):
# 목업 객체로 객체 속성 패치함
mock_get = mocker.patch.object(
MovielensHook, # 패치할 객체
"get_connection", # 패치할 객체 내 함수
return_value=Connection(conn_id="test", login="airflow", password="airflow"), # 반환 값
)
task = MovielensPopularityOperator(
task_id="test_id",
conn_id="testconn",
start_date="2015-01-01",
end_date="2015-01-03",
top_n=5,
)
result = task.execute(context=None)
assert len(result) == 5
assert mock_get.call_count == 1 # 한번만 호출된 것인지 확인
mock_get.assert_called_with("testconn") # 예상값이 호출되었는지 확인
def test_example():
task = BashOperator(task_id="test", bash_command="echo 'hello!'", xcom_push=True)
result = task.execute(context={})
assert result == "hello!"
- import 부분 : 목업 구현 시 정의되는 위치가 아닌 호출되는 위치에서 목업을 구성해야됨
- execute() : 오퍼레이터를 실행하기 위해 호출할 수 있는 가장 낮는 수준의 함수
- 모든 오퍼레이터가 해당 기능을 수행하기 위해 구현되는 메서드
- mock_get = mocker.patch.object() : 목업 객체를 변수에 할당하여 목업된 객체에 대한 모든 호출을 캡쳐하고 지정된 입력, 호출 수 등을 확인 가능함
- 이때 패치할 객체와 함수, 반환 값을 설정해준다.
- assert mock_get.call_count == 1, mock_get.assert_called_with("testconn"): 한 번만 호출된 것인지, 예상값이 호출되었는지 확인함
✅ 2. 디스크 파일 이용하는 테스트 작성법
파이썬의 임시 저장소 작업을 위한 tempfile 모듈을 사용
pytest에서는 tmp_dir및 tmp_path라는 tempfile모듈에 대한 편리한 사용 방법 제공함
- tmp_path인수 이용: 각 테스트 시 호출되는 함수를 나타냄 (’픽스처’라고 함)
- 픽스처는 모든 테스트 영역에서 적용가능함
[ 예제 코드 ]
import csv
import json
from pathlib import Path
from airflowbook.operators.json_to_csv_operator import JsonToCsvOperator
def test_json_to_csv_operator(tmp_path: Path): #tmp_path는 고정으로 사용
print(tmp_path.as_posix()) # 경로 출력함
# tmp_path이용하여 경로 정의
input_path = tmp_path / "input.json"
output_path = tmp_path / "output.csv"
# 파일에 적을 내용
input_data = [
{"name": "bob", "age": "41", "sex": "M"},
{"name": "alice", "age": "24", "sex": "F"},
{"name": "carol", "age": "60", "sex": "F"},
]
with open(input_path, "w") as f:
f.write(json.dumps(input_data))
# Run task
operator = JsonToCsvOperator(
task_id="test", input_path=input_path, output_path=output_path
)
operator.execute(context={})
# Read result
with open(output_path, "r") as f:
reader = csv.DictReader(f)
result = [dict(row) for row in reader]
# Assert (인풋 값과 실제 적은 데이터가 같은지 확인함)
assert result == input_data
✅ 3. 태스크 콘텍스트 작업하는 테스트 작성법
- operator.execute(context = {}) 로 테스트 할 때는 ds 변수를 사용할 수 없으므로, 실제 메서드인 operator.run()을 호출해야된다.
- test 실행 시, Airflow 메타스토어 문제가 있다면, (1) 모든 db호출에 대해 목업 환경을 구성하거나 (2)airflow db init 으로 쿼리할 메타스토어를 설정해줘야 한다.
- 만약 DAG를 사용하는 테스트가 여러번 있으면, pytest 픽스처의 conftest.py파일 을 사용하여 하위 디렉터리의 여러파일을 재사용할 수 있게 한다.
⇒ 즉, 테스트 파일이 있는 최상단 위치에 conftest.py를 만들고, 테스트 전체에서 DAG를 재사용하기 위한 pytest 픽스처 코드를 작성한다.
[ 예시 코드 ]
import datetime
import pytest
from airflow import DAG
@pytest.fixture
def test_dag(tmpdir):
return DAG(
"test_dag",
default_args={"owner": "airflow", "start_date": datetime.datetime(2018, 1, 1)},
template_searchpath=str(tmpdir),
schedule_interval="@daily",
)
✅ 4. 외부 시스템 연결하는 테스트 작성법
pytest-docker-tools를 이용하면 도커로 테스트할 컨테이너 환경을 만들어서 테스트 할 수 있다. (e.g. Postgres 데이터베이스)
- fetch(repository=””) : 실행 중인 시스템에서 docker pul을 트리거하고 가져온 이미지를 반환함
- 이때 fetch함수 자체는 pytest 픽스처이므로 바로 호출(사용)이 불가하므로, 테스트에 매개변수로 전달한다.
- container(image=””, ports=””, environments=””, volumns={} … ) : 컨테이너 시작함 (pytest 픽스처)
- ports : 호스트 포트와 컨테이너 포트 매핑
- key : 컨테이너 포트,
- value : 호스트 시스템에 매핑된 ports (None 설정시 호스트의 임의의 열린 포트로 매핑)
- pytest-docker-tools를 이용하면 호스트 시스템에 할당된 포트를 픽스처 자체의 포트를 내부적으로 매핑한다.
- volumns : 파일 마운트로 시작 스크립트 지정 가능함
- key : 호스트의 해당 파일 절대 경로
- value (dict형)
- key: 마운트 유형 (예. bind)
- value : 컨테이너 내부 위치 값
- ports : 호스트 포트와 컨테이너 포트 매핑
[ 예제 코드 ]
import os
import pytest
from airflow.models import Connection
from pytest_docker_tools import fetch, container
from airflowbook.operators.movielens_operator import (
MovielensHook,
MovielensToPostgresOperator,
PostgresHook,
)
# 1. Docker 컨테이너 환경 준비
postgres_image = fetch(repository="postgres:11.1-alpine")
postgres = container(
image="{postgres_image.id}",
environment={"POSTGRES_USER": "testuser", "POSTGRES_PASSWORD": "testpass"},
ports={"5432/tcp": None},
volumes={
os.path.join(os.path.dirname(__file__), "postgres-init.sql"): {
"bind": "/docker-entrypoint-initdb.d/postgres-init.sql"
}
},
)
# mocker 이용한 테스트코드
def test_movielens_to_postgres_operator(mocker, test_dag, postgres):
mocker.patch.object(
MovielensHook,
"get_connection",
return_value=Connection(conn_id="test", login="airflow", password="airflow"),
)
mocker.patch.object(
PostgresHook,
"get_connection",
return_value=Connection(
conn_id="postgres",
conn_type="postgres",
host="localhost",
login="testuser",
password="testpass",
port=postgres.ports["5432/tcp"][0],
),
)
task = MovielensToPostgresOperator(
task_id="test",
movielens_conn_id="movielens_id",
start_date="{{ prev_ds }}",
end_date="{{ ds }}",
postgres_conn_id="postgres_id",
insert_query=(
"INSERT INTO movielens (movieId,rating,ratingTimestamp,userId,scrapeTime) "
"VALUES ({0}, '{{ macros.datetime.now() }}')"
),
dag=test_dag,
)
pg_hook = PostgresHook()
row_count = pg_hook.get_first("SELECT COUNT(*) FROM movielens")[0]
assert row_count == 0 # 1. 먼저 테이블 행의 개수가 0인지 확인
pytest.helpers.run_airflow_task(task, test_dag) # 2. test task 실행
row_count = pg_hook.get_first("SELECT COUNT(*) FROM movielens")[0]
assert row_count > 0 # 3. 데이터 삽입되었는지 확인함
#️⃣ 개발시 테스트 사용하기
✅ 1. 로컬과 운영환경에서 디버깅 방법
- 로컬 환경에서는 IDE PyCharm같은거 사용해서 중단점(BP) 설정 후 디버깅 하면서, 변수 상태나 콘텍스트 값을 검사할 수 있다.
- 만약 로컬이 아닌 실제 운영 환경에서라면 활용할 수 있는 디버깅 방법은 아래와 같다.
- 로컬 PyCharm디버거를 원격으로 실행
- 커맨드 라인 디버거(파이썬 내장 디버거 pdb)이용
위 코드 추가하면 디버거 작동함import pdb; pdb.set_trace()
✅ 2. 실제 프로덕션 환경 재현하기
완벽한 프로덕션 환경 재현은 어려움 → 가능한 한 구현할 수 있는 프로덕션 환경을 만들어 테스트한다.
- Whirl 프로젝트 이용 : 이는 도커 컨테이에서 운영환경의 모든 구성 요소를 시뮬레이션하고, docker compose로 이 모든 구성 요소를 관리하는 것임
- 제어하기 위한 CLI 유틸리티 제공하지만, 모든 것이 도커 이미지로 제공되지 않는다는 단점 존재
- 격리된 DTAP 환경 설정
- DTAP (Develop 개발 Test 테스트 Acceptance 인수 Production 프로덕션)
Airflow 프로젝트 관점에서는 Github 사용시 개발 브랜치와 프로덕션 브랜치등 하나의 전용 브랜치를 만드는 것을 권장한다.
참고 : [책] Apache Airflow 기반의 데이터 파이프라인
'#️⃣ Data Engineering > Airflow' 카테고리의 다른 글
Ch.11 Airflow로 효율적인 데이터 파이프라인 작성하는 방법 (0) | 2024.01.22 |
---|---|
Ch8. Airflow 커스텀 컴포넌트 빌드 (3) | 2023.12.12 |
Ch7. Airflow 외부 시스템과 통신하기 (0) | 2023.12.07 |
Ch6. Airflow 워크플로 트리거 (0) | 2023.12.04 |
Ch5. Airflow 태스크 간 의존성 (0) | 2023.11.22 |