Provider 외부시스템 이용하기
Prodivder docker
DockerOperator 사용법
- Airflow와 Docker가 동일한 호스트에서 실행되고 있어야 한다.
- DockerOperator는 Docker 컨테이너를 실행하기 위해 Docker 데몬과 통신해야 한다. (즉 Docker 데몬이 실행중이어야한다.)
docker_url
parameter를 통해 Docker 데몬의 URL을 지정하게 된다.
airflow provider docker document
- 터미널에서 아래 명령을 실행해 provider docker 를 설치해준다
pip install apache-airflow-providers-docker
from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
}
with DAG(
dag_id='finn_provider_docker',
default_args=default_args,
start_date=datetime(2024, 8, 25),
schedule_interval=None, # 즉시 실행
catchup=False,
) as dag:
t1 = DockerOperator(
task_id='run_docker_container',
image='ubuntu:latest',
api_version='auto',
auto_remove=True,
command="echo hello world",
docker_url="tcp://10.0.1.100:1234",
network_mode="bridge",
do_xcom_push=True
)
t1
with를 사용하는 이유는 일반적으로 파일을 열고 닫거나 네트워크 연결을 관리할 때 사용되지만 Airflow DAG를 명시적으로 지정하지 않아도 사용하기위해서 쓴다고한다...
dag = dag 요걸 안해도됨
SparkSubmitOperator 사용해보기
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.utils.dates import days_ago
# DAG 기본 설정
default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
'retries': 1,
}
dag = DAG(
'spark_example',
default_args=default_args,
schedule_interval=None,
)
# SparkSubmitOperator
spark_submit_task = SparkSubmitOperator(
application='/data/pyspark_temp01.py', # 스파크 애플리케이션 경로
task_id='finn_provider_spark',
conn_id='spark_default', # Airflow 연결 ID, Spark 설치 시 설정
conf={'spark.master': 'yarn', 'spark.submit.deployMode': 'cluster'}, # Spark 설정
dag=dag,
)
# DAG의 마지막에 작성
spark_submit_task
실행할 Pyspark 프로그램
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
def main():
# SparkSession 생성
spark = SparkSession.builder \
.appName("SimpleSparkApplication") \
.getOrCreate()
input_path = "/data/csv_file.csv"
df = spark.read.csv(input_path, header=True, inferSchema=True)
df.show()
# output_path = "/data/Prep_1.csv"
# df.write.csv(output_path, mode="overwrite", header=True)
# SparkSession 종료
spark.stop()
if __name__ == "__main__":
main()
'Python' 카테고리의 다른 글
Airflow Custom Sensor (0) | 2024.10.11 |
---|---|
S3에 파일을 업로드하는 Airflow custom operator 만들기 (0) | 2024.09.18 |
Apache Airflow 기반의 데이터 파이프라인 CHAPTER 6 (0) | 2024.08.29 |
CHAPTER 5 태스크 간 의존성 정의하기 (0) | 2024.08.21 |
CHAPTER 4 Airflow 콘텍스크를 사용하여 태스크 템플릿 작업하기 (0) | 2024.08.21 |