오음

Airflow와 몽고DB 본문

데이터 엔지니어링/Airflow

Airflow와 몽고DB

오준돌 2023. 6. 2. 10:00

 

크롤링을 통한 데이터들을 json형태로 get하기 때문에 전처리 과정을 생략하고 Mysql이 아닌 MongoDB에 저장을   

 

1. 중고서적 판매사이트인 알라딘에서 API를 받아와 도서의 목록을 json파일의 형태로 받는다.

2. MongoDB에 적재시키는 작업을 airflow자동화  

 

<시작>

- airflow 컨테이너 실행 

2023.05.31 - [데이터 엔지니어링/Airflow] - Airflow 컨테이너 띄우기

 

Airflow 컨테이너 띄우기

저번시간에 활용한 Crontab과 비슷한 역할을 하는 Airflow에 대해서 알아보고자 한다. Airflow란? - 유연한 파이썬 프레임워크를 사용해 쉽게 데이터 파이프라인을 구축할 수 있게 해주며, 최신 기술

oh-um.tistory.com

- 설정한 경로의 dags파일에 파이썬파일 넣기

  • DAG 설정
  • 알라딘 API이용하여 크롤링하는 함수
  • MongoDB와 연결하고 적재하는 함수
  • 참고

https://www.mongodb.com/developer/products/mongodb/mongodb-apache-airflow/#currency-over-time

from datetime import date, datetime
import requests
import pandas as pd
import json
import pathlib
import airflow.utils.dates
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from pymongo.mongo_client import MongoClient
from airflow.models import XCom
import os
dag = DAG(
    dag_id="aladin_book_mongo", #dag id
    description="aladin book data", #dag의 설명
    start_date=datetime(2023, 5, 24, 0, 0),  # 시작 날짜 및 시간 설정
    schedule_interval='30 16 * * *', # 매일 오후 4시 30분
)

def _get_url(ti):
    pathlib.Path("/home/airflow/data").mkdir(parents=True, exist_ok=True)
    TTBKey = 'Myapi_key'
    items = []
    for start_value in range(1, 11):
        url = f"http://www.aladin.co.kr/ttb/api/ItemList.aspx?ttbkey={TTBKey}&QueryType=ItemNewAll&SearchTarget=Used&SubSearchTarget=Book&MaxResults=50&start={start_value}&output=js&Version=20131101&OptResult=usedList"
        res = requests.get(url)
        items.extend(res.json()['item'])
    # items 리스트를 Airflow의 XCom 메커니즘을 통해 다른 작업과 공유 가능 이를 통해 items 값을 다른 작업에서 사용가능
    ti.xcom_push(key="items", value=items)
    

def insert_data_to_mongo_atlas(ti):
	# 이전 작업에서 XCom을 통해 전달된 데이터를 가져온다.
    data = ti.xcom_pull(key="items")
    # 자신의 MongoDB Atlas와 연결해준다. user와 password값 등록
    client = MongoClient('mongodb+srv://user:<password>@cluster0.wydppxv.mongodb.net/?retryWrites=true&w=majority')
    
    db = client['etl']
    collection = db['aladin']
    for item in data:
        collection.insert_one(item)

get_url = PythonOperator(
    task_id="get_url", python_callable=_get_url, dag=dag
)

insert_task = PythonOperator(
    task_id='insert_to_mongo_atlas',
    python_callable=insert_data_to_mongo_atlas,
    # 작업 함수에 실행 컨텍스트를 제공
    #  ti 매개변수를 통해 TaskInstance를 사용
    provide_context=True,
    dag=dag,
)
get_url >>  insert_task

 

- airflow를 이용하여 실행 

 

실행완료 확인! 성공

 

- MongDB 데이터 확인!

 

잘 들어갔슴다