본문 바로가기

DataEnginner/Airflow

Apache Airflow 가이드

데이터 오케스트레이션 프레임 워크

Apache Airflow는 여러가지 태스크들(데이터셋 생성, 모델 학습 등)을 일련의 그래프로 연결하고 스케줄링, 모니터링 등 파이프라인 관리를 위한 다양한 기능을 제공하고 있는 Workflow Management Platform입니다.

다음은 Airflow에서 자주 사용되는 기본 개념과 용어입니다.

DAG :Airflow에서 DAG (Directed Acyclic Graph)는 서로에 대한 종속성이 있고 일정에 따라 실행되는 작업 그룹입니다. 각 DAG는 논리적 워크 플로와 동일합니다.

DAG 실행 : DAG에의 실행 특정입니다.

연산자 :연산자는 특정 유형의 작업에 대한 템플릿 역할을하는 Python 클래스입니다. 예를 들면 다음과 같습니다.

작업 인스턴스 :작업 인스턴스는 작업의 특정 실행을 나타내며 상태가 있습니다 (예 : "실행 중", "성공", "실패", "건너 뛰기", "재시도 시작"등).

워크 플로우 생성

DAG (일명 워크 플로)는 Airflow의 DAG_FOLDER에 저장된 Python 파일에 정의되며 DAG 정의, 작업 및 작업 종속성이라는 3 가지 주요 구성 요소를 포함합니다.

기본 인수

default_args사전이 DAG에 전달 되면 DAG에 속한 모든 작업에 적용됩니다.

default_args = {
'owner': 'xinran.waibel',
'start_date': datetime(2019, 12, 1),
'retries': 1,
'on_failure_callback': slack_failure_msg
}
  • start_date: 첫 번째 DAG 실행의 execution_date입니다.
  • end_date: DAG 실행을 중지해야하는 날짜 (일반적으로 없음)입니다.
  • execution_timeout: 작업을 실행할 수있는 최대 시간입니다.
  • retries: 작업이 실패하기 전에 수행 할 수있는 재시도 횟수입니다.
  • retry_delay: 재시도 사이의 지연 시간.
  • dependent_on_past: true로 설정하면 이전에 예약 된 작업 인스턴스가 성공한 경우에만 작업 인스턴스가 실행됩니다.
  • on_failure_callback: 태스크 인스턴스가 실패 할 때 호출 할 함수입니다.
# Create a DAG using context manager (with...as...)
# Benefits of context manager: all tasks within context manager is automatically assigned to the DAG so you don't have to explicitly set dag=dag for all tasks.
with DAG('DAG_NAME', default_args=default_args, schedule_interval='@once') as dag:

  # Create SQL task A using BigQueryOperator
  bq_sql_task_a = bigquery_operator.BigQueryOperator(
    task_id='demo_bq_sql_a',
    sql="INSERT INTO TABLE TABLE_B SELECT * FROM TABLE_A",
    use_legacy_sql=False)

  # Create SQL task B
  bq_sql_task_b = bigquery_operator.BigQueryOperator(
    task_id='demo_bq_sql_b',
    sql="INSERT INTO TABLE TABLE_C SELECT * FROM TABLE_B",
    use_legacy_sql=False)

  # Set dependency between A and B: B depends on A
  bq_sql_task_a >> bq_sql_task_b

  # Create Slack notification task using SlackAPIPostOperator
  slack_msg_task = SlackAPIPostOperator(
    task_id='slack_msg',
    channel='data_notifications',
    token=os.environ['SLACK_API_TOKEN'],
    text="""
    :white_check_mark: Workflow Succeeded
    *Dag*: {dag}
    *DAG Run ID*: {dag_run_id}
    """.format(dag=dag.dag_id, dag_run_id='{{ run_id }}'))

  # Slack task depends on both A and B
  [bq_sql_task_a, bq_sql_task_b] >> slack_msg_task

DAG 생성자에 대한 몇 가지 유용한 매개 변수 :

  • schedule_interval : DAG 실행 빈도를 지정하는 cron 표현식 입니다.
  • 캐치 업 : DAG가 내부적으로 백필을 수행하는 경우 캐치 업을 해제하는 것이 좋습니다.

현재 작업간에 종속성을 설정하는 두 가지 주요 방법이 있습니다.

  • Python의 비트 시프트 연산자 ( >>및 <<)
  • set_upstream()및 set_downstream()방법
# Task B depends on Task A and Task C depends on Task B
task_a >> task_b >> task_c
# Task D depends on Task C
task_c.set_downstream(task_d)
# Task C will run after both Task A and B complete
[task_a, task_b] >> task_c
from airflow.utils.helpers import chain
# Both Task B and C depend on Task A
# Task D depends on both Task B and C
chain(task_a, [task_b, task_c], task_d)
# The statement above is equivalent to:
task_a >> [task_b, task_c] >> task_d
from airflow.utils.helpers import cross_downstream
# Task C and D will run after both Task A and B complete
cross_downstream([task_a, task_b], [task_c, task_d])
# The statement above is equivalent to:
[task_a, task_b] >> task_c
[task_a, task_b] >> task_d

출처 : https://ichi.pro/ko/apache-airflowlo-deiteo-paipeu-lain-guchug-141331500863559

 

 

여러 가지 참고할 튜토리얼이 있지만 

 

https://aldente0630.github.io/data-engineering/2018/06/17/developing-workflows-with-apache-airflow.html 

 

이 분이 쓰신게 가장 설명도 잘되있고 잘 볼수있다.  but 내 것과 버전이 달라서 고쳐야할게 몇개 있다.

 

https://github.com/jo1013/pyspark 내 블로그에서 pyspark + airflow 이고 docker-compose로 구현되있다.