본문 바로가기

Python

CHAPTER 5 태스크 간 의존성 정의하기

CHAPTER 5

태스크 간 의존성 정의하기

이 장에서는 다음과 같은 내용을 다룹니다. 
- Airflow DAG에서 태스크 의존성을 정의하는 방법을 확인합니다. 
- 트리거 규칙이 작업 실행에 주는 영향에 대해 기존 지식을 제공합니다. 
- XCom을 이용하여 태스크 사이의 상태 공유 방법을 설명합니다. 
- Airflow 2의 Taskflow API를 사용해 파이썬을 많이 사용하는 DAG를 단순화하는 방법을 설명합니다. 

5.1 기본 의존성 유형

  • 다양한 태스크 의존성 패턴을 살펴보겠습니다.
  • 태스크의 선형 체인 유형과 팬아웃/팬인 유형이 모두 포함됩니다.

5.1.1 선형 의존성 유형

  • 단일 선형 태스크 체인

리스트 5.1 로켓 사진 가져오기 DAG의 태스크

download_launches=BashOperator(..)
get_pictures=PythonOperator(...)
notify=BashOperator(...)

  • 이 유형의 DAG에서는 이전의 태스크 결과가 다음 태스크의 입력 값으로 사용되기 때문에, 다음 태스크로 이동하기 전에 각 태스크를 완료해야 합니다.

리스트 5.2 태스크 간에 의존성 추가하기

download_launches >> get_pictuers # 작업 의존성을 각각 설정하기 
get_pictures >> notify

download_launches >> get_pictures >> notify # 또는 여러 개의 의존성을 설정할 수 있습니다. 
  • 두가지 의존성의 차이는 명시적인가 아닌가

5.1.2 팬인/팬아웃(Fan-in/Fan-Out) 의존성

  • 태스트의 선형 체인 외에도 Airflow의 태스크 의존성을 사용하여 태스크간 복잡한 의존성 구조를 만들 수 있습니다.

  • 의존성 측면에서 이 DAG를 고려하면 데이터를 정제하기 전에 원격지에서 데이터를 가져와야 한다.

리스트 5.3 병렬로 실행되는 선형 의존성 추가하기

fetch_weather >> clean_weather
fetch_sales >> clean_sales

  • 두 가져오기 태스크의 업스트림에 DAG의 시작을 나타내는 더미 태스크를 추가할 수도 있습니다.
  • 작업이 모두 시작되는 DAG 시작 시 발생하는 암묵적인 팬아웃을 설명하는 데 도움이 됩니다.

리스트 5.4 팬아웃 의존성에 추가하기

from airflow.operators.dummy import DummyOperator

start=DummyOperator(task_id="start") # 더미 시작 태스크 생성하기 
start >> [fetch_weather, fetch_sales] # 팬아웃 의존성 태스크 생성하기

  • 병렬 태스크와 달리 결합된 데이터 세트를 만들기 위해서는 두가지 데이터에 대한 입력이 모두 필요합니다.
  • 하나의 태스크가 여러 업스트림 태스크에 영향을 받는 구조는 단일 다운스트림 태스크가 여러 업스트림 태스크에 의존성을 갖는데, 이런 구조를 팬인 구조라고 합니다.

리스트 5.5 팬인 의존성 추가하기

[clean_weather, clean_sales] >> join_datasets

join_datasets 태스크로 팬인 후 DAG의 나머지 부분은 모델 학습 및 모델 배포를 위한 선형 태스크 체인으로 구성됩니다.

리스트 5.6 나머지 태스크에 의존성 추가하기

join_datasets >> train_model >> deploy_model

태스크를 결합하면 그림에 표시된 DAG와 유사한 결과를 얻을 수 있습니다.


5.2 브랜치하기

회사 동료로부터 새로운 소식을 들었다고 가정해 봅시다. 변경이 있더라도 모델 학습이 중단되어서는 안됩니다. 또한 향후 분석에서 과거 판매 데이터를 계속 사용할 수 있도록 이전 시스템과 새로운 시스템 모두, 정상 동작하기를 바랍니다. 이 문제를 해결하기 위해서는 어떻게 해야 할까요?

5.2.1 태스크 내에서 브랜치 하기

첫 번째 접근 방식은 수집 태스크를 다시 작성하여 실행 날짜를 통해 판매 데이터 수집 및 처리를 위한 두 개의 코드로 분리할 수 있습니다.

리스트 5.7 정제 태스크 안에서 브랜치하기

def _clean_sales(**context): # 정체 태스크 
    if context["execution_date"] < ERP_CHANGE_DATE:
        _clean_sales_old(**context)
    else:
        _clean_sales_new(**context)

...
clean_sales_data=PythonOperator(
    task_id="clean_sales",
    python_callable=_clean_sales,
)

이 예에서 _clean_sales_old는 이전 판매 형식에 대한 데이터를 정제하는 함수이고 _clean_sales_new는 새 형식에 대한 동일한 태스크를 수행하는 함수입니다. 두 함수의 결과가 호환되는 한, 나머지 두 ERP 시스템 간 차이로 DAG는 변경하지 않아도 됩니다.


리스트 5.8 수집 태스크 내에서 브랜치하기

def _fetch_sales(**context): # 수집 태스크 
    if context["exeution_date"] < ERP_CHANGE_DATE:
        _fetch_sales_old(**context)
    else:
        _fetch_sales_new(**context)
    ....

DAG는 이를 통해 초기 데이터 수집/정제 작업을 수행하면 데이터 소스와 무관하게 일관된 형식으로 판매 데이터를 처리할 수 있습니다.

❗️어렵게 설명했지만 그냥 태스크(함수) 내에서 조건문으로 분기처리하는 것을 브랜치라고 표현했다.

5.2.2 DAG 내부에서 브랜치하기

두 번째 방법은 두 개의 개별 태스크 세트를 개발하고 DAG가 이전 또는 새로운 ERP 시스템에서 데이터 수집 작업 실행을 선택할 수 있도록 하는 것입니다.

리스트 5.9 추가적인 수집/정제 테스크 추가하기

fetch_sales_old=PythonOperator (...)
clean_sales_old=PythonOperator (...)

fetch_sales_new=PythonOperator (...)
clean_sales_new=PythonOperator (...)

fetch_sales_old >> clean_sales_old
fetch_sales_new >> clean_sales_new

이제 나머지 태스크를 DAG에 연결하고 Airflow가 언제 어떤 작업을 실행해야 하는지 확인합니다.

  • BranchPythonOperator 를 통해서 다운스팀림 태스크 세트중 선택할 수 있는 기능을 제공한다.

❗️조건을 평가해서 태스크 ID를 반환하고(하나 이상일 수 있다)반환된 태스크가 실행된다.

리스트 5.10 BranchPythonOperator를 사용한 브랜치 작업

def _pick_erp_system(**context):
    ... # 여기에 조건플 평가하는 코드가 작성되야 한다. 

pick_erp_system=BranchPythonOperator(
    task_id="pick_erp_system",
    python_callable=_pick_erp_system,
)

다음은 조건을 평가하는 코드가 추가된 BranchPythonOperator이다.

def _pick_erp_system(**context):
    if context ["execution_date"] < ERP_CHANGE_DATE:
        return "fetch_sales_old"
    else:
        return "farch_sales_new" # 리턴 값으로 태스크의 이름을 문자열로 전달해주면 된다. 

pick_erp_system=BranchPythonOperator(
    task_id="pick_erp_system",
    python_callable=_pick_erp_system,
)

pick_erp_system >> [farch_sales_old, fatch_sales_new]

이제 다음의 의존성을 추가한다.

리스트 5.12 시작 태스크에 브랜치 연결하기

start_task >> pick_erp_system

[clean_sales_old, clean_sales_new] >> join_datasets

❗️하지만 이렇게 되면 join_datasets는 모든 다운 스트림 작업을 건너뛰게 됩니다. 기본적으로 업스트림 태스크가 모두 성공적으로 완료되어야 하기 때문입니다.


trigger_rule 을 통해서 규칙을 정의할 수 있습니다.

  • 이를 위한 방법 중 하나는 트리거 규칙을 none_failed 로 변경하는 것입니다.

리스트 5.14 join_datasets 태스크의 트리거 규칙 수정하기

join_datasets=pyhtonOperator(
    ..., 
    trigger_rule="none_failed",
)
  • 모든 상위 태스크가 성공해야 해당 태스크를 실행할 수 있는 규칙은 all_succes로 설정합니다.
  • none_failed 로 설정하면 모든 상위 항목이 실행 완료 및 실패가 없을 시에 즉시 작업이 실행됩니다.

❗️이 방식의 단점은 join_datasets 태스크에 연결되는 태스크가 3개라는 것입니다. join_datasets 태스크에 입력하는 특성이 잘 반영되지 않게 됩니다. DAG에 서로 다른 브랜치를 결합하는 더미 태스크를 추가하여 브랜치 조건을 명확하게 합니다.


Airflow에서 제공하는 내장 DummyOperator를 사용하여 더미 작업을 DAG에 추가할 수 있습니다.

리스트 5.15 명확성을 위해 더미 조인 태스크 추가하기

from airflow.operators.dummy import DummyOperator

join_branch=DummyOperator(
    task_id="join_erp_branch",
    trigger_rule="none_failed"
)

[clean_sales_old, clean_sales_new] >> join_branch
join_branch >> join_datasets

5.3 조건부 태스크

조건부 배포를 구현하는 또 다른 방법은 배포 태스크 자체를 조건부화하는 것입니다. 즉, 미리 정의된 조건에 따라서만 실행됩니다. Airflow에서는 해당 조건을 테스트하고 조건이 실패할 경우 모든 다운스트림 작업을 건너뛰는 태스크를 DAG에 추가하여 태스크를 조건부화할 수 있습니다.

5.3.1 태스크 내에서 조건

리스트 5.16 태스크 내에서 조건 구하기

def _delpoy(**context):
    if context["execution_date"] == ...:
        deploy_model()

deploy_PythonOperator(
    task_id="deploy_model",
    python_callable=_deploy,
)

5.3.2 조건부 태스크 만들기

리스트 5.17 DAG에서 조건부 빌드하기

def _latest_only(**context):
    ...
    latest_only=PythonOperator(
        task_id="latest_only",
        python_callable=_latest_only
    )
latest_only >> deploy_model

  • AirflowSkipException의 등장

리스트 5.18 _latest_only 조건 함수 구현하기

from airflow.exception import AirflowSkipException

def _latest_only(**context):
    left_window=context["dag"].follwing_schedule(context["execution_date"]) # 실행 윈도우에서 경계를 확인합니다. 
    rigth_window=context["dag"].follwing_schedule(left_window)

    now=pendulum.now("UTC") # 현재 시간이 윈도우 안에 있는지 확인합니다. 
    if not left_window < now <= rigth_window:
        raise AirflowSkipException("Not the most recont run!")

5.3.3 내장 오퍼레이터 사용하기

가장 최근 실행한 DAG만 실행하는 예를 구현할 때, Airflow의 내장 클래스인 LastOnlyOperator 클래스를 사용할 수 있습니다. LastOnlyOperator를 사용하면 조건부 배포를 구현하기 위해 복잡한 로직을 작성할 필요가 없습니다.

리스트 5.19 내장 LastOnlyOperator 사용하기

from airfolw.operators.latest_only import LatestOnlyOperator

latest_only=LatestOnlyOperator(
    task_id="latest_only",
    dag=dag
)

join_datasets >> train_model >> deploy_model
latest_only >> deploy_model

5.4 트리거 규칙에 대한 추가 정보

Airflow는 태스크 실행 시기를 어떻게 결정할까요? 그것을 위해 트리거 규칙이 필요합니다.

5.4.1 트리거 규칙이란?

트거 규칙은 태크의 의존성 기능과 같이 Airflow가 태스크가 실행 준비가 되어 있는지 여부를 결정하기 위한 필수적인 조건입니다.

  • 기본 트리거 규칙은 all_success

5.4.2 실패의 영향

만약 실행 중에 태스크 중 하나가 오류를 발생시키면 어떻게 될까요?

5.4.3 기타 트리거 규칙

Airflow는 여러 다른 트리거 규칙도 지원합니다.

5.5 태스크 간 데이터 공유

Airflow의 XCom을 사용하여 태스크 간에 작은 데이터를 공유할 수 있습니다. XCom은 기보적으로 태스크 간에 메시지를 교환하여 특정 상태를 공유할 수 있게 합니다.

5.5.1 XCom을 사용하여 데이터 공유하기

xcom_push(key="model_id", value=model_id)

시스템 환경변수에 등록해서 사용하는것과 비슷해 보인다


반대로 xcom_pull메서드를 사용하여 다른 태스크에서 XCom값을 확인할 수 있습니다.

xcom_pull(
    task_id="train_model", key="model_id"
)

task_id가 필요한 것으로 봐서 서버가 떠있는동안 메모리에 저장되는거 같다.


또한 템플릿을 통해 XCom 값으 참조할 수 있습니다.

리스트 5.22 템플릿에서 XCom 값 사용하기

def _deploy_model(templates_dict, **context):
    model_id=templates_dict["model_id"]
    print(f"Deploying model {model_id}")

deploy_model=PythonOperator(
    task_id="deploy_model",
    python_callable=_deply_model,
    templates_dict={
        "model_id": "{{tesk_instance.xcom_pull(
                task_ids='train_model', key='model_id')}}"
    },
)

BashOperator에는 xcom_push 옵션이 있습니다.

리스트 5.23 리턴을 사용하여 XCom 게시하기

def_train_model(**context):
    model_id=str(uuid.uuid4())
    return model_id

5.5.2 XCom 사용 시 고려사항

  • 풀링 태스크는 필요한 값을 사용하기 위해 태스크 간에 묵시적인 의존 성이 필요합니다 (DAG에 표시되지 않음)
  • XCom은 오퍼레이터의 원자성을 무너뜨리는 패턴이 될 가능성이 있습니다.(토큰 만료등)
  • XCom이 저장하는 모든 값은 직렬화를 지원해야 한다는 기술적 한계 가 존재합니다.
    (람다 또는 여러 다중 멀티프로세스 관련 클래스 같은 일부 파이썬 유형 은 XCom에 저장할 수 없습니다)

❗️Airflow의 메타스토어에 저장되며 다음과 같이 크기가 제한됩니다.

  • SQLite- BLOB 유형으로 저장, 2GB 제한
  • PostgreseL - BYTEA 유형으로 저장, 1GB 제한
  • 간단한 인자 정도만 넘겨라

5.6 Taskflow API로 파이썬 태스크 연결하기

Taskflow API를 사용해서 decorator 기반 API를 사용할수 있습니다.

5.6.1 Taskflow API로 파이썬 태스크 단순화하기

def _train_model (**context) :
    model_id=str (uuid.uuid4())
    context ["task_instance"].xcom_push(key="model_id",value=model_id) 

def _deploy_model (**context) :
    model_id=context ["task_instance"] .xcom_pull(
    task_ids="train_model", key="model_id"
    )
    print(f"Deploying model {model_id}")
with DAG(...) as dag:
    ...
    train_model=PythonOperator (
    tesk_id="train_model", python_callable=_train_model )

    deploy_model=PythonOperator ( task_id="deploy_model",
    python_callable=_deploy_model)
join datasets » train_model »

5.6.2 Taskflow API를 사용하지 않는 경우

장점

  • 태스크 간 의존성을 좀 더 간단하게 구현할 수 있도록 제공합니다.
  • 태스크 간의 의존성을 숨기지 않고 태스크 간의 값을 명시적으로 전달합니다.

단점

  • PythonOperator를 사용하여 구현되는 파이썬 태스크로 제한된다.