Issue
Using the answers here as a basis (which use SubprocessProtocol), I'm simply trying to read from a subprocess and stop reading (and terminate the subprocess) at a point of my choosing (e.g., once I've read enough data).
Note that I do want the benefit of using run_until_complete, per another discussion.
I happen to be using Windows, and the example below uses cat from Cygwin. The actual utility I'm using is just a native Windows console application, but one that will stream until it's closed manually.
I can read the data just fine, but my attempts to stop reading and close the subprocess (e.g., calling loop.stop() from within pipe_data_received()) lead to exceptions (RuntimeError: Event loop is closed and ValueError: I/O operation on closed pipe). I'd like to terminate the subprocess immediately but gracefully.
I don't think it's so much the platform as it is my not seeing where to properly interrupt things to have the desired effect. Any ideas on how to accomplish this?
My Python 3.7+ code (as modified from the example):
import asyncio
import os

external_program = "cat"  # Something that will output to stdio
external_option = "a"  # An arbitrarily large amount of data
saved_data = []


class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1:  # got stdout data (bytes)
            data_len = len(data)
            print(''.join(' {:02x}'.format(x) for x in data), flush=True)
            saved_data.extend(data)
            if len(saved_data) > 512:  # Stop once we've read this much data
                loop.call_soon_threadsafe(loop.stop)

    def connection_lost(self, exc):
        print("Connection lost")
        loop.stop()  # end loop.run_forever()


print("START")

if os.name == 'nt':
    # On Windows, the ProactorEventLoop is necessary to listen on pipes
    loop = asyncio.ProactorEventLoop()  # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

try:
    loop.run_until_complete(
        loop.subprocess_exec(
            SubprocessProtocol,
            external_program,
            external_option,
        )
    )
    loop.run_forever()
finally:
    loop.close()

print("DONE")
loop.close()
Solution
Not an asyncio expert, but something like this should work.
import time
import asyncio
import threading


class SubprocessProtocol(asyncio.SubprocessProtocol):
    def __init__(self, loop):
        self.transport = None
        self.loop = loop

    def pipe_data_received(self, fd, data):
        print('data received')

    def connection_lost(self, exc):
        print("Connection lost")

    def connection_made(self, transport):
        print("Connection made")
        self.transport = transport
        # Because calc won't trigger pipe_data_received, the shutdown is
        # started from a timer thread instead.
        t = threading.Thread(target=self._endme, daemon=True)
        t.start()

    def _endme(self):
        time.sleep(2)
        # You'd normally call transport.close() and loop.stop() directly inside
        # pipe_data_received or connection_lost. This method runs in another
        # thread, so the calls are handed over to the loop thread.
        self.loop.call_soon_threadsafe(self.transport.close)
        self.loop.call_soon_threadsafe(self.loop.stop)


def main():
    loop = asyncio.ProactorEventLoop()  # Windows only
    asyncio.set_event_loop(loop)
    loop.run_until_complete(loop.subprocess_exec(
        lambda: SubprocessProtocol(loop),
        'calc.exe'
    ))
    loop.run_forever()
    loop.close()


if __name__ == "__main__":
    main()
Answered By - Himal
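For the case in the question (stop after enough data has arrived), one possible variation is to close the transport from inside pipe_data_received and wait on a future that connection_lost resolves, rather than calling loop.stop() while the pipes are still open. The sketch below is not part of the answer above. The names LimitedReader, read_some and CHILD_CODE are made up for illustration, and a child Python process stands in for the streaming console program. It assumes Python 3.8+, where asyncio.run() already uses the proactor loop on Windows; on 3.7 a proactor event loop policy would presumably have to be set first.

```python
import asyncio
import sys

# Hypothetical stand-in for the streaming program: a child Python process
# that writes to stdout until it is killed.
CHILD_CODE = (
    "import sys, time\n"
    "while True:\n"
    "    sys.stdout.write('x' * 64)\n"
    "    sys.stdout.flush()\n"
    "    time.sleep(0.01)\n"
)


class LimitedReader(asyncio.SubprocessProtocol):
    """Collects stdout and closes the transport once `limit` bytes are exceeded."""

    def __init__(self, limit, done):
        self.limit = limit
        self.done = done  # future resolved when the connection is fully torn down
        self.saved = bytearray()
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport

    def pipe_data_received(self, fd, data):
        if fd != 1 or self.transport.is_closing():
            return  # ignore stderr and anything arriving after close()
        self.saved.extend(data)
        if len(self.saved) > self.limit:
            # Closes the pipes and kills the child if it is still running.
            self.transport.close()

    def connection_lost(self, exc):
        # Called once the pipes are closed and the process has exited.
        if not self.done.done():
            self.done.set_result(bytes(self.saved))


async def read_some(limit=512):
    loop = asyncio.get_running_loop()
    done = loop.create_future()
    transport, _ = await loop.subprocess_exec(
        lambda: LimitedReader(limit, done),
        sys.executable, "-u", "-c", CHILD_CODE,
        stdin=asyncio.subprocess.DEVNULL,
        stderr=asyncio.subprocess.DEVNULL,
    )
    try:
        return await done
    finally:
        transport.close()  # no-op if the protocol already closed it


if __name__ == "__main__":
    data = asyncio.run(read_some())
    print("read", len(data), "bytes")
```

Because the coroutine only returns after connection_lost has fired, the loop is not closed while pipe callbacks are still pending, which is the situation that appears to produce the "Event loop is closed" and "I/O operation on closed pipe" errors. asyncio.run() drives the coroutine with run_until_complete internally, so that benefit is kept.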