Issue
I was trying to implement a golang like channel
in python. I used asyncio package and some simple generators to implement this.
import asyncio
def channel(capacity):
# A very lazy channel implementation
size = 0
data = []
while size < capacity:
item = yield
data.append(item)
size += 1
for item in data:
yield item
async def worker(id:int, jobs:channel, results:channel):
print(f"worker : {id} started")
await asyncio.sleep(0)
for job in jobs:
print(f"worker : {id} received job {job}")
result = job * job
await asyncio.sleep(1)
results.send(result)
async def main():
jobs = channel(10)
results = channel(10)
# priming jobs and results channel
next(jobs)
next(results)
for i in range(10):
jobs.send(i)
await worker(1, jobs, results)
await worker(2, jobs, results)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
even though the code seems to run, the second worker is not spawning until the first worker consumes all the jobs.
I added asyncio.sleep(0)
for forcing the context switch, but still the first worker is not returning the control back to the main co-routine.
What am I doing wrong here? Does it have anyhing to do with the channel generator
?
Result :
worker : 1 started
worker : 1 received job 1
worker : 1 received job 2
worker : 1 received job 3
worker : 1 received job 4
worker : 1 received job 5
worker : 1 received job 6
worker : 1 received job 7
worker : 1 received job 8
worker : 1 received job 9
worker : 2 started
Solution
When you do:
await worker(1, jobs, results)
You are waiting for this coroutine to end, and when it ends the next one is executed (once the channel is empty). So the first worker consumes all the items.
To solve this you have to run the two coroutines together.
With gather, transform all coroutines into tasks and wait for all of them to finish:
# with gather
await asyncio.gather(
worker(1, jobs, results),
worker(2, jobs, results)
)
With tasks (from doc):
Wrap the coro coroutine into a Task and schedule its execution. Return the Task object.
And then await the tasks.
task_1 = asyncio.create_task(worker(1, jobs, results))
task_2 = asyncio.create_task(worker(2, jobs, results))
await task_1
await task_2
Also don't create your channel, use asyncio.Queue with a maxsize
as a channel.
Answered By - Lucas
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.