Python

Airflow Custom Sensor

hero_pin 2024. 10. 11. 22:05

1. Custom Sensor의 구조

custom sensor는 일반적으로 다음과 같은 구조를 가진다.

  • airflow BaseSensorOperator를 상속받는다.
  • __init__ 메서드를 정의해 필요한 매개변수를 설정한다.
  • poke 메서드를 구현해 실제 센싱 작업을 수행한다. mode에 poke와 reschedule이 이쓴ㄴ데 poke만 정의하는 이유는 reschedule도 내부적으로는 poke를 실행하기 때문이다.

2. 전체 코드

dags/include/custom_sensors/finn_hbase.py 에 코드를 작성한다.

HBaseRowExistsSEnsor는 Hbase테이블에서 특정 row key 가 존재하는지 확인하는 역할을 수행한다.
rowkey에 rowkey 리스트를 입력하거나, row_start, row_end로 범위를 지정할 수 있다.

from typing import Optional, Sequence
from airflow.sensors.base import BaseSensorOperator
from include.custom_hook.hbase_hook import HbaseHook

class HbaseRowkeySensor(BaseSensoprOperator):
  def __init__(
    self,
    *,
    table: str,
    rowkeys: Optional[Sequence[str]] = None,
    row_start: Optional[str] = None,
    row_stop: Optional[str] = None,
    hbase_conn_id: str = 'hbase_default',
    **kwargs
  ):
    super().__init__(**kwargs)
    self.table = table
    self.rowkeys = rowkeys
    self.row_start = row_start
    self.row_stop = row_stop
    self.hbase_conn_id = hbase_conn_id

    if (rowkeys and (row_start or row_stop)) or (not rowkeys and not (row_start and row_stop)):
      raise ValueError("value Error...")

  def poke(self, context):

    hook = HbaseHook(hbase_conn_id=self.hbase_conn_id)
    conn = hook.get_conn()
    table = conn.table(self.table)

    if self.rowkeys:
      return self._check_rowkeys(table)
    else:
      return self._check_rowkey_range(table)

  def _check_rowkeys(self, table):
    for rowkeys in self.rowkeys:
      if not table.row(rowkey):
        self.log.info('rowkey not found exception')
        return False
      self.log.info('sucess...')
      return True

  def _check_rowkey_range(self, table):
    scanner = table.scan(row_start=self.row_start, row_stop=self.row_stop, limit=1)
    for key, data in scanner:
        self.log.info('success...')
        return True
    self.log.info('rowkey not found...')
    return False