Issue
We are building a replacement for a legacy system. The replacement is a series of Lambdas in a State Machine (Step Function).
During our rollout we duplicate each incoming request, sending one copy to our Step Function and the other to the legacy system. If the new system does not fail, it returns first and cancels the other task.
We are doing this so that if our new system fails, we still get a response from the legacy system. Given that our system is about 3x-4x faster than the legacy system, we were relying on that at first to prevent the old system from returning before ours (yes, a potential race condition we were well aware of and already designing for). We were using asyncio and aiohttp to accomplish this, along with asyncio.wait(..., return_when=FIRST_COMPLETED).
basically (somewhat pseudocode):

async def new_system():
    # Blocking boto3 call; note it never yields to the event loop
    return sfn_client.start_sync_execution(...)  # Express State Machine

async def legacy_system():
    async with aiohttp.ClientSession() as http:
        return await http.post(url, json=request)

async def wait_first():
    done, pending = await asyncio.wait(
        {new_system_task, legacy_system_task},
        return_when=asyncio.FIRST_COMPLETED,
    )
    for task in done:
        if task.exception() is not None:
            # New system errored: wait for the legacy result instead
            await asyncio.wait(pending)
        else:
            for p in pending:
                p.cancel()
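For reference, the pattern above can be exercised end to end with `asyncio.sleep` stand-ins for the two backends (the function names and timings are illustrative, not our real code):

```python
import asyncio

async def new_system():
    # Stand-in for the Step Function call; the new system is faster
    await asyncio.sleep(0.05)
    return "new-response"

async def legacy_system():
    # Stand-in for the legacy HTTP call
    await asyncio.sleep(0.5)
    return "legacy-response"

async def wait_first():
    new_task = asyncio.create_task(new_system())
    legacy_task = asyncio.create_task(legacy_system())
    done, pending = await asyncio.wait(
        {new_task, legacy_task}, return_when=asyncio.FIRST_COMPLETED
    )
    winner = done.pop()
    if winner.exception() is not None:
        # New system failed: fall back to whatever is still running
        fallback_done, _ = await asyncio.wait(pending)
        return fallback_done.pop().result()
    for task in pending:
        task.cancel()
    return winner.result()

print(asyncio.run(wait_first()))  # new-response
```

Because both stand-ins actually await, the event loop can multiplex them and FIRST_COMPLETED behaves as intended.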
Now, our business requirements have shifted slightly (hah): the business needs to be able to compare the output from our system with the output from the previous system. We have no way to directly link the two systems' databases and correlate requests between them, so we thought we would pull the legacy request out into a Lambda.
With the above pseudocode you can see that we would not have the legacy system response to parse/store/attribute to our correlation IDs.
But if we pulled it out into a Lambda, we could stop listening for the Lambda response if ours returned successfully. That Lambda would still continue to run, allowing us to grab the response, parse it, and store it.
However, if we replace the aiohttp.post with a boto3 invoke, we run into a classic async problem: neither the state machine execution nor the boto3 invoke ties into the asyncio loop, and therefore both will return 'simultaneously' in the wait.
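To illustrate that 'simultaneously' behaviour, here is a minimal sketch using `time.sleep` as a stand-in for a blocking boto3 call (the names are illustrative):

```python
import asyncio
import time

async def blocking_call(name, seconds):
    # time.sleep stands in for a blocking boto3 call: the coroutine never
    # awaits, so it runs start-to-finish in one step and blocks the loop
    time.sleep(seconds)
    return name

async def main():
    tasks = [
        asyncio.create_task(blocking_call("new", 0.05)),
        asyncio.create_task(blocking_call("legacy", 0.1)),
    ]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    # Both tasks are already finished by the time wait() gets control back,
    # because neither ever yielded to the event loop
    return len(done), len(pending)

print(asyncio.run(main()))  # (2, 0)
```

Despite asking for FIRST_COMPLETED, both tasks land in `done`, so there is no "winner" to act on.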
I have 4 possible solutions in mind and I'm not sure which is best. I would like advice on the pros and cons of each that I may not be aware of or have wrong:
- Don't cancel the pending task. Letting it complete would allow us to just put the parse/store in the legacy_system() function without too much trouble. The fear here is that we have to return to an API as soon as possible. So if we return with our new system while the legacy system is still 'running' in the invoke for another 3-4 seconds, how badly will this screw up concurrent executions of the Lambda?
- Use executor functions from asyncio to tie both new_system() and legacy_system() into the loop. I am unsure how to do this directly, but as I understand the code this should work without any adverse side effects and allow us to accomplish what we want.
- Use invoke(InvocationType='Event') from boto3 to make an async call, and add an await function that polls for a response. This seems OK, but I am not certain it would actually work in practice, and I see it as overly complicated - polling for a completed Lambda execution feels adverse to the rest of our event-driven service.
- Add an await asyncio.sleep() to the functions. I am not sure that we can do less than 1 second on await asyncio.sleep? If we can, that may be the simplest answer with the fewest issues, but it seems really friggin hacky.

I tried option 2 for a bit tonight, but have yet to figure out how to make it work.
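For what it's worth, option 2 can be sketched with `loop.run_in_executor`, again using `time.sleep` as a stand-in for the blocking boto3 calls (names and timings are illustrative assumptions, not our real code):

```python
import asyncio
import time

def blocking_invoke(name, seconds):
    # Stand-in for a blocking boto3 call (start_sync_execution / invoke)
    time.sleep(seconds)
    return name

async def wait_first_executor():
    loop = asyncio.get_running_loop()
    # run_in_executor moves each blocking call onto a thread-pool worker and
    # returns a future the event loop can actually multiplex
    new_fut = loop.run_in_executor(None, blocking_invoke, "new", 0.05)
    legacy_fut = loop.run_in_executor(None, blocking_invoke, "legacy", 0.5)
    done, pending = await asyncio.wait(
        {new_fut, legacy_fut}, return_when=asyncio.FIRST_COMPLETED
    )
    return done.pop().result(), len(pending)

print(asyncio.run(wait_first_executor()))  # ('new', 1)
```

One caveat: a thread already running in the executor cannot be cancelled, so the slower call will run to completion in the background either way, which overlaps with the trade-off described in option 1.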
Thanks in advance
Solution
While I try to do things without importing a ton of libraries, in this case there isn't much choice. boto3 calls inherently block the async loop. However, the aioboto3 library handles this.
import aioboto3

session = aioboto3.Session()
async with session.client('lambda') as fn_client:
    response = await fn_client.invoke(...)
Answered By - lynkfox