Creating an Airflow custom operator that uploads files to S3
hero_pin
2024. 9. 18. 18:55
Creating an Airflow custom operator
- In Airflow, you can easily implement and register your own custom operators.
- Most of the hooks you need already exist in Airflow, but it is worth writing your own when they don't meet a specific requirement or when extra functionality is needed, for example:
  - Integration with a specific API or service
  - Tasks tailored to your business logic
  - Better code reusability and maintainability
  - Easier security and management
  - Performance optimization and advanced error handling
Creating a Custom Hook that uploads files to S3
import boto3
from airflow.hooks.base import BaseHook
from airflow.exceptions import AirflowException


class AwsS3CustomHook(BaseHook):
    def __init__(self, aws_conn_id='aws_conn_id_finn', region_name=None):
        super().__init__()
        self.aws_conn_id = aws_conn_id
        self.region_name = region_name
        self.session = None
        self.s3_client = None
        self._create_session()

    def _create_session(self):
        # Create a boto3 session from the Airflow connection credentials
        connection = self.get_connection(self.aws_conn_id)
        extra_config = connection.extra_dejson

        # Access key and secret key
        aws_access_key_id = connection.login
        aws_secret_access_key = connection.password

        self.session = boto3.Session(
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name=self.region_name or extra_config.get('region_name', 'ap-northeast-2')
        )
        self.s3_client = self.session.client('s3')

    def upload_file(self, bucket_name, key, file_name):
        # Upload a local file to the given S3 bucket and key
        try:
            self.s3_client.upload_file(file_name, bucket_name, key)
            self.log.info(f"File {file_name} uploaded to S3 bucket {bucket_name} at {key}")
        except Exception as e:
            self.log.error(f"Failed to upload {file_name} to {bucket_name}/{key}: {e}")
            raise AirflowException(e)
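The hook looks up its credentials through an Airflow connection, so the aws_conn_id it receives (here aws_conn_id_finn) must already exist in Airflow, with the AWS access key stored as the login, the secret key as the password, and optionally region_name in the extras. Below is a minimal usage sketch under that assumption; the bucket, key, and local file path are hypothetical placeholders.

# Minimal usage sketch. Assumes a connection 'aws_conn_id_finn' already exists
# (login = AWS access key, password = AWS secret key). Bucket, key and local
# file path below are hypothetical placeholders.
from include.custom_aws_hook.aws_hook import AwsS3CustomHook

hook = AwsS3CustomHook(aws_conn_id='aws_conn_id_finn', region_name='ap-northeast-2')
hook.upload_file(
    bucket_name='my-example-bucket',      # hypothetical bucket
    key='raw/2024/09/18/sample.csv',      # hypothetical object key
    file_name='/tmp/sample.csv',          # hypothetical local file
)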
Creating a Custom Operator
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from include.custom_aws_hook.aws_hook import AwsS3CustomHook


class S3FileUploadOperator(BaseOperator):
    # Custom operator that uploads a local file to S3 via AwsS3CustomHook
    @apply_defaults
    def __init__(self, bucket_name, key, file_name, aws_conn_id='aws_conn_id_finn', region_name=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.bucket_name = bucket_name
        self.key = key
        self.file_name = file_name
        self.aws_conn_id = aws_conn_id
        self.region_name = region_name

    def execute(self, context):
        hook = AwsS3CustomHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)
        hook.upload_file(bucket_name=self.bucket_name, key=self.key, file_name=self.file_name)
        self.log.info(f"File {self.file_name} uploaded to {self.bucket_name}/{self.key}")
Creating a DAG to run it
from airflow import DAG
from airflow.utils.dates import days_ago
from include.custom_aws_operator.s3_file_upload_operator import S3FileUploadOperator

with DAG(dag_id='finn_aws_custom_operator',
         schedule_interval='@once',
         start_date=days_ago(1),
         catchup=False) as dag:

    upload_task = S3FileUploadOperator(
        task_id='upload_file_to_s3',
        bucket_name='dd-dbx-training',
        key='usage-data/bio/temp_data.csv',
        file_name='/data/temp_data.csv',
        aws_conn_id='aws_conn_id_finn',
        region_name='ap-northeast-2'
    )

    upload_task
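Before triggering the DAG from the UI, it can help to confirm that the file parses and the custom operator imports cleanly. The snippet below is a small sanity check using DagBag; the 'dags' folder path is an assumption about your project layout. You can also run the whole DAG once from the CLI with airflow dags test finn_aws_custom_operator 2024-09-18.

# Sanity-check sketch: confirm the DAG file parses and the task is registered.
# The dag_folder path is an assumption; the DAG and task ids match the example above.
from airflow.models import DagBag

dag_bag = DagBag(dag_folder='dags', include_examples=False)
dag = dag_bag.get_dag('finn_aws_custom_operator')

assert not dag_bag.import_errors, dag_bag.import_errors
assert dag is not None
assert 'upload_file_to_s3' in dag.task_ids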