Python
Airflow Custom Sensor
hero_pin
2024. 10. 11. 22:05
1. Custom Sensor의 구조
custom sensor는 일반적으로 다음과 같은 구조를 가진다.
- airflow BaseSensorOperator를 상속받는다.
__init__
메서드를 정의해 필요한 매개변수를 설정한다.- poke 메서드를 구현해 실제 센싱 작업을 수행한다.
mode에 poke와 reschedule이 이쓴ㄴ데 poke만 정의하는 이유는 reschedule도 내부적으로는 poke를 실행하기 때문이다.
2. 전체 코드
dags/include/custom_sensors/finn_hbase.py
에 코드를 작성한다.
HBaseRowExistsSEnsor는 Hbase테이블에서 특정 row key 가 존재하는지 확인하는 역할을 수행한다.
rowkey에 rowkey 리스트를 입력하거나, row_start, row_end로 범위를 지정할 수 있다.
from typing import Optional, Sequence
from airflow.sensors.base import BaseSensorOperator
from include.custom_hook.hbase_hook import HbaseHook
class HbaseRowkeySensor(BaseSensoprOperator):
def __init__(
self,
*,
table: str,
rowkeys: Optional[Sequence[str]] = None,
row_start: Optional[str] = None,
row_stop: Optional[str] = None,
hbase_conn_id: str = 'hbase_default',
**kwargs
):
super().__init__(**kwargs)
self.table = table
self.rowkeys = rowkeys
self.row_start = row_start
self.row_stop = row_stop
self.hbase_conn_id = hbase_conn_id
if (rowkeys and (row_start or row_stop)) or (not rowkeys and not (row_start and row_stop)):
raise ValueError("value Error...")
def poke(self, context):
hook = HbaseHook(hbase_conn_id=self.hbase_conn_id)
conn = hook.get_conn()
table = conn.table(self.table)
if self.rowkeys:
return self._check_rowkeys(table)
else:
return self._check_rowkey_range(table)
def _check_rowkeys(self, table):
for rowkeys in self.rowkeys:
if not table.row(rowkey):
self.log.info('rowkey not found exception')
return False
self.log.info('sucess...')
return True
def _check_rowkey_range(self, table):
scanner = table.scan(row_start=self.row_start, row_stop=self.row_stop, limit=1)
for key, data in scanner:
self.log.info('success...')
return True
self.log.info('rowkey not found...')
return False