본문 바로가기

Programming/[Python]

[Airflow] Airflow 예제를 통한 DAG 구조 확인 및 실행

반응형

참고도서 : Apache Airflow 기반의 데이터 파이프라인

가장 기본이 되는 예제를 연습해보았습니다.

 

환경 : Vscode, WSL2 (Ubuntu 20.04), anaconda(python 3.10)

 

 

1. 환경설정

 

airflow 설치를 위해 아나콘다에서 가상환경을 생성합니다.

conda create -n airflow python=3.10
conda activate airflow

 

 

가상환경에서 airflow와 virtualenv를 설치합니다.

# airflow 설치
pip install apache-airflow

# virtualenv 설치
pip install virtualenv
# 설치하지 않은 경우 web ui에서 다음과 같은 에러로그를 출력하였음.

#Broken DAG: [/home/kwanghori/anaconda3/envs/airflow/lib/python3.10/site-packages/airflow/example_dags/example_branch_operator_decorator.py] Traceback (most recent call last):
#  File "/home/kwanghori/anaconda3/envs/airflow/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 437, in apply_defaults
#    result = func(self, **kwargs, default_args=default_args)
#  File "/home/kwanghori/anaconda3/envs/airflow/lib/python3.10/site-packages/airflow/operators/python.py", line 581, in __init__
#    raise AirflowException("PythonVirtualenvOperator requires virtualenv, please install it.")
#airflow.exceptions.AirflowException: PythonVirtualenvOperator requires virtualenv, please install it.

 

 

커맨드를 입력하여 airflow 메타스토어를 초기화하고, airflow 사용자를 생성합니다. init 명령어 실행 시 현재 경로에 airflow 디렉터리가 생성됩니다.

# airflow 메타스토어 초기화
airflow db init # 실행 시 airflow 디렉터리 생성과 logs 폴더, airflow.cfg, airflow.db 생성

# airflow 사용자 생성
airflow users create --username kwanghori --password kwanghori --firstname kwangho --lastname lee --role Admin --email [이메일주소]

 

 

사용자를 생성하였으면 DAG 실행을 위한 스크립트를 작성해야합니다.

https://ll.thespacedevs.com/2.0.0/launch/upcoming API에 요청한 결과값을 지정된 경로에 저장하고, json 파싱을 통해 이미지를 다운로드하고 몇 개의 이미지를 저장했는지 출력하는 DAG를 작성합니다.

 

https://ll.thespacedevs.com/2.0.0/launch/upcoming 요청 결과 예시

 

 

이때 DAG 스크립트는 파이썬으로 작성합니다.

import json
import pathlib

import airflow
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

def _get_pictures():
    '''
    https://ll.thespacedevs.com/2.0.0/launch/upcoming의 결과값을 파싱하고
    모든 로켓 사진을 다운로드하는 함수
    '''
    
    pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True)      # 경로가 존재하는 지 확인
    
    # /tmp/launches.json 파일에 있는 모든 그림 파일 다운로드
    with open("/tmp/launches.json") as f:
        launches=json.load(f)
        image_urls=[launch['image'] for launch in launches['results']]
        for image_url in image_urls:
            try:
                response=requests.get(image_url)
                image_filename=image_url.split("/")[-1]
                target_file=f"/tmp/images/{image_filename}"
                with open(target_file, "wb") as f:
                    f.write(response.content)
                print(f"Downloaded {image_url} to {target_file}")
            except requests_exceptions.MissingSchema:
                print(f"{image_url} appears to be an invalid URL.")
            except requests_exceptions.ConnectionError:
                print(f"Could not connect to {image_url}.")
    


dag = DAG(  # 객체의 인스턴스 생성(구체화) - 모든 워크플로우의 시작점
    dag_id="download_rocket_launches",              # DAG 이름
    start_date=airflow.utils.dates.days_ago(14),    # DAG 첫 실행 시작 날짜
    schedule_interval=None,                         # DAG 실행 간격
)

download_launches = BashOperator(
    task_id="download_launches",
    bash_command="curl -o /tmp/launches.json -L 'https://ll.thespacedevs.com/2.0.0/launch/upcoming'",       # /tmp 경로에 launches.json으로 결과값 다운로드
    dag=dag,
)

get_pictures=PythonOperator(    # DAG에서 PythonOperator를 사용하여 파이썬 함수 호출
    task_id="get_pictures",
    python_callable=_get_pictures,
    dag=dag,
)

notify=BashOperator(
    task_id="notify",
    bash_command='echo "There are now $(ls /tmp/images/ | wc -l) images."',
    dag=dag,
)

download_launches >> get_pictures >> notify     # 태스크 실행 순서 결정

 

 

작성한 스크립트를 dags 디렉터리에 복사합니다.

cp download_rocket_launches.py ~/airflow/dags/

 

 

웹서버와 스케줄러를 실행합니다. 웹 ui는 flask로 만들어져있습니다. 웹서버는 8080포트를 사용합니다.

# 웹서버 시작 (백그라운드 실행)
airflow webserver &

# 스케줄러 시작 (백그라운드 실행)
airflow scheduler &

 

 

2. Airflow Web UI 접속 및 작업 실행

 

localhost:8080 주소로 접근 후 사용자 생성 username, password 정보로 로그인 합니다.

 

DAGs 리스트가 출력됩니다. download_rocket_launches를 클릭합니다.

 

 

 

DAG에 들어온 Graph를 클릭하면 설정한 task가 DAG 스크립트에 지정한 플로우 순서대로 출력됩니다. DAG on/off 토글을 클릭하고, 재생 버튼을 눌러 실행합니다.

 

 

세 개의 task 작업이 모두 완료되었으면 notify 태스크를 클릭하여 log를 확인합니다.

 

 

log에서 10개의 이미지를 저장한 것을 확인하였습니다. 이제 실제 지정 경로 (/tmp/images) 디렉터리에 이미지가 저장되었는지 확인합니다.

 

 

지금까지 스케줄러 주기를 설정하지 않은 airflow DAG 설정 및 실행과정이었습니다. 아래는 작업 과정에서 발생한 시행착오입니다.

 

 

3. 발생 오류 정리

 

1. DAG를 정상적으로 불러오지 못했다는 메세지가 표시됨.

Broken DAG: [/home/kwanghori/anaconda3/envs/airflow/lib/python3.10/site-packages/airflow/example_dags/example_branch_operator_decorator.py] Traceback (most recent call last):
  File "/home/kwanghori/anaconda3/envs/airflow/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 437, in apply_defaults
    result = func(self, **kwargs, default_args=default_args)
  File "/home/kwanghori/anaconda3/envs/airflow/lib/python3.10/site-packages/airflow/operators/python.py", line 581, in __init__
    raise AirflowException("PythonVirtualenvOperator requires virtualenv, please install it.")
airflow.exceptions.AirflowException: PythonVirtualenvOperator requires virtualenv, please install it.

 

 

virtualenv를 설치하여 해결하였습니다.

pip install virtualenv

 

 

2.DAG를 실행하였는데 queued 상태에서 진행되지 않음. (맨 위의 노란색 경고 문구)

the scheduler does not appear to be running. last heartbeat was received 1 hours ago. 
the dags list may not update, and new tasks will not be scheduled.

 

스케줄러를 실행하지 않아서 발생한 문제였습니다.

airflow scheduler &

 

 

3. PythonOperator 태스크에서 에러 발생

Traceback (most recent call last):
File "/home/kwanghori/anaconda3/envs/airflow/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 433, in _execute_task
result = execute_callable(context=context, **execute_callable_kwargs)
File "/home/kwanghori/anaconda3/envs/airflow/lib/python3.10/site-packages/airflow/operators/python.py", line 199, in execute
return_value = self.execute_callable()
File "/home/kwanghori/anaconda3/envs/airflow/lib/python3.10/site-packages/airflow/operators/python.py", line 216, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/home/kwanghori/airflow/dags/download_rocket_launches.py", line 20, in _get_pictures
with ("/tmp/launches.json") as f:
AttributeError: **enter**

 

파이썬 소스코드를 잘못입력하여 발생한 오류였습니다.

반응형