오음

Django와 Celery, RabbitMQ를 이용한 분산 비동기 작업 처리 (2) 본문

데이터 엔지니어링/FastAPI

Django와 Celery, RabbitMQ를 이용한 분산 비동기 작업 처리 (2)

오준돌 2023. 8. 24. 23:37

코드 참고 git 주소

 

GitHub - OhJune/Client-Django-FastAPI: Git DE

Git DE . Contribute to OhJune/Client-Django-FastAPI development by creating an account on GitHub.

github.com

 

RabbitMQ 서버 설치

 

> sudo apt install rabbitmq-server

> systemctl status rabbitmq-server

> sudo service rabbitmq-server start

 

 

가상환경에 celery 설치

 

> pip install celery 

celery 실행  -- celery.py가 있는 경로에서 실행해야한다. 나는 fastconnect프로젝트에 fastconnect파일경로이다. 

> celery -A django.config worker -l info

> celery -A django.config worker -l info --concurrency=1

 

redis는 gcp에서 따로 설치함

로컬에서 다운받아도 되고 기본 celery backend를 사용하여도 된다. 

 

결과)

왼쪽 위 : django

오른쪽 위 : fastapi

왼쪽 아래 : celery worker

오른쪽 아래 : rabbitmq-server 

 

결과를 보면 django에서 task.py들이 실행이 되어 views.py를 통해 fastapi로 값을 전달해준다. 이 과정에서 처리할 때 task_id라는 값이 들어오고 이 값을 자바스크립트로 처리해준다. 

 

 

 

앞선 내용에 이어서 

2023.08.24 - [데이터 엔지니어링] - Django와 Celery, RabbitMQ를 이용한 분산 비동기 작업 처리 (1)

 

Django와 Celery, RabbitMQ를 이용한 분산 비동기 작업 처리 (1)

저번 시간에 Fastapi와 Django를 이용하여 모델 서빙을 하였다. 여기서 장고에서 사용자의 값이 너무 많이 들어오게 될 경우 Fastapi가 처리해야할 데이터를 놓칠수도 있게 된다. 때문에 RabbitMQ로 메시

oh-um.tistory.com

장고 앱에 urls.py에서 views.py의 get_result함수를 사용하여 url을 지정해준다. 

# django_project/app/urls.py
from django.urls import path
from . import views

urlpatterns = [
    path('', views.index, name='index'),
    path('process_request', views.process_request, name='process_request'),
    path("get_result/<str:task_id>", views.get_result, name="get_result"),
    
]

index.html 

자바스크립트로 비동기 처리를 해준다. 

<!DOCTYPE html>
<html>
<head>
    <title>장고 - FastAPI 연동 예제</title>
    <script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
    <style>
        table {
            border-collapse: collapse;
            width: 100%;
        }

        table, th, td {
            border: 1px solid black;
        }

        th, td {
            padding: 15px;
            text-align: left;
        }
    </style>
</head>
<body>
{% csrf_token %}
    <h1>입력값 전송:</h1>
    <label for="input_data">입력값:</label>
    <input type="text" id="input_data">
    <button id="submit_button">전송</button>
    
    <h2>결과 출력:</h2>
    
    <div id="result"></div>

    <script>
        // 결과 가져오기 
        async function getResult(task_id) {
            const response = await fetch(`/get_result/${task_id}`);
            return await response.json();
        }

        // 결과 출력 
        function displayResult(result) {
            $("#result").html(`<pre>${JSON.stringify(result, null, 2)}</pre>`);
        }
        
        // 제출 버튼 이벤트
        $('#submit_button').click(async function () {
          try{
            const input_data = $('#input_data').val();
            const csrf_token = $('[name="csrfmiddlewaretoken"]').val();
            const response = await fetch('/process_request', {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                    'X-CSRFToken': csrf_token,
                },
                body: JSON.stringify({ input_data: input_data }),
            });

            if (response.ok) {
                const responseData = await response.json();
                const task_id = responseData.task_id;

                let task_result = { ready: false };
                while (!task_result.ready) {
                    task_result = await getResult(task_id);
                    await new Promise(resolve => setTimeout(resolve,3000))
                }

                displayResult(task_result.result);
            } else {
                alert('다시 시도!');
            }
        }catch(error){
          console.error('Error',error);
          alert('오류발생')
        }
        });
    </script>
</body>
</html>