Issue
I tried "everything" but with no luck... probably am I misusing/misunderstanding asyncio.
[my goal/use case]
Push games from 1 computer (1Gb nic) to 16 xboxs (100Mb nic) in chunks of 10 (to take advantage of the 1:10 bandwidth ratio)
[my problem]
My code runs synchronously on a per-file/per-host basis:
- File1 pushed to xbox1... then xbox2...
- then File2 pushed to xbox1... then xbox2
[testing setup]
- 1 FTP client
- 1 FTP server with 2 ≠ users
[my code]
The main challenge I think I am facing is with the ftp_push
function looping over itself and dealing with await.
#! /usr/bin/env python3
import asyncio
import os
from ftplib import FTP
from tqdm import tqdm
game_dir = "~/360/Games/halo_3"
ftp_host = os.environ['ftp_host']
ftp_pass = os.environ['ftp_pass']
ftp_users = ["xbox1", "xbox2"]
def get_dir_size(path: str) -> int:
"""
return path size in bytes
"""
if not os.path.isdir(path):
print(f"Error: {path} is not a directory")
exit(1)
size = 0
with os.scandir(path) as x:
for entry in x:
if entry.is_file():
size += entry.stat().st_size
elif entry.is_dir():
size += get_dir_size(entry.path)
return size
async def ftp_login(host, user, passwd):
ftp = FTP(host)
ftp.login(user=user, passwd=passwd)
print(ftp.getwelcome())
return ftp
async def ftp_push(ftp_tqdm, path):
ftp = ftp_tqdm[0]
tqdm = ftp_tqdm[1]
with os.scandir(path) as x:
for entry in x:
if entry.is_file():
ftp.storbinary(f'STOR {os.path.basename(entry.path)}', open(entry.path, 'rb'), 2048, callback = lambda sent: tqdm.update(len(sent)))
if entry.is_dir():
ftp.mkd(os.path.basename(entry.path))
ftp.cwd(os.path.basename(entry.path))
await ftp_push(ftp_tqdm, entry.path)
ftp.cwd('..')
await asyncio.sleep(1)
async def main():
dir_size = get_dir_size(game_dir)
# create FTP instances (1 per xbox)
ftps_tasks = await asyncio.wait([ftp_login(ftp_host, ftp_user, ftp_pass) for ftp_user in ftp_users])
ftps = [i.result() for task in ftps_tasks for i in task]
# create TQDM instances (1 per xbox)
tqdms = [tqdm(unit = 'B', unit_scale = True, leave = False, miniters = 1, desc = ftp_suer, total = dir_size) for ftp_suer in ftp_users]
# zip FTP & TQDM instances
ftps_tqdms = list(zip(ftps, tqdms))
# trigger ftp push
await asyncio.wait([ftp_push(ftp_tqdm, game_dir) for ftp_tqdm in ftps_tqdms])
if __name__ == "__main__":
try:
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
except Exception as e:
print (e)
How to make this code push data in //?
Thank you for your precious help! Cheers!
Solution
[solved]
Hi,
Again thank you @lord_haffi for taking the time to answer. I ended up changing strategy using Python threads.
#! /usr/bin/env python3
import json
from live_xboxs import LiveXboxs
from ftp import Ftp
from multiprocessing.pool import ThreadPool
from toolbox import Toolbox
from tqdm import tqdm
import os
def starmap_ftp_push(ftp, path, xbox_tqdm, dir_tqdm):
with os.scandir(path) as x:
ls = ftp.nlst()
for entry in x:
if entry.name in ls:
print(f"{xbox_tqdm.desc} => {entry.name} ({os.path.basename(path)}) exists in {ftp.pwd()}")
else:
if entry.is_file():
ftp.storbinary(f'STOR {os.path.basename(entry.path)}', open(entry.path, 'rb'), 2048, callback = lambda sent: Toolbox.tqdm_update(xbox_tqdm, dir_tqdm, sent))
if entry.is_dir():
ftp.mkd(os.path.basename(entry.path))
ftp.cwd(os.path.basename(entry.path))
starmap_ftp_push(ftp, entry.path, xbox_tqdm, dir_tqdm)
ftp.cwd('..')
def push_games(Ftps, live_xboxs):
"""
Ftp push games in // to all alive xbox (chunks of max 10 xboxs)
"""
print(f" push games to {GAMES_CWD}")
threads = len(live_xboxs) if len(live_xboxs) <= 10 else 10
with ThreadPool(threads) as pool:
games_size = sum([Toolbox.get_dir_size(game) for game in GAMES])
xbox_tqdms = [tqdm(unit = 'B', unit_scale = True, leave = False, miniters = 1, desc = xbox['name'], total = games_size, position = idx) for idx, xbox in enumerate(live_xboxs)]
for game in GAMES:
# Create tqdm instance (1 per game)
game_size = Toolbox.get_dir_size(game)
dir_tqdm = [tqdm(unit = 'B', unit_scale = True, leave = False, miniters = 1, desc = f"{xbox_tqdm.desc}/{game}", total = game_size, position = idx) for idx, xbox_tqdm in enumerate(xbox_tqdms, start=(len(xbox_tqdms)+1))]
# Create a list of 1 game_dir the size of the ftps & tqdm lists
game_path = [game for x in live_xboxs]
ftps = [ftp.ftp for ftp in Ftps]
args = list(zip(ftps, game_path, xbox_tqdms, dir_tqdm))
[Ftp.ftp.cwd(GAMES_CWD) for Ftp in Ftps]
pool.starmap(starmap_ftp_push, args)
# FTP login to each xbox
Ftps = [Ftp(xbox['ip'], xbox['name']) for xbox in live_xboxs]
# Push games
push_games(Ftps, live_xboxs)
Answered By - aymericpineau
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.