Issue
We all know that using asyncio substantially improves the performance of a socket server, and obviously things get even more awesome if we could take advantage of all cores in our cpu (maybe via multiprocessing module or os.fork()
etc.)
I'm now trying to build a multicore socket server demo, with a asynchronous socket server listening on each core and all binding to one port. Simply by creating a async server and then use os.fork()
, let processes work competitively.
However the single-core-fine code runs into some trouble when I'm trying to fork. Seems like there's some problem with registering same filedescriptors from different processes in epoll selector module.
I'm showing some code below, can anyone help me out?
Here's a simple, logically clear code of echo server using asyncio:
import os
import asyncio #,uvloop
from socket import *
# hendler sends back incoming message directly
async def handler(loop, client):
with client:
while True:
data = await loop.sock_recv(client, 64)
if not data:
break
await loop.sock_sendall(client, data)
# create tcp server
async def create_server(loop):
sock = socket(AF_INET ,SOCK_STREAM)
sock.setsockopt(SOL_SOCKET , SO_REUSEADDR ,1)
sock.bind(('',25000))
sock.listen()
sock.setblocking(False)
return sock
# whenever accept a request, create a handler task in eventloop
async def serving(loop, sock):
while True:
client ,addr = await loop.sock_accept(sock)
loop.create_task(handler(loop ,client))
loop = asyncio.get_event_loop()
sock = loop.run_until_complete(create_server(loop))
loop.create_task(serving(loop, sock))
loop.run_forever()
It works fine until I'm trying to fork, after the socket was bounl and before server starts serving. (This logic works fine in synchronous -- threading based code.)
When I'm trying this:
loop = asyncio.get_event_loop()
sock = loop.run_until_complete(create_server(loop))
from multiprocessing import cpu_count
for num in range(cpu_count() - 1):
pid = os.fork()
if pid <= 0: # fork process as the same number as
break # my cpu cores
loop.create_task(serving(loop, sock))
loop.run_forever()
Theoretically forked process are bounl to a same socket? And run in a same event loop? then work just fine?
However I'm getting these error messages:
Task exception was never retrieved
future: <Task finished coro=<serving() done, defined at /home/new/LinuxDemo/temp1.py:21> exception=FileExistsError(17, 'File exists')>
Traceback (most recent call last):
File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 262, in _add_reader
key = self._selector.get_key(fd)
File "/usr/local/lib/python3.7/selectors.py", line 192, in get_key
raise KeyError("{!r} is not registered".format(fileobj)) from None
KeyError: '6 is not registered'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/test/temp1.py", line 23, in serving
client ,addr = await loop.sock_accept(sock)
File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 525, in sock_accept
self._sock_accept(fut, False, sock)
File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 538, in _sock_accept
self.add_reader(fd, self._sock_accept, fut, True, sock)
File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 335, in add_reader
return self._add_reader(fd, callback, *args)
File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 265, in _add_reader
(handle, None))
File "/usr/local/lib/python3.7/selectors.py", line 359, in register
self._selector.register(key.fd, poller_events)
FileExistsError: [Errno 17] File exists
Python version 3.7.3,
I'm totally confused about what's going on.
Could anybody help? thanks
Solution
According to the tracker issue, it is not supported to fork an existing asyncio event loop and attempt to use it from multiple processes. However, according to Yury's comment on the same issue, multi-processing can be implemented by forking before starting a loop, therefore running fully independent asyncio loops in each child.
Your code actually confirms this possibility: while create_server
is async def
, it doesn't await anything, nor does it use the loop
argument. So we can implement Yury's approach by by making create_server
a regular function, removing the loop
argument, and calling it before os.fork()
, and only running event loops after forking:
import os, asyncio, socket, multiprocessing
async def handler(loop, client):
with client:
while True:
data = await loop.sock_recv(client, 64)
if not data:
break
await loop.sock_sendall(client, data)
# create tcp server
def create_server():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('', 25000))
sock.listen()
sock.setblocking(False)
return sock
# whenever accept a request ,create a handler task in eventloop
async def serving(loop, sock):
while True:
client, addr = await loop.sock_accept(sock)
loop.create_task(handler(loop, client))
sock = create_server()
for num in range(multiprocessing.cpu_count() - 1):
pid = os.fork()
if pid <= 0: # fork process as the same number as
break # my cpu cores
loop = asyncio.get_event_loop()
loop.create_task(serving(loop, sock))
loop.run_forever()
Answered By - user4815162342
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.