Issue
I am trying to build a REST API that will manage some machine learning classification tasks. I have written an API view, which when hit, will trigger the start of a classification task (such as: training an SVM classifier with the data the user provided previously). However, this is a long running task, so I would ideally not have the user wait once they have made a request to this view. Instead, I would like to start this task in the background and give them a response immediately. They can later view the results of the classification in a separate view (haven't implemented that yet.)
I am using ASGI_APPLICATION = 'mlxplorebackend.asgi.application' in settings.py.
Here's my API view in views.py:
import asyncio
from concurrent.futures import ProcessPoolExecutor
from django import setup as SetupDjango
# ... other imports

loop = asyncio.get_event_loop()

def DummyClassification():
    result = sum(i * i for i in range(10 ** 7))
    print(result)
    return result

# ... other API views

class TaskExecuteView(APIView):
    """
    Once an API call is made to this view, the classification algorithm will start being processed.
    Depends on:
    1. Parser for the classification algorithm type and parameters
    2. Classification algorithm implementation
    """
    def get(self, request, taskId, *args, **kwargs):
        try:
            task = TaskModel.objects.get(taskId = taskId)
        except TaskModel.DoesNotExist:
            raise Http404
        else:
            # this is basically the classification task for now
            # need to turn this to an async view
            with ProcessPoolExecutor(initializer = SetupDjango) as pool:
                loop.run_in_executor(pool, DummyClassification)
            return Response({ "message": "The task with id: {} has been started".format(task.taskId) }, status = status.HTTP_200_OK)
The problem I am facing is the following:
When I do not use with ProcessPoolExecutor(initializer = SetupDjango) as pool: (i.e. without the initializer), I get django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet. (full traceback at: https://paste.ubuntu.com/p/ctjmFNYMXW/)
When I do use the initializer, the view no longer remains async; it gets blocked. The response returns after the task is completed, which takes about 5 seconds on my machine. I do realize I am not really making use of asyncio.sleep() inside my DummyClassification() function, but I can't figure out the way to do so.
I am guessing this is not the way to do it, therefore any suggestions would be appreciated. I would like to avoid celery if I can, since that seems a tad bit too complicated for me.
Edit:
If I get rid of ProcessPoolExecutor() and simply do loop.run_in_executor(None, DummyClassification), it works as expected, but then only one worker thread is working on the task, which doesn't seem remotely ideal for a classification task.
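A likely reason the response is held back when the pool is created inside a with block: leaving the block calls the executor's shutdown(wait=True), which waits for every submitted task to finish before control returns to the view. The sketch below only illustrates that behaviour. It uses ThreadPoolExecutor so it runs anywhere without pickling concerns (ProcessPoolExecutor inherits the same context-manager behaviour), and slow_task, submit_inside_with_block, shared_pool and submit_to_shared_pool are hypothetical names that do not appear in the original code.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_task(seconds=0.5):
    """Hypothetical stand-in for the long-running classification job."""
    time.sleep(seconds)
    return seconds


def submit_inside_with_block():
    """Submit inside a with block and return how long the caller was held."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=1) as pool:
        pool.submit(slow_task)
    # Exiting the with block called pool.shutdown(wait=True),
    # so we only get here once slow_task has finished.
    return time.perf_counter() - start


# A pool created once at module level is not shut down after each call.
shared_pool = ThreadPoolExecutor(max_workers=1)


def submit_to_shared_pool():
    """Submit to the long-lived pool and return (time held, future)."""
    start = time.perf_counter()
    future = shared_pool.submit(slow_task)  # returns immediately
    return time.perf_counter() - start, future
```

Calling submit_inside_with_block() holds the caller for roughly the full duration of slow_task, whereas submit_to_shared_pool() returns almost at once and leaves the work running in the background. This does not by itself solve the Django app-loading error in worker processes; it only shows where the blocking comes from.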
Solution
This was a ride. At first I went through the pain of setting up celery, only to find out that the original problem of the classification task using one CPU core remained. Then I switched to django-rq with redis, and it is currently working as expected.
from .tasks import Pipeline

class TaskExecuteView(APIView):
    """
    Once an API call is made to this view, the classification algorithm will start being processed.
    Depends on:
    1. Parser for the classification algorithm type
    2. Classification algorithm implementation
    """
    def get(self, request, taskId, *args, **kwargs):
        try:
            task = TaskModel.objects.get(taskId = taskId)
        except TaskModel.DoesNotExist:
            raise Http404
        else:
            Pipeline.delay(taskId) # this is async now ✔
            # mark this as an in-progress task
            TaskModel.objects.filter(taskId = taskId).update(inProgress = True)
            return Response({ "message": "The task with id: {}, title: {} has been started".format(task.taskId, task.taskTitle) }, status = status.HTTP_200_OK)
tasks.py
from django_rq import job

@job('default', timeout=3600)
def Pipeline(taskId):
    # classification task
    pass
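For the @job('default', ...) decorator above to enqueue anything, django-rq needs to be installed as an app and a queue named default has to be defined in settings.py. The fragment below is a minimal sketch of that configuration. The host, port and database number are assumptions for a Redis server running locally with default settings, and the DEFAULT_TIMEOUT value simply mirrors the timeout used in tasks.py; none of these values come from the original post.

```python
# settings.py (fragment)

INSTALLED_APPS = [
    # ... the project's existing apps go here ...
    "django_rq",
]

RQ_QUEUES = {
    "default": {
        "HOST": "localhost",      # assumed: Redis running on the same machine
        "PORT": 6379,             # assumed: default Redis port
        "DB": 0,                  # assumed: default Redis database
        "DEFAULT_TIMEOUT": 3600,  # seconds; mirrors the timeout in tasks.py
    },
}
```

Jobs are only picked up while a worker process is running, which can be started with python manage.py rqworker default. Each worker handles one job at a time, so starting several workers is one way to process multiple classification tasks in parallel.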
Answered By - mahieyin-rahmun