일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |
- etl
- pickle #datetime
- aiflow
- Google Cloud Storage
- requests
- celery
- enumerate #함수 # def
- datetime #zip
- beautifulsoup
- mariadb설치 #mysql설치
- 정규표현식
- with open
- 리눅스 # 기초
- 자연어처리 환경 컨테이너
- requesthead
- text.children
- 빗썸api
- JavaScripts
- ssh operator
- 파일저장
- K-ICT
- airflow
- 자동화
- 모델서빙
- Docker
- 원하는 태그 찾기
- 가상환경 초기세팅
- cron
- FastAPI
- HeidiSQL
- Today
- Total
오음
Django와 Celery, RabbitMQ를 이용한 분산 비동기 작업 처리 (1) 본문
저번 시간에 Fastapi와 Django를 이용하여 모델 서빙을 하였다. 여기서 장고에서 사용자의 값이 너무 많이 들어오게 될 경우 Fastapi가 처리해야할 데이터를 놓칠수도 있게 된다. 때문에 RabbitMQ로 메시지를 연결해주고 celery로 비동기 작업을 처리하는 구조를 만들어 볼까 한다.
처리에 앞서 비동기란 무엇일까?
비동기 태스크(asynchronous task)는 코드의 실행이 동시에 발생하지 않고, 순차적으로 실행을 기다리지 않고 독립적으로 실행하는 작업이다. 비동기 프로그래밍을 사용하면 프로그램이 다른 작업을 기다리지 않고 동시에 여러 작업을 수행할 수 있도록 하여 모든 작업을 완료할 수 있게 된다.
Message broker 란?
Message broker는 송신자의 이전 메시지 프로토콜로부터의 메시지를 수신자의 이전 메시지 프로토콜로 변환하는 중간 모듈이다. Kafka, RabbitMQ, Redis등이 여기에 해당된다. 이번 프로젝트에서는 메세지 브로커로 RabbitMQ를 사용하였다.
RabbitMQ는 AMQP를 구현한 메시지 브로커이며 기본적으로 producer가 메시지를 생성하여 전송하고 Queue가 메시지를 순차적으로 쌓는다. 그 다음 consumer가 Queue에서 수신을 하는 흐름이다.
RabbitMQ의 장점으로는 Cluster구성이 쉽고 manageUI기능이 제공되며 확장성이 뛰어나며 메세지가 성공적으로 전달되었다고 판단될 경우 메세지가 큐에서 삭제된다. 나는 이러한 특성과 이벤트의 영속성과 재처리의 필요성이 없다고 판단하여 RabbitMQ를 이용하였다.
※ 메세지 큐들에 대한 비교
https://velog.io/@mdy0102/MQ-%EB%B9%84%EA%B5%90-Kafka-RabbitMQ-Redis
Celery 란?
Celery 는 python application 에서 많은 양의 작업들을 나눠서 처리할 수 있도록 해주는 분산 시스템이다. Python application 에서 생성된 작업들의 실시간 처리와 작업 스케줄링 등을 제공하는 task queue 이다.
Celery 동작 구조
웹 서비스(Django)에서 발생한 요청(Task)를 Message Broker에서 받아 Celery를 이용하여 분산 처리를 진행한다. Celery에서는 작업이 완료되는 (특정 이벤트)에 DB Task를 수행한다.
아래는 Celery git이다. 사용법과 프레임워크 통합 등등 내용이 나와있습니다.
GitHub - celery/celery: Distributed Task Queue (development branch)
Distributed Task Queue (development branch). Contribute to celery/celery development by creating an account on GitHub.
github.com
실습 ) 장고와 FastAPI 간에 메시지 큐를 사용하는 방법은 다음과 같이 진행.
- 먼저, RabbitMQ 서버를 설치하고 실행한다. Celery를 사용하여 장고 프로젝트에 비동기 태스크를 추가한다.
- 장고에서 FastAPI 서버의 엔드포인트로 가는 작업을 비동기 태스크로 작성한다.
- 태스크를 실행할 때마다 RabbitMQ에 메시지를 추가하고, 메시지가 처리된 후 결과를 반환한다.
=> 결과적으로 다수의 사용자 입력이 동시에 발생해도 메시지 큐를 통해 하나씩 처리하고 서버에 과부하 없이 사용자들에게 적절한 응답을 전달할 수 있음.
이전단계부터 이어서...
2023.07.30 - [데이터 엔지니어링/FastAPI] - FastAPI로 모델 서빙해주기 (Feat.Django)
FastAPI로 모델 서빙해주기 (Feat.Django)
만들어 놓은 word2vec 기반의 Song2vec.model을 Fastapi에 올리고 Django가 값을 띄워주는 구조를 만들어 볼까 한다. 우선 FastAPI가 뭐냐?? FastAPI는 파이썬 프레임워크 비교적 가벼운 웹개발이라면 Flask를 사
oh-um.tistory.com
settings.py있는 경로에 celery.py 추가
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings
# 기본 장고파일 설정
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'fastconnect.settings')
app = Celery('fastconnect')
app.config_from_object('django.conf:settings', namespace='CELERY')
#등록된 장고 앱 설정에서 task 불러오기
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
settings.py에서 celery설정
아래 처럼 설정만 추가해주면 broker와 db를 적어 주는데 celery는 .delay모듈을 이용하면 자동으로 결과값을 저장해준다.
CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//" # rabbitmq
CELERY_RESULT_BACKEND = 'redis://:비밀번호@ip:port/database' # redis
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
다음은 장고 app에 가서 task와 views에 있는 함수를 추가/수정할 것 이다.
task.py는 말그대로 celery worker들이 수행할 작업들이다.
- @shared_task로 처리하고 싶은 일에 딱지를 붙인다.
- store_input_data_in_redis는 들어오는 데이터를 redis에 저장한다.
- settings.py에서 설정한 redis 데이터베이스 3번은 결과값(fastapi로부터 추천받은 곡)을 저장하는 것이고
- 사용자의 현재 가지고 있는 값은 따로 데이터베이스 4번에 저장하기 위해 설정을 해두었다.
- send_data_to_fastapi작업에 delay를 붙이면 Redis Backend에 기록이 저장된다.
- RabbitMQ는 Celery에게 일을 준다.
- 일을 받은 Celery는 send_data_to_fastapi작업을 시작한다.
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import requests
import redis
from django.conf import settings
import datetime
from fastconnect.celery import app as celery_app
@shared_task(bind=True)
def store_input_data_in_redis(self, input_data):
redis_conn = redis.Redis(host='사용자ip', port=6379, db=4, password= 'password')
print(redis_conn)
# redis_conn = celery_app.backend.client
date = datetime.datetime.now()
redis_key = f"input_data:{date}"
redis_conn.set(redis_key, ",".join(input_data))
@shared_task
def send_data_to_fastapi(data):
url = "http://localhost:8001/process" # FastAPI 엔드포인트 주소
response = requests.post(url, json={"input_data": data})
return response.json()
views.py 수정
process_request함수로 값을 받고 task.py에 있는 task들을 실행시킨다.
get_result로 celery를 통해 받은 결과값을 json형태로 가져온다.
이 후 사용자에게 결과값을 보여주기 위해 html파일에서 자바스크립트로 처리 해준다.
from django.shortcuts import render
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
import requests
import json
from .models import ResultData
from .tasks import send_data_to_fastapi, store_input_data_in_redis
from celery.result import AsyncResult
import time
from collections import namedtuple
def index(request):
return render(request, 'index.html')
@csrf_exempt
def process_request(request):
if request.method == 'POST':
input_data = json.loads(request.body.decode('utf-8'))["input_data"]
input_data = [str(x) for x in input_data.split(",")]
# 데이터 확인
print(f"Input data (Django): {input_data}")
store_input_data_in_redis(input_data)
if input_data:
task = send_data_to_fastapi.delay(input_data)
print(f"Result from task1: {task}")
print(f"Result from task2: {task.id}")
return JsonResponse({"task_id": str(task.id)})
else:
return JsonResponse({"task_id": "No input_data provided"})
else:
return JsonResponse({"error": "Only POST requests are allowed"})
@csrf_exempt
def get_result(request, task_id):
if request.method == 'GET':
task = AsyncResult(task_id)
time.sleep(2)
try:
result = task.result # 작업의 결과값을 가져옵니다.
except Exception as e:
result = None # 예외가 발생하면 결과값을 None으로 설정합니다.
result_payload = {
"task_id": task_id,
'ready': task.ready(),
'result': result,
}
data = result_payload['result']
Song = namedtuple("Song", ["title", "artist", "ky", "tj"])
results = [Song(x['title'], x['artist'], x['ky_song_num_id'], x['tj_song_num_id']) for x in data['result']]
print(results)
return JsonResponse(result_payload) #,{"result":results}
else:
return JsonResponse({'error': 'Only GET requests are allowed'})
이로써 장고 코드 수정은 끝났고 rabbitmq서버를 실행하고 celery worker들로 비동기처리를 시작한다.
'데이터 엔지니어링 > FastAPI' 카테고리의 다른 글
Django와 Celery, RabbitMQ를 이용한 분산 비동기 작업 처리 (2) (0) | 2023.08.24 |
---|---|
FastAPI로 모델 서빙해주기 (Feat.Django) (0) | 2023.07.30 |