
Creating an Airflow custom operator that uploads files to S3

hero_pin 2024. 9. 18. 18:55

Creating an Airflow custom operator

  • In Airflow, you can easily implement and register your own custom operators.
  • Most of the hooks you need already exist in Airflow, but it makes sense to write your own when they don't meet a specific requirement or when you need additional functionality, for example:
- Integration with a specific API or service
- Tasks tailored to your business logic
- Better code reusability and maintainability
- Easier security and management
- Performance optimization and advanced error handling

Creating a custom Hook that uploads files to S3

import boto3
from airflow.hooks.base import BaseHook
from airflow.exceptions import AirflowException

class AwsS3CustomHook(BaseHook):
    def __init__(self, aws_conn_id='aws_conn_id_finn', region_name=None):
        super().__init__()
        self.aws_conn_id = aws_conn_id
        self.region_name = region_name
        self.session = None
        self.s3_client = None
        self._create_session()

    def _create_session(self):
        # Create a boto3 session from the credentials stored in the Airflow connection
        connection = self.get_connection(self.aws_conn_id)
        extra_config = connection.extra_dejson

        # Access key and secret key
        aws_access_key_id = connection.login
        aws_secret_access_key = connection.password
        self.session = boto3.Session(
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name=self.region_name or extra_config.get('region_name', 'ap-northeast-2')
        )
        self.s3_client = self.session.client('s3')

    def upload_file(self, bucket_name, key, file_name):
        # Upload file
        try:
            self.s3_client.upload_file(file_name, bucket_name, key)
            self.log.info(f"File {file_name} uploaded to S3 bucket {bucket_name} at {key}")
        except Exception as e:
            self.log.error(f"Failed to upload {file_name} to {bucket_name}/{key}: {e}")
            raise AirflowException(e)
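
The hook pulls its AWS credentials from an Airflow connection: the access key from connection.login, the secret key from connection.password, and the region from the connection's extra JSON. Connections are usually registered in the Airflow UI or with the airflow connections add CLI; the snippet below is only a rough Python sketch of the same thing (the credential values are placeholders), assuming it runs where the Airflow metadata DB is reachable.

from airflow.models import Connection
from airflow.utils.session import create_session

conn = Connection(
    conn_id='aws_conn_id_finn',
    conn_type='aws',
    login='YOUR_AWS_ACCESS_KEY_ID',            # read by the hook as connection.login
    password='YOUR_AWS_SECRET_ACCESS_KEY',     # read by the hook as connection.password
    extra='{"region_name": "ap-northeast-2"}'  # read by the hook via connection.extra_dejson
)

with create_session() as session:
    # Persist the connection in the Airflow metadata DB (placeholder credentials)
    session.add(conn)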

Creating a Custom Operator

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from include.custom_aws_hook.aws_hook import AwsS3CustomHook

class S3FileUploadOperator(BaseOperator):
    """Custom operator that uploads a local file to S3 using AwsS3CustomHook."""

    @apply_defaults
    def __init__(self, bucket_name, key, file_name, aws_conn_id='aws_conn_id_finn', region_name=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.bucket_name = bucket_name
        self.key = key
        self.file_name = file_name
        self.aws_conn_id = aws_conn_id
        self.region_name = region_name

    def execute(self, context):
        hook = AwsS3CustomHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)
        hook.upload_file(bucket_name=self.bucket_name, key=self.key, file_name=self.file_name)
        self.log.info(f"File {self.file_name} uploaded to {self.bucket_name}/{self.key}")
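
Because execute() above doesn't use anything from the Airflow context, the operator can be smoke-tested by instantiating it and calling execute() directly in an environment where the aws_conn_id_finn connection is configured. A minimal sketch (the bucket, key, and file paths are placeholders):

from include.custom_aws_operator.s3_file_upload_operator import S3FileUploadOperator

op = S3FileUploadOperator(
    task_id='upload_smoke_test',
    bucket_name='my-test-bucket',      # placeholder bucket
    key='tmp/smoke_test.csv',          # placeholder S3 key
    file_name='/tmp/smoke_test.csv',   # placeholder local file
    aws_conn_id='aws_conn_id_finn',
    region_name='ap-northeast-2',
)
op.execute(context={})  # uploads the file without going through the scheduler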

Creating a DAG that runs it

from airflow import DAG
from airflow.utils.dates import days_ago
from include.custom_aws_operator.s3_file_upload_operator import S3FileUploadOperator

with DAG(dag_id='finn_aws_custom_operator',
         schedule_interval='@once',
         start_date=days_ago(1),
         catchup=False) as dag:

    upload_task = S3FileUploadOperator(
        task_id='upload_file_to_s3',
        bucket_name='dd-dbx-training',
        key='usage-data/bio/temp_data.csv',
        file_name='/data/temp_data.csv',
        aws_conn_id='aws_conn_id_finn',
        region_name='ap-northeast-2'
    )

    upload_task
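
On Airflow 2.5 or newer, the whole DAG can also be run locally without a scheduler by appending the block below to the DAG file and running it with plain python; this is just a debugging convenience, not part of the setup above.

if __name__ == "__main__":
    # DAG.test() runs every task in-process (assumes Airflow 2.5+)
    dag.test()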