전체 글 (30) 썸네일형 리스트형 Airflow Custom Sensor 1. Custom Sensor의 구조custom sensor는 일반적으로 다음과 같은 구조를 가진다. airflow BaseSensorOperator를 상속받는다. __init__ 메서드를 정의해 필요한 매개변수를 설정한다. poke 메서드를 구현해 실제 센싱 작업을 수행한다. mode에 poke와 reschedule이 이쓴ㄴ데 poke만 정의하는 이유는 reschedule도 내부적으로는 poke를 실행하기 때문이다.2. 전체 코드dags/include/custom_sensors/finn_hbase.py 에 코드를 작성한다. HBaseRowExistsSEnsor는 Hbase테이블에서 특정 row key 가 존재하는지 확인하는 역할을 수행한다.rowkey에 rowkey 리스트를 입력하거나, row_star.. S3에 파일을 업로드하는 Airflow custom operator 만들기 Airflow_custom_operator 만들기Airflow에서는 커스텀 오퍼레이션을 직접 쉽게 구현해 생성할 수 있다. 이미 대부분의 airflow hook 이 존재하지만 특정 요구 사항을 충족하지 못할 때나 추가적인 기능이 필요할때 생성할 수 있다. - 특정 API나 서비스와의 통합 필요성 - 비즈니스 로지겡 맞춘 작업 - 코드의 재사용성과 유지보수성 향상 - 보안과 관리의 용이성 - 성능 최적화 및 고급 에러 핸들링S3에 파일을 업로드하는 Custom Hook 만들기import boto3from airflow.hooks.base import BaseHookfrom airflow.exceptions import AirflowExceptionclass AwsS3CustomHook(BaseHook): .. Apache Airflow 기반의 데이터 파이프라인 CHAPTER 7 Provider 외부시스템 이용하기Prodivder dockerDockerOperator 사용법Airflow와 Docker가 동일한 호스트에서 실행되고 있어야 한다.DockerOperator는 Docker 컨테이너를 실행하기 위해 Docker 데몬과 통신해야 한다. (즉 Docker 데몬이 실행중이어야한다.)docker_url parameter를 통해 Docker 데몬의 URL을 지정하게 된다.airflow provider docker documenthttps://airflow.apache.org/docs/apache-airflow-providers-docker/2.7.0/_api/airflow/providers/docker/operators/docker/index.html터미널에서 아래 명령을 실행해 .. Apache Airflow 기반의 데이터 파이프라인 CHAPTER 6 CHATPTER 6워크플로 트리거이 장에서는 다음과 같은 내용을 다룹니다. - 특정 조건을 센서에 만족하도록 대기하기 - 서로 다른 DAG의 테스트간 의존성 설정하기 - CLI 및 REST API를 통해 워크플로 실행하기 위크 플로를 실행하는 다른 방법을 알아봅시다.6.1 센서를 사용한 폴링 조건센서는 특정 조건이 true인지 지속적으로 확인하고 true라면 성공합니다. 만약 flase인 경우, 센서는 상태가 true가 될 때까지 또는 타임아웃이 될 때까지 계속 확인합니다. 리스트 6.1 Filesensor는 파일 경로가 생성될 때까지 기다립니다. from airflow.sensors.filesystem import Filesensorwait_for_supermarket_1=Filesensor( t.. CHAPTER 5 태스크 간 의존성 정의하기 CHAPTER 5태스크 간 의존성 정의하기이 장에서는 다음과 같은 내용을 다룹니다. - Airflow DAG에서 태스크 의존성을 정의하는 방법을 확인합니다. - 트리거 규칙이 작업 실행에 주는 영향에 대해 기존 지식을 제공합니다. - XCom을 이용하여 태스크 사이의 상태 공유 방법을 설명합니다. - Airflow 2의 Taskflow API를 사용해 파이썬을 많이 사용하는 DAG를 단순화하는 방법을 설명합니다. 5.1 기본 의존성 유형다양한 태스크 의존성 패턴을 살펴보겠습니다.태스크의 선형 체인 유형과 팬아웃/팬인 유형이 모두 포함됩니다.5.1.1 선형 의존성 유형단일 선형 태스크 체인리스트 5.1 로켓 사진 가져오기 DAG의 태스크download_launches=BashOperator(..)get_p.. CHAPTER 4 Airflow 콘텍스크를 사용하여 태스크 템플릿 작업하기 CHAPTER 4Airflow 콘텍스크를 사용하여 태스크 템플릿 작업하기이 장에서는 다음과 같은 내용을 다룹니다. - 템플릿을 사용하여 런타임 시에 변수 할당하기 - PythonOperator 및 다른 오퍼레이터를 사용해 변수 템플릿 작업하기- 디버깅을 위해 템플릿 변수 할다하기 - 외부 시스템에서 태스크 수행하기 DAG와 Operator가 어떻게 함께 작동하는지오퍼레이터가 무엇인지Database Data load원격 환경에서 명령을 실행4.1 Airflow로 처리할 데이터 검사하기StokcSense가 적용된 주식시장 예측 도구를 사용하여 오퍼레이터의 몇 가지 구성 요소를 알아봅니다.4.1.1 증분 데이터를 적재하는 방법 결정하기(중요하지 않은 설명... 생략)4.2 태스크 콘텍스트와 Jinja 템플릿 .. Apache Airflow 기반의 데이터 파이프라인 CHAPTER 3 CHAPTER 3Airflow의 스케줄링이 장에서 설명하는 내용은 다음과 같습니다. - 일정한 간격으로 DAG를 실행하기- 증분 데이터를 처리하기 위한 동적 DAG 구성하기 - 과거의 데이터 세트를 적재 및 재처리하기 - 신뢰할 수 있는 태스크를 위한 모범 사례 적용하기3.1 예시: 사용자 이벤트 처리하기먼저, Airflow의 스케줄 동작을 이해하기 위해 간단한 예를 살펴봅니다. 웹사이드에서 사용자 동작을 추적하고 사용자가 웹사이트에서 엑세스한 페이지(IP주소로 식별)를 분석할 수 있는 서비스가 있다고 가정해 보겠습니다.이 예제를 시뮬레이션하기 위해 간단한 API(로컬)를 만들었습니다. 예를 들어, 다음 API를 호출하면 지난 30일 동안 모든 이벤트 목록이 반환됩니다.curl -o /temp/event.. Apache Airflow 기반의 데이터 파이프라인 CHPATER 2 CHAPTER 2Airflow DAG의 구조이 장에서는 다음을 설명합니다. - 독자의 PC에서 Airflow 실행하기 - 첫 번째 워크플로 작성 및 실행하기 - Airflow 인터페이스에서 첫 번째 뷰 내용 검토하기 - Airflow 에서 실패한 태스트 처리하기 앞 장에서 데이터 작업 분야에서 데이터와 많은 도구를 이용해 작업하는 것이 쉽지 않은 이유에 대해 학습했습니다. 이 장에서는 Airflow를 실행한 후 예제 워크플로를 통해 기본 컴포넌트를 사용해 봅니다.2.1 다양한 소스에서 데이터 수집로켓은 공학정 측면에서 가장 경이로운 작업 중 하나이며, 로켓 발사 소식이 들리면 전 세계가 주목합니다. 이 장에서는 모든 로켓 발사에 관심이 있어서 이를 추적하고자 하는 로켓 애호가 John의 일상을 예로 소개.. 이전 1 2 3 4 다음