본문 바로가기

Python

Apache Airflow 기반의 데이터 파이프라인 CHAPTER 7

Provider 외부시스템 이용하기

Prodivder docker

DockerOperator 사용법

  • Airflow와 Docker가 동일한 호스트에서 실행되고 있어야 한다.
  • DockerOperator는 Docker 컨테이너를 실행하기 위해 Docker 데몬과 통신해야 한다. (즉 Docker 데몬이 실행중이어야한다.)
  • docker_url parameter를 통해 Docker 데몬의 URL을 지정하게 된다.

airflow provider docker document

https://airflow.apache.org/docs/apache-airflow-providers-docker/2.7.0/_api/airflow/providers/docker/operators/docker/index.html

  • 터미널에서 아래 명령을 실행해 provider docker 를 설치해준다
 pip install apache-airflow-providers-docker
from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
}

with DAG(
    dag_id='finn_provider_docker',
    default_args=default_args,
    start_date=datetime(2024, 8, 25), 
    schedule_interval=None,  # 즉시 실행
    catchup=False,
) as dag:

    t1 = DockerOperator(
        task_id='run_docker_container',
        image='ubuntu:latest',
        api_version='auto',
        auto_remove=True,
        command="echo hello world",
        docker_url="tcp://10.0.1.100:1234",
        network_mode="bridge",
        do_xcom_push=True
    )

    t1

with를 사용하는 이유는 일반적으로 파일을 열고 닫거나 네트워크 연결을 관리할 때 사용되지만 Airflow DAG를 명시적으로 지정하지 않아도 사용하기위해서 쓴다고한다...
dag = dag 요걸 안해도됨

SparkSubmitOperator 사용해보기

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.utils.dates import days_ago

# DAG 기본 설정
default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'retries': 1,
}

dag = DAG(
    'spark_example',
    default_args=default_args,
    schedule_interval=None,
)

# SparkSubmitOperator
spark_submit_task = SparkSubmitOperator(
    application='/data/pyspark_temp01.py',  # 스파크 애플리케이션 경로
    task_id='finn_provider_spark',
    conn_id='spark_default',  # Airflow 연결 ID, Spark 설치 시 설정
    conf={'spark.master': 'yarn', 'spark.submit.deployMode': 'cluster'},  # Spark 설정
    dag=dag,
)

# DAG의 마지막에 작성
spark_submit_task

실행할 Pyspark 프로그램

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

def main():
    # SparkSession 생성
    spark = SparkSession.builder \
        .appName("SimpleSparkApplication") \
        .getOrCreate()

    input_path = "/data/csv_file.csv"
    df = spark.read.csv(input_path, header=True, inferSchema=True)

    df.show()

    # output_path = "/data/Prep_1.csv"
    # df.write.csv(output_path, mode="overwrite", header=True)

    # SparkSession 종료
    spark.stop()

if __name__ == "__main__":
    main()