Issue
Recently I have been studying Python's "aioftp" package in order to download data asynchronously from a remote server. So far, I have had no success. Usually the error message concerns access to the local file on my computer, as if more than one thread were trying to write to it at the same time. Other times, I get a ConnectionResetError: "Connection lost".
Here is an example for reference. This code should list all available files within a remote repository ('ftp.star.nesdis.noaa.gov') and asynchronously download the selected files to one's local computer:
import os
import asyncio
import aiofiles
from pathlib import Path
from queue import Queue, Empty as QueueEmptyException
from threading import Semaphore
import aioftp
from ftplib import FTP


def get_parents(level=2):
    cwd = os.getcwd()
    for i in range(level):
        cwd = os.path.dirname(cwd)
    return cwd


def getUrlsFromFTP(serverFTP="",
                   remoteDirName=""
                   ):
    urlsToDownload = []
    ftp = FTP(serverFTP)
    ftp.login()
    ftp.cwd(remoteDirName)
    try:
        files = list(sorted([file for file in ftp.nlst()]))
        urlsToDownload = [os.path.join(remoteDirName,
                                       str(f)
                                       ).replace("\\", "/")
                          for f in files]
        print("N° of files found: ", len(urlsToDownload))
    except Exception as err:
        print(err)
    finally:
        ftp.close()
    return urlsToDownload


class FTPFileDownloader:
    def __init__(self,
                 serverFTP: str,
                 remoteURLFolder="",
                 user="",
                 password="",
                 dirname=r"dataFolder",
                 urlsToDownload=list(),
                 maxConcurrentThreads=20):
        self.serverFTP = serverFTP
        self.user = user
        self.password = password
        self.remoteURLFolder = remoteURLFolder
        self.chunk_size = 1024  # 1 KB
        self.dirnameToSave = dirname
        print("Saving in: ", self.dirnameToSave)
        self.downloadSemaphore = Semaphore(maxConcurrentThreads)
        self.downloadQueue = Queue()
        for url in urlsToDownload:
            localFileName = Path(url).name
            self.downloadQueue.put((localFileName, url))

    async def download(self, client):
        self.downloadSemaphore.acquire()
        while not self.downloadQueue.empty():
            try:
                localfilename, url = self.downloadQueue.get()
                print("Downloading: ", localfilename)
                async with aiofiles.open(localfilename, "wb") as file:
                    async with client.download_stream(url) as stream:
                        async for block in stream.iter_by_block(self.chunk_size):
                            content = await block.read()
                            await file.write(content)
            except QueueEmptyException:
                print("Queue is Empty")
        self.downloadSemaphore.release()

    async def downloadURLFiles(self):
        async with aioftp.Client.context(self.serverFTP,
                                         # user=self.user,
                                         # password=self.password
                                         ) as client:
            tasks = []
            for i in range(self.downloadQueue.qsize()):
                task = asyncio.create_task(self.download(client))
                tasks.append(task)
            for t in tasks:
                await t.result()

    def run(self):
        asyncio.run(self.downloadURLFiles())


def main():
    serverFTP = 'ftp.star.nesdis.noaa.gov'
    GOES_URLS = getUrlsFromFTP(
        serverFTP,
        remoteDirName=r"/pub/sod/mecb/crw/data/5km/v3.1/nc/v1.0/daily/sst/",
    )
    dirname = os.path.join(get_parents(1),
                           "Downloads")
    Downloader = FTPFileDownloader(serverFTP,
                                   urlsToDownload=GOES_URLS,
                                   dirname=dirname)
    Downloader.run()


if __name__ == "__main__":
    import nest_asyncio
    nest_asyncio.apply()
    main()
So far, all I get is "RuntimeError: This event loop is already running".
Any insight is welcome.
Sincerely,
Solution
After digging into the async packages used in the script above, I finally managed to create a functional routine for downloading data from FTP servers.
Below I present two approaches for downloading remote data asynchronously:
- single-threaded
- multi-threaded
Both approaches were only possible after addressing the following issues:
- Using the aioftp.pathio.AsyncPathIO class for writing the stream of data to a local file, instead of the builtin "open", which blocks the event loop.
- Better understanding the asyncio.gather(*tasks) and asyncio.get_event_loop().run_until_complete(future) methods.
- Passing a pathlib.Path instance to the "aioftp.pathio.AsyncPathIO().open" method; see the sketch after this list. From what I have observed, the aioftp documentation does not make it evident that one must use a Path instance instead of a raw string (perhaps the aioftp authors could reinforce this point in the package's documentation).
- Better using the Semaphore to cap the number of concurrent downloads being executed at the "same time" against the remote server.
- Better using the loop instance.
- Using the third-party package "nest_asyncio". This is the only solution found so far for the "RuntimeError: This event loop is already running" problem.
- Avoiding loop closures (e.g., asyncio.get_event_loop().close()). This case in particular is discussed elsewhere.
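To make the first and third points concrete, here is a minimal sketch of a single download combining aioftp.pathio.AsyncPathIO with a pathlib.Path target; the host and file paths are placeholders, not part of the scripts below:

import asyncio
from pathlib import Path
import aioftp


async def fetch_one(host, remote_path, local_path: Path, chunk_size=1024 * 10):
    async with aioftp.Client.context(host) as client:
        # AsyncPathIO writes without blocking the event loop, and its
        # open() must receive a pathlib.Path, not a raw string
        async with aioftp.pathio.AsyncPathIO().open(local_path, mode="wb") as file:
            async with client.download_stream(remote_path) as stream:
                async for block in stream.iter_by_block(chunk_size):
                    await file.write(block)

# hypothetical usage:
# asyncio.run(fetch_one("ftp.example.com", "/pub/some_file.nc", Path("some_file.nc")))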
Single-Threaded Asynchronous Approach
Description
One will notice that the "Single-Threaded Asynchronous Script" below shows different approaches for retrieving the desired asynchronous results: a) through the loop object's run_until_complete; b) by awaiting the gathered future; c) by awaiting each coroutine directly. This way, I hope to show all possible solutions for the same problem.
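To preview those three variants in isolation, here is a minimal, self-contained sketch that replaces the FTP downloads with simple sleeps; everything in it is illustrative only:

import asyncio
import nest_asyncio

nest_asyncio.apply()


async def work(i):
    await asyncio.sleep(0.1)
    return i


async def demo():
    # a) "Loop" variant: gather tasks, then re-enter the running loop
    #    (possible only because nest_asyncio.apply() was called)
    tasks = [asyncio.create_task(work(i)) for i in range(3)]
    future = asyncio.gather(*tasks)
    results_a = asyncio.get_event_loop().run_until_complete(future)
    # b) "await future" variant
    results_b = await asyncio.gather(*[work(i) for i in range(3)])
    # c) "await coroutine" variant: await each coroutine sequentially
    results_c = [await work(i) for i in range(3)]
    print(results_a, results_b, results_c)


asyncio.run(demo())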
Single-Threaded Asynchronous Script
import os
import asyncio
from pathlib import Path
from threading import Semaphore
import aioftp
import numpy as np
from ftplib import FTP


def get_parents(level=2):
    cwd = os.getcwd()
    for i in range(level):
        cwd = os.path.dirname(cwd)
    return cwd


def getUrlsFromFTP(serverFTP="",
                   remoteDirName="",
                   yearrange=None
                   ):
    if yearrange is None:
        yearrange = {"lower": 2010,
                     "higher": 2015}
    urlsToDownload = []
    ftp = FTP(serverFTP)
    ftp.login()
    ftp.cwd(remoteDirName)
    try:
        years = list(sorted([int(y) for y in ftp.nlst()]))
        years = [y for y in years if all([y >= yearrange["lower"],
                                          y < yearrange["higher"],
                                          ])
                 ]
        print("Years to be evaluated: ", years)

        def getfilesFromFTPFolder(url):
            ftp.cwd(url)
            ncfiles = ftp.nlst()
            ncfilesURLS = [os.path.join(url, f).replace("\\", "/")
                           for f in ncfiles]
            return ncfilesURLS

        sst_GOES_Annual_URLS = [os.path.join(remoteDirName,
                                             str(x)
                                             ) for x in years]
        urlsToDownload = list(
            map(lambda url: getfilesFromFTPFolder(url),
                sst_GOES_Annual_URLS)
        )
        urlsToDownload = np.concatenate(urlsToDownload).tolist()
        print("N° of images found: ", len(urlsToDownload))
    except Exception as err:
        print(err)
    finally:
        ftp.close()
    return urlsToDownload


class FTPFileDownloader:
    def __init__(self,
                 serverFTP: str,
                 remoteURLFolder="",
                 user="",
                 password="",
                 dirname=r"dataFolder",
                 urlsToDownload=list(),
                 maxDownloadsPerTime=4):
        self.serverFTP = serverFTP
        self.user = user
        self.password = password
        self.remoteURLFolder = remoteURLFolder
        self.chunk_size = 1024 * 10  # 10 KB
        os.makedirs(dirname, exist_ok=True)
        self.dirnameToSave = dirname
        print("Saving in: ", self.dirnameToSave)
        self.downloadSemaphore = Semaphore(maxDownloadsPerTime)
        self.downloadList = []
        for url in urlsToDownload:
            path = Path(url)
            localFileName = path.name
            toFileName = Path(os.path.join(self.dirnameToSave, localFileName))
            self.downloadList.append((toFileName, url))

    async def download(self,
                       client: aioftp.Client,
                       toFileName: Path,
                       url: str):
        self.downloadSemaphore.acquire()
        try:
            print("\t\t Downloading: ", toFileName.name)
            # AsyncPathIO expects a pathlib.Path instance, not a raw string
            async with aioftp.pathio.AsyncPathIO().open(toFileName, mode="wb") as file:
                async with client.download_stream(url) as stream:
                    async for block in stream.iter_by_block(self.chunk_size):
                        await file.write(block)
        finally:
            # release the slot so pending downloads do not deadlock
            self.downloadSemaphore.release()

    async def downloadURLFiles(self, useGatherWithTasks=True, useLoop=True):
        try:
            async with aioftp.Client.context(self.serverFTP,
                                             # user=self.user,
                                             # password=self.password
                                             ) as client:
                if useGatherWithTasks:
                    print("\n\n", "-" * 50, "\n")
                    print("Using Tasks")
                    print("\n", "-" * 50, "\n")
                    tasks = []
                    for (toFileName, url) in self.downloadList:
                        task = asyncio.create_task(self.download(client, toFileName, url))
                        tasks.append(task)
                    future = asyncio.gather(*tasks)
                    if useLoop:
                        print("\n\n\t", "-" * 40, "\n")
                        print("\t Using Loop")
                        print("\n\t", "-" * 40, "\n")
                        loop = asyncio.get_event_loop()
                        results = loop.run_until_complete(future)
                        # The loop cannot be closed here because of a
                        # long-standing Python issue: https://github.com/python/cpython/issues/77650
                    else:
                        print("\n\n\t", "-" * 40, "\n")
                        print("\t No Loop")
                        print("\n\t", "-" * 40, "\n")
                        results = await future
                    return results
                else:
                    print("\n\n", "-" * 50, "\n")
                    print("NO TASKS: awaiting each Coroutine")
                    print("\n", "-" * 50, "\n")
                    for (toFileName, url) in self.downloadList:
                        await self.download(client, toFileName, url)
        except KeyboardInterrupt:
            pass

    def run(self, useGatherWithTasks=True, useLoop=True):
        asyncio.run(self.downloadURLFiles(useGatherWithTasks, useLoop))


def main(maxDownloadsPerTime=4):
    serverFTP = 'ftp.star.nesdis.noaa.gov'
    urlsToDownload = getUrlsFromFTP(serverFTP,
                                    remoteDirName=r"/pub/sod/mecb/crw/data/5km/v3.1/nc/v1.0/daily/sst/",
                                    )[:12]
    dirname = os.path.join(get_parents(1),
                           "Downloads")
    Downloader = FTPFileDownloader(serverFTP,
                                   urlsToDownload=urlsToDownload,
                                   dirname=dirname,
                                   maxDownloadsPerTime=maxDownloadsPerTime)
    Downloader.run()


if __name__ == "__main__":
    import nest_asyncio
    nest_asyncio.apply()
    main()
Multi-Threaded Asynchronous Approach
Below is a multi-threaded asynchronous download example. Several choices are similar to the single-threaded version; in this example, however, I had to use a lambda function to wrap the download coroutine into each Thread's target. Furthermore, I opted for a manager-worker design and therefore use two classes: one that effectively downloads the remote data (URLs), and another that manages the whole multi-threading configuration. The core threading pattern is sketched right below; the full script follows.
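Reduced to a sketch with a placeholder coroutine, the pattern is: each Thread target is a lambda that hands one coroutine to asyncio.run, so every thread drives its own event loop:

import asyncio
from threading import Thread


async def worker(name):
    # placeholder for the per-thread download coroutine
    await asyncio.sleep(0.1)
    print(name, "done")


threads = []
for i in range(3):
    coro = worker("Thread_{0}".format(i + 1))
    # bind the coroutine as a default argument so each thread runs
    # its own coroutine, not the most recently created one
    t = Thread(target=lambda c=coro: asyncio.run(c), name="Thread_{0}".format(i + 1))
    t.start()
    threads.append(t)
for t in threads:
    t.join()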
Multi-Threaded Asynchronous Script
import os
import asyncio
from pathlib import Path
import aioftp
import numpy as np
from ftplib import FTP
from queue import Queue, Empty as QueueEmptyException
import threading
from threading import Semaphore, Thread
from os import getpid
import nest_asyncio
import concurrent.futures


def get_parents(level=2):
    cwd = os.getcwd()
    for i in range(level):
        cwd = os.path.dirname(cwd)
    return cwd


def listToQueue(listOfUrls: list, dirnameToSave: str):
    QueueOfUrlsToDownload = Queue()
    for url in listOfUrls:
        path = Path(url)
        localFileName = path.name
        toFileName = Path(os.path.join(dirnameToSave, localFileName))
        QueueOfUrlsToDownload.put((toFileName, url))
    return QueueOfUrlsToDownload


def getUrlsFromFTP(serverFTP="",
                   remoteDirName="",
                   yearrange=None
                   ):
    if yearrange is None:
        yearrange = {"lower": 2010,
                     "higher": 2015}
    urlsToDownload = []
    ftp = FTP(serverFTP)
    ftp.login()
    ftp.cwd(remoteDirName)
    try:
        years = list(sorted([int(y) for y in ftp.nlst()]))
        years = [y for y in years if all([y >= yearrange["lower"],
                                          y < yearrange["higher"],
                                          ])
                 ]
        print("Years to be evaluated: ", years)

        def getfilesFromFTPFolder(url):
            ftp.cwd(url)
            ncfiles = ftp.nlst()
            ncfilesURLS = [os.path.join(url, f).replace("\\", "/")
                           for f in ncfiles]
            return ncfilesURLS

        sst_GOES_Annual_URLS = [os.path.join(remoteDirName,
                                             str(x)
                                             ) for x in years]
        urlsToDownload = list(
            map(lambda url: getfilesFromFTPFolder(url),
                sst_GOES_Annual_URLS)
        )
        urlsToDownload = np.concatenate(urlsToDownload).tolist()
        print("N° of images found: ", len(urlsToDownload))
    except Exception as err:
        print(err)
    finally:
        ftp.close()
    return urlsToDownload


class FTPFileDownloader:
    def __init__(self,
                 client: aioftp.Client,
                 user: str,
                 password: str,
                 dirnameToSave: str,
                 urlsToDownload: Queue,
                 semaphore: Semaphore):
        self.client = client
        self.user = user
        self.password = password
        self.chunk_size = 1024 * 10  # 10 KB
        os.makedirs(dirnameToSave, exist_ok=True)
        self.dirnameToSave = dirnameToSave
        self.downloadSemaphore = semaphore
        self.urlsToDownload = urlsToDownload

    async def download(self,
                       client: aioftp.Client,
                       toFileName: Path,
                       url: str):
        self.downloadSemaphore.acquire()
        try:
            currentThread = threading.current_thread()
            print("Process: {0} - Thread {1}".format(getpid(), currentThread.name) +
                  " is Downloading: ", toFileName.name)
            if not os.path.exists(toFileName):
                async with aioftp.pathio.AsyncPathIO().open(toFileName, mode="wb") as file:
                    async with client.download_stream(url) as stream:
                        async for block in stream.iter_by_block(self.chunk_size):
                            await file.write(block)
            else:
                pass
                # print("File {0} already downloaded".format(toFileName))
        except aioftp.errors.StatusCodeError:
            print("Download aborted")
            # deleting the aborted file that was partially downloaded
            os.remove(toFileName)
        finally:
            self.downloadSemaphore.release()

    async def downloadURLFiles(self):
        try:
            while not self.urlsToDownload.empty():
                QueueElement = self.urlsToDownload.get(block=False)
                if QueueElement:
                    toFileName, url = QueueElement
                    await self.download(self.client, toFileName, url)
                    self.urlsToDownload.task_done()
                else:
                    break
        except QueueEmptyException:
            print('Queue empty')
        finally:
            # signal the other worker threads that the queue has been drained
            self.urlsToDownload.put(None)

    async def run(self):
        try:
            await self.downloadURLFiles()
        except (KeyboardInterrupt, RuntimeError):
            pass


class multiThreadDownloadManager:
    def __init__(self,
                 semaphore: Semaphore,
                 maxDownloadsPerTime: int,
                 serverFTP='ftp.star.nesdis.noaa.gov',
                 remoteDirName=r"/pub/sod/mecb/crw/data/5km/v3.1/nc/v1.0/daily/sst/",
                 connectionsPerThread=10,
                 maxThreadsPerTime=10):
        self.semaphore = semaphore
        self.maxDownloadsPerTime = maxDownloadsPerTime
        self.serverFTP = serverFTP
        self.remoteDirName = remoteDirName
        self.connectionsPerThread = connectionsPerThread
        self.maxThreadsPerTime = maxThreadsPerTime
        # Post Init
        self.urlsToDownload = getUrlsFromFTP(self.serverFTP,
                                             self.remoteDirName
                                             )
        self.dirnameToSave = os.path.join(get_parents(1),
                                          "Downloads")
        self.semaphore = Semaphore(self.connectionsPerThread * self.maxThreadsPerTime)
        self.QueueOfUrlsToDownload = listToQueue(self.urlsToDownload,
                                                 self.dirnameToSave)
        self.client = aioftp.Client()
        print("Data will be saved in: ", self.dirnameToSave)

    async def _download(self):
        try:
            Downloader = FTPFileDownloader(client=self.client,
                                           user="anonymous",
                                           password="",
                                           dirnameToSave=self.dirnameToSave,
                                           urlsToDownload=self.QueueOfUrlsToDownload,
                                           semaphore=self.semaphore)
            await Downloader.run()
        except ConnectionResetError:
            print("Connection was lost")
            del Downloader

    async def download(self,
                       executor: concurrent.futures.ThreadPoolExecutor,
                       loop: asyncio.AbstractEventLoop):
        try:
            threads = []
            await self.client.connect(self.serverFTP)
            await self.client.login(# user=self.user,
                                    # password=self.password
                                    )
            for i in range(self.maxThreadsPerTime):
                coRoutine = self._download()
                # bind the coroutine as a default argument so each thread
                # runs its own coroutine in its own event loop
                target = lambda coro=coRoutine: asyncio.run(coro)
                t = Thread(target=target,
                           name="Thread_{0}".format(i + 1))
                t.start()
                threads.append(t)
            for t in threads:
                t.join()
        except ConnectionResetError:
            print("Connection was lost")
        finally:
            self.client.close()


def main(maxDownloadsPerTime: int = 50,
         maxThreadsPerTime: int = 10):
    semaphore = Semaphore(maxDownloadsPerTime * maxThreadsPerTime)
    manager = multiThreadDownloadManager(semaphore,
                                         maxDownloadsPerTime,
                                         maxThreadsPerTime=maxThreadsPerTime
                                         )
    loop = asyncio.get_event_loop()
    try:
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=maxDownloadsPerTime)
        mainCoRoutine = manager.download(executor, loop)
        loop.run_until_complete(mainCoRoutine)
    except KeyboardInterrupt:
        pass
    # busy-wait until the worker threads have drained the queue
    while not manager.QueueOfUrlsToDownload.empty():
        pass
    try:
        loop.stop()
        loop.close()
    except RuntimeError:
        print("Cannot close a running Event Loop")
    print("The list of URLs has been fully downloaded")


if __name__ == "__main__":
    nest_asyncio.apply()
    main()
Acknowledgments
Finally, I would like to thank @Ermaure for their advice and guidance.
Sincerely,
Answered By - Philipe Riskalla Leal