Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 |
Tags
- JavaScripts
- 파일저장
- 리눅스 # 기초
- datetime #zip
- cron
- celery
- 원하는 태그 찾기
- Docker
- text.children
- 가상환경 초기세팅
- 자동화
- aiflow
- ssh operator
- 자연어처리 환경 컨테이너
- beautifulsoup
- Google Cloud Storage
- 빗썸api
- requesthead
- mariadb설치 #mysql설치
- requests
- FastAPI
- 모델서빙
- enumerate #함수 # def
- K-ICT
- HeidiSQL
- with open
- pickle #datetime
- etl
- airflow
- 정규표현식
Archives
- Today
- Total
오음
Airflow와 몽고DB 본문
크롤링을 통한 데이터들을 json형태로 get하기 때문에 전처리 과정을 생략하고 Mysql이 아닌 MongoDB에 저장을
1. 중고서적 판매사이트인 알라딘에서 API를 받아와 도서의 목록을 json파일의 형태로 받는다.
2. MongoDB에 적재시키는 작업을 airflow자동화
<시작>
- airflow 컨테이너 실행
2023.05.31 - [데이터 엔지니어링/Airflow] - Airflow 컨테이너 띄우기
Airflow 컨테이너 띄우기
저번시간에 활용한 Crontab과 비슷한 역할을 하는 Airflow에 대해서 알아보고자 한다. Airflow란? - 유연한 파이썬 프레임워크를 사용해 쉽게 데이터 파이프라인을 구축할 수 있게 해주며, 최신 기술
oh-um.tistory.com
- 설정한 경로의 dags파일에 파이썬파일 넣기
- DAG 설정
- 알라딘 API이용하여 크롤링하는 함수
- MongoDB와 연결하고 적재하는 함수
- 참고
https://www.mongodb.com/developer/products/mongodb/mongodb-apache-airflow/#currency-over-time
from datetime import date, datetime
import requests
import pandas as pd
import json
import pathlib
import airflow.utils.dates
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from pymongo.mongo_client import MongoClient
from airflow.models import XCom
import os
dag = DAG(
dag_id="aladin_book_mongo", #dag id
description="aladin book data", #dag의 설명
start_date=datetime(2023, 5, 24, 0, 0), # 시작 날짜 및 시간 설정
schedule_interval='30 16 * * *', # 매일 오후 4시 30분
)
def _get_url(ti):
pathlib.Path("/home/airflow/data").mkdir(parents=True, exist_ok=True)
TTBKey = 'Myapi_key'
items = []
for start_value in range(1, 11):
url = f"http://www.aladin.co.kr/ttb/api/ItemList.aspx?ttbkey={TTBKey}&QueryType=ItemNewAll&SearchTarget=Used&SubSearchTarget=Book&MaxResults=50&start={start_value}&output=js&Version=20131101&OptResult=usedList"
res = requests.get(url)
items.extend(res.json()['item'])
# items 리스트를 Airflow의 XCom 메커니즘을 통해 다른 작업과 공유 가능 이를 통해 items 값을 다른 작업에서 사용가능
ti.xcom_push(key="items", value=items)
def insert_data_to_mongo_atlas(ti):
# 이전 작업에서 XCom을 통해 전달된 데이터를 가져온다.
data = ti.xcom_pull(key="items")
# 자신의 MongoDB Atlas와 연결해준다. user와 password값 등록
client = MongoClient('mongodb+srv://user:<password>@cluster0.wydppxv.mongodb.net/?retryWrites=true&w=majority')
db = client['etl']
collection = db['aladin']
for item in data:
collection.insert_one(item)
get_url = PythonOperator(
task_id="get_url", python_callable=_get_url, dag=dag
)
insert_task = PythonOperator(
task_id='insert_to_mongo_atlas',
python_callable=insert_data_to_mongo_atlas,
# 작업 함수에 실행 컨텍스트를 제공
# ti 매개변수를 통해 TaskInstance를 사용
provide_context=True,
dag=dag,
)
get_url >> insert_task
- airflow를 이용하여 실행
실행완료 확인! 성공
- MongDB 데이터 확인!
잘 들어갔슴다
'데이터 엔지니어링 > Airflow' 카테고리의 다른 글
Airflow) GPU서버를 이용한 모델 학습과 학습된 모델 버킷에 저장하기 (0) | 2023.07.30 |
---|---|
Airflow 컨테이너 띄우기 (0) | 2023.05.31 |