Issue
I'm trying to learn websockets with asyncio in Python. I've implemented a websocket server that receives binary data and outputs to another computer.
The problem is that when that data hits the other computer, the resulting file is huge. With small files (like a .txt file with 2 lines or such) it works well but with large files (around 5 mb and up) the resulting file in the receiver computer is 4gb.
I'm not able to find what is causing this. No matter what I do, sender filesize and receiver filesize never match.
Some code:
FileManager.py
class Manager():
BUFFER_SIZE = 8092
file = None
filesize = None
filename = None
received_file = bytearray()
sent = 0
lock = asyncio.Lock()
secret = None
ws = None
def __init__(self, secret=None, ws: websockets.WebSocketServerProtocol = None):
self.ws = ws
self.secret = secret
def open_file(self, filename, mode):
self.file = open(filename, mode)
def close_file(self):
self.file.close()
async def chunk_sender(self):
async with self.lock:
self.file.seek(self.sent)
bytes_read = self.file.read(self.BUFFER_SIZE)
await self.ws.send(json.dumps({
"cmd": "send",
"key": self.secret,
"data": bytes_read.decode("utf-8")
}))
self.sent += self.BUFFER_SIZE
async def chunk_receiver(self, binary):
async with self.lock:
self.received_file += binary
self.file.write(self.received_file)
perc = ((len(self.received_file) * 100)/self.filesize)
print("\rDownloading file: " + colored(str(round(perc, 2)) + "%", "magenta"), end='', flush=True)
async def start_sending(self):
self.open_file(self.filename, "rb")
spawn = math.ceil(self.filesize / self.BUFFER_SIZE)
tasks = []
for _ in range(spawn):
tasks.append(self.chunk_sender())
pbar = tqdm.tqdm(total=len(tasks), leave=True, mininterval=0)
for process in asyncio.as_completed(tasks):
value = await process
pbar.set_description(value)
pbar.update()
ClientManager.py
import websockets
import json
from termcolor import colored
from classes import File
class Manager:
SERVER_URL = None
filename = None
filesize = 0
secret = None
FileManager = File.Manager()
def __init__(self, SERVER_URL, filename, filesize, secret):
self.SERVER_URL = SERVER_URL
self.filename = filename
self.filesize = filesize
self.secret = secret
# Initialize FileManager
self.FileManager.secret = self.secret
self.FileManager.filesize = self.filesize
self.FileManager.filename = self.filename
async def start_sender(self):
async with websockets.connect(self.SERVER_URL) as ws:
self.FileManager.ws = ws
await ws.send(json.dumps({"cmd": "sender_init", "key": self.secret}))
print("Now in the receiver computer", end=" "), print(
colored("sendpai " + self.secret, "magenta"))
while True:
message = await ws.recv()
deserialized = json.loads(message)
cmd = deserialized["cmd"]
if cmd == "receiver_request":
await self.FileManager.start_sending()
elif cmd == "receiver_init":
await ws.send(json.dumps({"cmd": "file_details", "key": self.secret, "filename": self.filename, "filesize": self.filesize}))
async def start_receiver(self):
async with websockets.connect(self.SERVER_URL) as ws:
self.FileManager.ws = ws
await ws.send(json.dumps({"cmd": "receiver_init", "key": self.secret}))
while True:
message = await ws.recv()
deserialized = json.loads(message)
if "cmd" in deserialized:
cmd = deserialized["cmd"]
if cmd == "send":
if "data" in deserialized:
binary_chunk = bytes(
deserialized["data"], encoding="utf-8")
await self.FileManager.chunk_receiver(binary_chunk)
elif cmd == "file_details":
self.FileManager.filename = deserialized["filename"]
self.FileManager.filesize = deserialized["filesize"]
self.FileManager.open_file("hello", "wb")
await ws.send(json.dumps({"cmd": "receiver_request", "key": self.secret}))
print("[The file is about to be downloaded]")
print(
"filename: " + colored(str(self.FileManager.filename), "green"), end=" ")
print(
"filesize: " + colored(str(self.FileManager.filesize / 1000) + "mb", "yellow"))
Server.py
class Server():
clients = []
clients_lock = threading.Lock()
async def register(self, ws: websockets.WebSocketServerProtocol, key, who) -> None:
with self.clients_lock:
self.clients.append({"key": key, "ws": ws, "who": who})
logging.info(who + f' {ws.remote_address[0]} connects')
async def unregister(self, ws: websockets.WebSocketServerProtocol) -> None:
with self.clients_lock:
for client in self.clients:
if client["ws"] == ws:
del client
logging.info(f'{ws.remote_address[0]} connects')
async def init_event(self, ws: websockets.WebSocketServerProtocol, key: str, who:str) -> None:
await self.register(ws, key, who)
logging.info(f'{ws.remote_address[0]} with key f{key}')
async def receiver_request_event(self, ws: websockets.WebSocketServerProtocol, key: str) -> None:
await self.register(ws, key, "receiver")
for client in self.clients:
if client["key"] == key:
await client["ws"].send(json.dumps({"cmd": "receiver_request"}))
async def send_to_receiver(self, key, message):
for client in self.clients:
if(client["key"] == key and client["who"] == "receiver"):
await client["ws"].send(message)
async def send_to_sender(self, key, message):
for client in self.clients:
if(client["key"] == key and client["who"] == "sender"):
await client["ws"].send(message)
async def ws_handler(self, ws: websockets.WebSocketServerProtocol, uri: str):
try:
async for message in ws:
deserialized = json.loads(message)
cmd = deserialized["cmd"]
key = deserialized["key"]
if cmd == "sender_init":
await self.init_event(ws, key, "sender")
elif cmd == "receiver_request":
await self.receiver_request_event(ws, key)
elif cmd == "send":
await self.send_to_receiver(key, message)
elif cmd == "receiver_init":
await self.init_event(ws, key, "receiver")
await self.send_to_sender(key, message)
elif cmd == "file_details":
await self.send_to_receiver(key, message)
except websockets.exceptions.ConnectionClosed as e:
logging.info("Connection closed")
I've tried to debug my code in search of:
- Sending more chunks than needed
- Server double sending socket messages to receiver
- Maybe encoding issues when sending the data?
One thing I notice is that I needed a lock in the chunk_sender
function, because I was reading from the same pointer a lot of times as the coroutines arrived, that improved things but still have the issue.
Thanks in advance.
Solution
Nevermind, 2 errors:
Client is added to my list twice in the server events:
self.receiver_request_event(ws, key)
self.init_event(ws, key, "receiver")
So, when websocket messages are sent I receive twice.
And, also this, I should write the given binary not the already sent bytes everytime:
async def chunk_receiver(self, binary):
async with self.lock:
self.received_file += binary
self.file.write(self.received_file)
...
Answered By - Carlos Molero
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.