Issue
I have 3 scripts that need to be combined in order to process data in a pipeline. The scripts run forever, until interrupted by the user. This is how they are executed inside a terminal:
script1_producer.sh | script2_processor.sh | script3_processor.sh
script1_producer.sh
produces the data to be processed (as an example, it just prints incrementing numbers):
i=1
while true; do
    echo $i
    i=$(($i+1))
    sleep 1
done
script2_processor.sh
consumes data from script1 and calculates a new stream of data (multiplying each number by 2):
while read -r line
do
    echo "$(($line*2))"
done < "${1:-/dev/stdin}"
script3_processor.sh
consumes data from script2 and calculates a new stream of data (adding a letter to each number):
while read -r line
do
    echo "A$(($line))"
done < "${1:-/dev/stdin}"
The resulting output when running script1_producer.sh | script2_processor.sh | script3_processor.sh:
A2
A4
A6
...
Now I would like these scripts to be controlled by Python subprocesses using pipes. In the end I need to process the output of script3_processor.sh and perform operations on each line.

I'm trying to implement this using asyncio, though it would be OK not to use asyncio if that's possible. This is my (very naive) attempt, process_pipes.py:
import asyncio
import subprocess
import os


async def async_receive():
    p1 = await asyncio.create_subprocess_exec(
        "./script1_producer.sh",
        stdout=subprocess.PIPE,
    )
    p2 = await asyncio.create_subprocess_exec(
        "./script2_processor.sh",
        stdin=p1.stdout,
        stdout=subprocess.PIPE,
    )
    p3 = await asyncio.create_subprocess_exec(
        "./script3_processor.sh",
        stdin=p2.stdout,
        stdout=subprocess.PIPE,
    )
    # Read just one line to test
    data = await p3.stdout.readline()
    print(data)


asyncio.run(async_receive())
Unfortunately, I'm getting the following exception when executing this script:
Traceback (most recent call last):
File "process_pipes.py", line 28, in <module>
asyncio.run(async_receive())
File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "process_pipes.py", line 12, in async_receive
p2 = await asyncio.create_subprocess_exec(
File "/usr/lib/python3.8/asyncio/subprocess.py", line 236, in create_subprocess_exec
transport, protocol = await loop.subprocess_exec(
File "/usr/lib/python3.8/asyncio/base_events.py", line 1630, in subprocess_exec
transport = await self._make_subprocess_transport(
File "/usr/lib/python3.8/asyncio/unix_events.py", line 197, in _make_subprocess_transport
transp = _UnixSubprocessTransport(self, protocol, args, shell,
File "/usr/lib/python3.8/asyncio/base_subprocess.py", line 36, in __init__
self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
File "/usr/lib/python3.8/asyncio/unix_events.py", line 789, in _start
self._proc = subprocess.Popen(
File "/usr/lib/python3.8/subprocess.py", line 808, in __init__
errread, errwrite) = self._get_handles(stdin, stdout, stderr)
File "/usr/lib/python3.8/subprocess.py", line 1477, in _get_handles
p2cread = stdin.fileno()
AttributeError: 'StreamReader' object has no attribute 'fileno'
I read some examples on Stack Overflow and elsewhere telling me to handle the pipes differently, but could not get them to work in my scenario. How can I mimic running script1_producer.sh | script2_processor.sh | script3_processor.sh and process the output of script3 in Python?
Solution
I found another solution, guided by this question:

One remark before that: the scripts do not actually have a syntax error. Arithmetic expansions like echo "$(($line*2))" are valid bash; spaces inside $(( )) are optional, so echo "$(( $line * 2 ))" is equivalent, just a bit more readable. Apart from that, all good.
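For reference, here is how the two spacings behave when run through bash (a minimal sketch; it assumes a POSIX system with bash on the PATH):

```python
import subprocess

# Both arithmetic expansions are valid bash; spaces inside $(( )) are optional.
tight = subprocess.run(
    ["bash", "-c", 'line=3; echo "$(($line*2))"'],
    capture_output=True, text=True,
)
spaced = subprocess.run(
    ["bash", "-c", 'line=3; echo "$(( $line * 2 ))"'],
    capture_output=True, text=True,
)
print(tight.stdout.strip(), spaced.stdout.strip())  # both print 6
```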
One thing to remember here is that a pipe has two ends: one to read from and one to write to. So for the first process, it looks like this sketch:

- Write end (WE)
- Read end (RE)

p0 ---> | pipe 1 | ---> p2
     WE              RE
You should use a pipe from os, as described in the question referred to above. This part would look something like this:
read1, write1 = os.pipe()
p0 = await asyncio.create_subprocess_exec(
    "./script1_producer.sh",
    stdout=write1,
)
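As a minimal sketch of the two ends of such a pipe (standalone, not using the scripts): whatever is written to the write end comes out of the read end:

```python
import os

# os.pipe() returns (read_end, write_end) as raw file descriptors.
read_fd, write_fd = os.pipe()
os.write(write_fd, b"hello\n")  # write end (WE)
os.close(write_fd)              # closing the WE signals EOF to the reader
data = os.read(read_fd, 1024)   # read end (RE)
os.close(read_fd)
print(data)  # b'hello\n'
```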
The stdout of p0 is the WE of pipe 1, while for p2 we have

| pipe 1 | ---> p2 -------> | pipe 2 |
        RE=stdin    stdout=WE

its stdin is the RE of the first pipe, and its stdout the WE of the second pipe, something like this:
read2, write2 = os.pipe()
p2 = await asyncio.create_subprocess_exec(
    "./script2_processor.sh",
    stdin=read1,
    stdout=write2,
)
And for the third process:

| pipe 2 | ---> p3 -------> | asyncio PIPE |
        RE=stdin    stdout=PIPE
Joining it all together we have:
import asyncio
import os


async def async_receive():
    read1, write1 = os.pipe()
    p0 = await asyncio.create_subprocess_exec(
        "./script1_producer.sh",
        stdout=write1,
    )
    os.close(write1)  # close the parent's copy; only p0 holds the WE now

    read2, write2 = os.pipe()
    p2 = await asyncio.create_subprocess_exec(
        "./script2_processor.sh",
        stdin=read1,
        stdout=write2,
    )
    os.close(read1)   # the parent no longer needs either end of pipe 1
    os.close(write2)

    p3 = await asyncio.create_subprocess_exec(
        "./script3_processor.sh",
        stdin=read2,
        stdout=asyncio.subprocess.PIPE,
    )
    os.close(read2)

    # Process the output of script3 line by line
    while True:
        data = await p3.stdout.readline()
        print(data.decode("ascii").rstrip())
        print("Sleeping 1 sec...")
        await asyncio.sleep(1)


asyncio.run(async_receive())
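Since the question says asyncio is optional: a plain subprocess.Popen chain also works, because Popen's stdout is a real file object with a fileno() (unlike the asyncio StreamReader that caused the original exception). A sketch below, using finite stand-ins (seq and awk, assumed available on a POSIX system) in place of the three never-ending scripts so the example terminates:

```python
import subprocess

# Finite stand-ins for the three scripts, so the pipeline ends on its own.
p1 = subprocess.Popen(["seq", "1", "3"], stdout=subprocess.PIPE)
p2 = subprocess.Popen(
    ["awk", "{ print $1 * 2 }"],
    stdin=p1.stdout, stdout=subprocess.PIPE,
)
p1.stdout.close()  # let p2 be the only reader of pipe 1
p3 = subprocess.Popen(
    ["awk", '{ print "A" $1 }'],
    stdin=p2.stdout, stdout=subprocess.PIPE, text=True,
)
p2.stdout.close()  # let p3 be the only reader of pipe 2

# Process the final output line by line in Python.
lines = [line.rstrip() for line in p3.stdout]
print(lines)  # ['A2', 'A4', 'A6']
```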
This way, you can still use asyncio.
Answered By - Gealber