开发者

Repeatedly write to stdin and read from stdout of a process from python

开发者 https://www.devze.com 2023-01-12 10:42 出处:网络
I have a piece of fortran code that reads some numbers from STDIN and writes results to STDOUT. For example:

I have a piece of fortran code that reads some numbers from STDIN and writes results to STDOUT. For example:

do
  read (*,*) x
  y = x*x
  write (*,*) y
enddo

So I can start the program from a shell and get the following sequence of inputs/outputs:

5.0

25.0

2.5

6.25

Now I need to do this from within python. After futilely wrestling with subprocess.Popen and looking through old questions on this site, I decided to use pexpect.spawn:

import pexpect, os
p = pexpect.spawn('squarer')
p.setecho(False)
p.write("2.5" + os.linesep)
res = p.readline()

and it works. The problem is, the real data I need to pass between python and my fortran program is an array of 100,000 (or more) double precision floats. If they're contained in an array called x, then

p.write(' '.join(["%.10f"%k for k in x]) + os.linesep)

times out with the following error message from pexpect:

buffer (last 100 chars):   
before (last 100 chars):   
after: <class 'pexpect.TIMEOUT'>  
match: None  
match_index: None  
exitstatus: None
flag_eof: False
pid: 8574
child_fd: 3
closed: False
timeout: 30
delimiter: <class 'pexpect.EOF'>
logfile: None
logfile_read: None
logfile_send: None
maxread: 2000
ignorecase: False
searchwindowsize: None
delaybeforesend: 0.05
delayafterclose: 0.1
delayafterterminate: 0.1

unless x has less than 303 elements. Is there a way to p开发者_StackOverflowass large amounts of data to/from STDIN/STDOUT of another program?

I have tried splitting the data into smaller chunks, but then I lose a lot in speed.

Thanks in advance.


Found a solution using the subprocess module, so I'm posting it here for reference if anyone needs to do the same thing.

import subprocess as sbp

class ExternalProg:

    def __init__(self, arg_list):
        self.opt = sbp.Popen(arg_list, stdin=sbp.PIPE, stdout=sbp.PIPE, shell=True, close_fds=True)

    def toString(self,x):
        return ' '.join(["%.12f"%k for k in x])

    def toFloat(self,x):
        return float64(x.strip().split())

    def sendString(self,string):
        if not string.endswith('\n'):
            string = string + '\n'
        self.opt.stdin.write(string)

    def sendArray(self,x):
        self.opt.stdin.write(self.toString(x)+'\n')

    def readInt(self):
        return int(self.opt.stdout.readline().strip())

    def sendScalar(self,x):
        if type(x) == int:
            self.opt.stdin.write("%i\n"%x)
        elif type(x) == float:
            self.opt.stdin.write("%.12f\n"%x)

    def readArray(self):
        return self.toFloat(self.opt.stdout.readline())

    def close(self):
        self.opt.kill()

The class is invoked with an external program called 'optimizer' as:

optim = ExternalProg(['./optimizer'])
optim.sendScalar(500) # send the optimizer the length of the state vector, for example
optim.sendArray(init_x) # the initial guess for x
optim.sendArray(init_g) # the initial gradient g
next_x = optim.readArray() # get the next estimate of x
next_g = evaluateGradient(next_x) # calculate gradient at next_x from within python
# repeat until convergence

On the fortran side (the program compiled to give the executable 'optimizer'), a 500-element vector would be read in so:

read(*,*) input_vector(1:500)

and would be written out so:

write(*,'(500f18.11)') output_vector(1:500)

and that's it! I've tested it with state vectors up to 200,000 elements (which is the upper limit of what I need right now). Hope this helps someone other than myself. This solution works with ifort and xlf90, but not with gfortran for some reason I don't understand.


example squarer.py program (it just happens to be in Python, use your Fortran executable):

#!/usr/bin/python
import sys
data= sys.stdin.readline() # expecting lots of data in one line
processed_data= data[-2::-1] # reverse without the newline
sys.stdout.write(processed_data+'\n')

example target.py program:

import thread, Queue
import subprocess as sbp

class Companion(object):
    "A companion process manager"
    def __init__(self, cmdline):
        "Start the companion process"
        self.companion= sbp.Popen(
            cmdline, shell=False,
            stdin=sbp.PIPE,
            stdout=sbp.PIPE)
        self.putque= Queue.Queue()
        self.getque= Queue.Queue()
        thread.start_new_thread(self._sender, (self.putque,))
        thread.start_new_thread(self._receiver, (self.getque,))

    def _sender(self, que):
        "Actually sends the data to the companion process"
        while 1:
            datum= que.get()
            if datum is Ellipsis:
                break
            self.companion.stdin.write(datum)
            if not datum.endswith('\n'):
                self.companion.stdin.write('\n')

    def _receiver(self, que):
        "Actually receives data from the companion process"
        while 1:
            datum= self.companion.stdout.readline()
            que.put(datum)

    def close(self):
        self.putque.put(Ellipsis)

    def send(self, data):
        "Schedule a long line to be sent to the companion process"
        self.putque.put(data)

    def recv(self):
        "Get a long line of output from the companion process"
        return self.getque.get()

def main():
    my_data= '12345678 ' * 5000
    my_companion= Companion(("/usr/bin/python", "squarer.py"))

    my_companion.send(my_data)
    my_answer= my_companion.recv()
    print my_answer[:20] # don't print the long stuff
    # rinse, repeat

    my_companion.close()

if __name__ == "__main__":
    main()

The main function contains the code you will use: setup a Companion object, companion.send a long line of data, companion.recv a line. Repeat as necessary.


Here's a huge simplification: Break your Python into two things.

python source.py | squarer | python sink.py

The squarer application is your Fortran code. Reads from stdin, writes to stdout.

Your source.py is your Python that does

import sys
sys.stdout.write(' '.join(["%.10f"%k for k in x]) + os.linesep)

Or, perhaps something a tiny bit simpler, i.e.

from __future__ import print_function
print( ' '.join(["{0:.10f}".format(k) for k in x]) )

And your sink.py is something like this.

import fileinput
for line in fileinput.input():
    # process the line 

Separating source, squarer and sink gets you 3 separate processes (instead of 2) and will use more cores. More cores == more concurrency == more fun.


I think that you only add one linebreak here:

p.write(' '.join(["%.10f"%k for k in x]) + os.linesep)

instead of adding one per line.


Looks like you're timing out (default timeout, I believe, 30 seconds) because preparing, sending, receiving, and processing that much data is taking a lot of time. Per the docs, timeout= is an optional named parameter to the expect method, which you're not calling -- maybe there's an undocumented way to set the default timeout in the initializer, which could be found by poring over the sources (or, worst case, created by hacking those sources).

If the Fortran program read and saved (say) 100 items at a time, with a prompt, syncing up would become enormously easier. Could you modify your Fortran code for the purpose, or would you rather go for the undocumented / hack approach?

0

精彩评论

暂无评论...
验证码 换一张
取 消