Issue
I want to start a FastAPI service in Python and a Java service to communicate through HTTP. My idea is to use subprocess
to call the Java startup script, and then monitor the logs of the Java service to determine whether the start/stop was successful. Since subprocess
is a blocking task, I replaced asyncio.subprocess_shell
is successful, but it does not wait for the Java service to start successfully, which is a danger in potential. I hope to use asyncio.SubprocessProtocol
continuously monitors Java's startup logs until it outputs a successful string, but the while
statement is prioritized, preventing the await
from being executed and causing the process to freeze. Is there a way to give startup_event
is a signal to not start successfully until the task is completed?
And here is my code.
import asyncio
import re
from logging import logger
from fastapi import FastAPI
app = FastAPI()
transport = asyncio.SubprocessTransport()
protocal = asyncio.SubprocessProtocol()
class MyProtocol(asyncio.SubprocessProtocol):
startup_str = re.compile("Server - Started")
is_startup = False
is_exited = False
def pipe_data_received(self, fd, data):
logger.info(data)
super().pipe_data_received(fd, data)
if not self.is_startup:
if re.search(self.startup_str, str(data)):
self.is_startup = True
def pipe_connection_lost(self, fd, exc):
if exc is None:
logger.debug(f"Pipe {fd} Closed")
else:
logger.error(exc)
super().pipe_connection_lost(fd, exc)
def process_exited(self):
logger.info("Process Exited")
super().process_exited()
self.is_exited = True
@app.on_event("startup")
async def startup_event():
loop = asyncio.get_running_loop()
global transport, protocal
transport, protocal = await loop.subprocess_shell(MyProtocol, "/start_java_server.sh")
# Waiting Application startup complete.
# while not self.protocal.is_startup:
# pass
@app.on_event("shutdown")
async def shutdown_event():
global transport, protocal
transport.close()
Solution
Your FastAPI application will not start until startup_event
completes.
So, just add a loop to the "startup_event" function and wait until your "Java service" starts:
@app.on_event("startup")
async def startup_event():
loop = asyncio.get_running_loop()
global transport, protocal
transport, protocal = await loop.subprocess_shell(MyProtocol, "/start_java_server.sh")
while not protocal.is_startup:
await asyncio.sleep(0.1)
But I would recommend you to use lifespan object instead of deprecated startup_event
. And add started_future
Future object as it's shown in the documentation:
import asyncio
from contextlib import asynccontextmanager
import re
from logging import getLogger
from fastapi import FastAPI
logger = getLogger(__name__)
transport: asyncio.SubprocessTransport
protocol: "MyProtocol"
class MyProtocol(asyncio.SubprocessProtocol):
def __init__(self, started_future: asyncio.Future, exited_future: asyncio.Future):
self.started_future = started_future
self.exited_future = exited_future
self.startup_str = re.compile("Server - Started")
def pipe_data_received(self, fd, data):
logger.info(data)
super().pipe_data_received(fd, data)
# if not self.started_future.result():
if re.search(self.startup_str, str(data)):
self.started_future.set_result(True)
def pipe_connection_lost(self, fd, exc):
if exc is None:
logger.debug(f"Pipe {fd} Closed")
else:
logger.error(exc)
super().pipe_connection_lost(fd, exc)
def process_exited(self):
logger.info("Process Exited")
super().process_exited()
self.exited_future.set_result(True)
@asynccontextmanager
async def lifespan(app: FastAPI):
global transport, protocol
loop = asyncio.get_running_loop()
started_future = asyncio.Future(loop=loop)
exited_future = asyncio.Future(loop=loop)
transport, protocol = await loop.subprocess_shell(
lambda: MyProtocol(started_future, exited_future),
"/start_java_server.sh"
)
try:
await asyncio.wait_for(started_future, timeout=5.0)
except asyncio.TimeoutError:
print('timeout!')
yield
try:
await asyncio.wait_for(exited_future, timeout=5.0)
except asyncio.TimeoutError:
print('timeout!')
transport.close()
app = FastAPI(lifespan=lifespan)
Answered By - Yurii Motov
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.