데이터 오케스트레이션 프레임 워크
Apache Airflow는 여러가지 태스크들(데이터셋 생성, 모델 학습 등)을 일련의 그래프로 연결하고 스케줄링, 모니터링 등 파이프라인 관리를 위한 다양한 기능을 제공하고 있는 Workflow Management Platform입니다.
다음은 Airflow에서 자주 사용되는 기본 개념과 용어입니다.
DAG :Airflow에서 DAG (Directed Acyclic Graph)는 서로에 대한 종속성이 있고 일정에 따라 실행되는 작업 그룹입니다. 각 DAG는 논리적 워크 플로와 동일합니다.
DAG 실행 : DAG에의 실행 특정입니다.
연산자 :연산자는 특정 유형의 작업에 대한 템플릿 역할을하는 Python 클래스입니다. 예를 들면 다음과 같습니다.
- BashOperator: bash 명령 실행
- PythonOperator: Python 함수 실행
- PythonVirtualenvOperator: 자동으로 생성 및 삭제되는 가상 환경에서 함수 실행
- BigQueryOperator:BigQuery에서 데이터 쿼리 및 처리
- PapermillOperator: Jupyter 노트북 실행
작업 인스턴스 :작업 인스턴스는 작업의 특정 실행을 나타내며 상태가 있습니다 (예 : "실행 중", "성공", "실패", "건너 뛰기", "재시도 시작"등).
워크 플로우 생성
DAG (일명 워크 플로)는 Airflow의 DAG_FOLDER에 저장된 Python 파일에 정의되며 DAG 정의, 작업 및 작업 종속성이라는 3 가지 주요 구성 요소를 포함합니다.
기본 인수
default_args사전이 DAG에 전달 되면 DAG에 속한 모든 작업에 적용됩니다.
default_args = {
'owner': 'xinran.waibel',
'start_date': datetime(2019, 12, 1),
'retries': 1,
'on_failure_callback': slack_failure_msg
}
- start_date: 첫 번째 DAG 실행의 execution_date입니다.
- end_date: DAG 실행을 중지해야하는 날짜 (일반적으로 없음)입니다.
- execution_timeout: 작업을 실행할 수있는 최대 시간입니다.
- retries: 작업이 실패하기 전에 수행 할 수있는 재시도 횟수입니다.
- retry_delay: 재시도 사이의 지연 시간.
- dependent_on_past: true로 설정하면 이전에 예약 된 작업 인스턴스가 성공한 경우에만 작업 인스턴스가 실행됩니다.
- on_failure_callback: 태스크 인스턴스가 실패 할 때 호출 할 함수입니다.
# Create a DAG using context manager (with...as...)
# Benefits of context manager: all tasks within context manager is automatically assigned to the DAG so you don't have to explicitly set dag=dag for all tasks.
with DAG('DAG_NAME', default_args=default_args, schedule_interval='@once') as dag:
# Create SQL task A using BigQueryOperator
bq_sql_task_a = bigquery_operator.BigQueryOperator(
task_id='demo_bq_sql_a',
sql="INSERT INTO TABLE TABLE_B SELECT * FROM TABLE_A",
use_legacy_sql=False)
# Create SQL task B
bq_sql_task_b = bigquery_operator.BigQueryOperator(
task_id='demo_bq_sql_b',
sql="INSERT INTO TABLE TABLE_C SELECT * FROM TABLE_B",
use_legacy_sql=False)
# Set dependency between A and B: B depends on A
bq_sql_task_a >> bq_sql_task_b
# Create Slack notification task using SlackAPIPostOperator
slack_msg_task = SlackAPIPostOperator(
task_id='slack_msg',
channel='data_notifications',
token=os.environ['SLACK_API_TOKEN'],
text="""
:white_check_mark: Workflow Succeeded
*Dag*: {dag}
*DAG Run ID*: {dag_run_id}
""".format(dag=dag.dag_id, dag_run_id='{{ run_id }}'))
# Slack task depends on both A and B
[bq_sql_task_a, bq_sql_task_b] >> slack_msg_task
DAG 생성자에 대한 몇 가지 유용한 매개 변수 :
- schedule_interval : DAG 실행 빈도를 지정하는 cron 표현식 입니다.
- 캐치 업 : DAG가 내부적으로 백필을 수행하는 경우 캐치 업을 해제하는 것이 좋습니다.
현재 작업간에 종속성을 설정하는 두 가지 주요 방법이 있습니다.
- Python의 비트 시프트 연산자 ( >>및 <<)
- set_upstream()및 set_downstream()방법
# Task B depends on Task A and Task C depends on Task B
task_a >> task_b >> task_c
# Task D depends on Task C
task_c.set_downstream(task_d)
# Task C will run after both Task A and B complete
[task_a, task_b] >> task_c
from airflow.utils.helpers import chain
# Both Task B and C depend on Task A
# Task D depends on both Task B and C
chain(task_a, [task_b, task_c], task_d)
# The statement above is equivalent to:
task_a >> [task_b, task_c] >> task_d
from airflow.utils.helpers import cross_downstream
# Task C and D will run after both Task A and B complete
cross_downstream([task_a, task_b], [task_c, task_d])
# The statement above is equivalent to:
[task_a, task_b] >> task_c
[task_a, task_b] >> task_d
출처 : https://ichi.pro/ko/apache-airflowlo-deiteo-paipeu-lain-guchug-141331500863559
여러 가지 참고할 튜토리얼이 있지만
이 분이 쓰신게 가장 설명도 잘되있고 잘 볼수있다. but 내 것과 버전이 달라서 고쳐야할게 몇개 있다.
https://github.com/jo1013/pyspark 내 블로그에서 pyspark + airflow 이고 docker-compose로 구현되있다.
'DataEnginner > Airflow' 카테고리의 다른 글
Docker-Compose로 구현 (Airflow + postgresql) + (pyspark+jupyter) + (mysql) (0) | 2021.06.07 |
---|