Issue
I am utilizing a third party library called isort
. isort
has an available function that opens and reads a file. In order to speed this up I attempted to changed the function called isort.check_file
to make it perform asynchronously. The method check_file
takes the file path, however the current behaviour that I have attempted does not work.
...
coroutines= [self.check_file('c:\\example1.py'), self.check_file('c:\\example2.py')]
loop = asyncio.get_event_loop()
result = loop.run_until_complete(asyncio.gather(*coroutines))
...
async def check_file(self, changed_file):
return isort.check_file(changed_file)
However, this does not seem to work. How can I make the library call isort.check_file
be utilized correctly with asyncio.gather
?
Solution
Better understanding of IO Bottleneck and GIL
What your async function check_file
doing is just as same without async
at front. To get any meaningful performance asynchronously, you Must be using some sort of Awaitables
- which requires await
keyword.
So basically what you did is:
import time
async def wait(n):
time.sleep(n)
Which does absolutely no good for asynchronous operations.
To make such synchronous function asynchronous - assuming it's mostly IO-bound - you can use asyncio.to_thread
instead.
import asyncio
import time
async def task():
await asyncio.to_thread(time.sleep, 10) # <- await + something that's awaitable
# similar to await asyncio.sleep(10) now
async def main():
tasks = [task() for _ in range(10)]
await asyncio.gather(*tasks)
asyncio.run(main())
That essentially moves IO bound operation out of main thread, so main thread can do it's work without waiting for IO works.
But there's catch - Python's Global Interpreter Lock(GIL).
Due to CPython - official python implementation - limitation, only 1 python interpreter thread can run in at any given moment, stalling all others.
Then how we achieve better performance just by moving IO to different thread? Just simply by releasing GIL during IO operations.
IO Operations are basically just like this:
"Hey OS, please do this IO works for me. Wake me up when it's done."
Thread 1 goes to sleepSome time later, OS punches Thread 1
"Your IO Operation is done, take this and get back to work."
So all it does is Doing Nothing - for such cases, aka IO Bound stuffs, GIL can be safely released and let other threads to run. Built-in functions like time.sleep
, open()
, etc implements such GIL release logic in their C code.
This doesn't change much in asyncio
, which is internally bunch of event checks and callbacks. Each asyncio,Tasks
works like threads in some degree - tasks asking main loop to wake them up when IO operation done is done.
Now these basic simplified concepts sorted out, we can go back to your question.
CPU Bottleneck and IO Bottleneck
Bsically what you're up against is Not an IO bottleneck. It's mostly CPU/etc bottleneck.
Loading merely few KB of texts from local drives then running tons of intense Python code afterward doesn't count as an IO bound operation.
Testing
Let's consider following test case:
- run
isort.check_file
for 10000 scripts as:- Synchronously, just like normal python codes
- Multithreaded, with 2 threads
- Multiprocessing, with 2 processes
- Asynchronous, using
asyncio.to_thread
We can expect that:
- Multithreaded will be slower than Synchronous code, as there's very little IO works
- Multiprocessing process spawning & communicating takes time, so it will be slower in short workload, faster in longer workload.
- Asynchronous will be even more slower than the Multithreaded, because Asyncio have to deal with threads which it's not really designed for.
With folder structure of:
├─ main.py
└─ import_messes
├─ lib_0.py
├─ lib_1.py
├─ lib_2.py
├─ lib_3.py
├─ lib_4.py
├─ lib_5.py
├─ lib_6.py
├─ lib_7.py
├─ lib_8.py
└─ lib_9.py
Which we'll load 1000 times each, making up to total 10000 loads.
Each of those are filled with random imports I grabbed from asyncio
.
from asyncio.base_events import *
from asyncio.coroutines import *
from asyncio.events import *
from asyncio.exceptions import *
from asyncio.futures import *
from asyncio.locks import *
from asyncio.protocols import *
from asyncio.runners import *
from asyncio.queues import *
from asyncio.streams import *
from asyncio.subprocess import *
from asyncio.tasks import *
from asyncio.threads import *
from asyncio.transports import *
Source code(main.py):
"""
asynchronous isort demo
"""
import pathlib
import asyncio
import itertools
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from timeit import timeit
import isort
from isort import format
# target dir with modules
FILE = pathlib.Path("./import_messes")
# Monkey-patching isort.format.create_terminal_printer to suppress Terminal bombarding.
# Totally not required nor recommended for normal use
class SuppressionPrinter:
def __init__(self, *_, **__):
pass
def success(self, *_):
pass
def error(self, *_):
pass
def diff_line(self, *_):
pass
isort.format.BasicPrinter = SuppressionPrinter
# -----------------------------
# Test functions
def filelist_gen():
"""Chain directory list multiple times to get meaningful difference"""
yield from itertools.chain.from_iterable([FILE.iterdir() for _ in range(1000)])
def isort_synchronous(path_iter):
"""Synchronous usual isort use-case"""
# return list of results
return [isort.check_file(file) for file in path_iter]
def isort_thread(path_iter):
"""Threading isort"""
# prepare thread pool
with ThreadPoolExecutor(max_workers=2) as executor:
# start loading
futures = [executor.submit(isort.check_file, file) for file in path_iter]
# return list of results
return [fut.result() for fut in futures]
def isort_multiprocess(path_iter):
"""Multiprocessing isort"""
# prepare process pool
with ProcessPoolExecutor(max_workers=2) as executor:
# start loading
futures = [executor.submit(isort.check_file, file) for file in path_iter]
# return list of results
return [fut.result() for fut in futures]
async def isort_asynchronous(path_iter):
"""Asyncio isort using to_thread"""
# create coroutines that delegate sync funcs to threads
coroutines = [asyncio.to_thread(isort.check_file, file) for file in path_iter]
# run coroutines and wait for results
return await asyncio.gather(*coroutines)
if __name__ == '__main__':
# run once, no repetition
n = 1
# synchronous runtime
print(f"Sync func.: {timeit(lambda: isort_synchronous(filelist_gen()), number=n):.4f}")
# threading demo
print(f"Threading : {timeit(lambda: isort_thread(filelist_gen()), number=n):.4f}")
# multiprocessing demo
print(f"Multiproc.: {timeit(lambda: isort_multiprocess(filelist_gen()), number=n):.4f}")
# asyncio to_thread demo
print(f"to_thread : {timeit(lambda: asyncio.run(isort_asynchronous(filelist_gen())), number=n):.4f}")
Run results
Sync func.: 18.1764
Threading : 18.3138
Multiproc.: 9.5206
to_thread : 27.3645
You can see it turned out as we expected, isort.check_file
is not an IO-Bound operation. Therefore best bet is using Multiprocessing, if Really needed.
If number of files are low, like hundred or below, multiprocessing will suffer even more than using asyncio.to_thread
, because cost to spawn, communicate, and kill process overwhelm the multiprocessing's benefits.
Experiment with your usecase, adjust core count (max_workers
) to best fit your environment and your usecase.
Answered By - jupiterbjy
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.