Some file read functions in Python (e.g. readlines())
copy the file contents to memory (as a list). I need to process a file that's too large to
be copied into memory, and as such need to use a file pointer (to access the file one byte at a time) -- as in C getc(). The additional requirement I have is that
I'd like to rewind the file pointer to previous bytes, like in C ungetc(). Is there a way to do this in Python?
Also, in Python, I can read one line at a
time with readline(). Is there a way to read the previous line,
going backward?
You do not need file pointers, which Python does not have or want.
To go through a file line by line without reading the whole thing into memory, just iterate over the file object itself, i.e.
with open(filename, "r") as f: for line in f: ...
Using
readlines
is generally to be avoided. Going back a line isn't something you can do super-easily. If you never need to go back more than one line, check out the
pairwise
recipe in the itertools
documentation.
OK, here's what I came up with. Thanks Brenda for the idea of building a class.
Thanks Josh for the idea to use C like functions seek() and read()
#!/bin/python
# Usage: BufRead.py inputfile
import sys, os, string
from inspect import currentframe
# Debug function usage
#
# if DEBUG:
# debugLogMsg(currentframe().f_lineno,currentframe().f_code.co_filename)
# print ...
def debugLogMsg(line, file, msg=""):
    """Print a ``file:line message`` debug record to standard output.

    line -- line number to report (e.g. ``currentframe().f_lineno``).
    file -- file name to report (e.g. ``currentframe().f_code.co_filename``).
    msg  -- optional free-form text appended after the location.
    """
    # A single parenthesised argument prints identically under the Python 2
    # print statement and the Python 3 print function.
    print("%s:%s %s" % (file, line, msg))
# Set DEBUG off.
DEBUG = 0
class BufRead:
    """Byte-at-a-time and line-at-a-time reader that can also walk backward.

    The file is opened in binary mode and its size is measured once, at
    construction time.  Every read method returns byte strings (plain ``str``
    on Python 2, ``bytes`` on Python 3) and returns None once it cannot move
    any further in its direction, or after close().
    """

    def __init__(self, filename):
        """Open *filename* for binary reading and record its size."""
        self.__filename = filename
        self.__file = open(self.__filename, 'rb')
        self.__fileposition = self.__file.tell()
        # Measure the size by seeking to the end, then restore the position.
        self.__file.seek(0, os.SEEK_END)
        self.__filesize = self.__file.tell()
        self.__file.seek(self.__fileposition, os.SEEK_SET)

    def close(self):
        """Close the underlying file; safe to call more than once."""
        if self.__file is not None:
            self.__file.close()
            self.__file = None

    def seekstart(self):
        """Move to the first byte of the file (no-op once closed)."""
        if self.__file is not None:
            self.__file.seek(0, os.SEEK_SET)
            self.__fileposition = self.__file.tell()

    def seekend(self):
        """Move onto the last byte of the file (no-op once closed).

        The position ends up at ``size - 1`` so that a following getc()
        returns the final byte.  An empty file is left at offset 0 instead of
        attempting a negative seek.
        """
        if self.__file is not None:
            self.__file.seek(max(self.__filesize - 1, 0), os.SEEK_SET)
            self.__fileposition = self.__file.tell()

    def getc(self):
        """Return the byte at the current position and advance past it.

        Returns None at end of file or when the reader is closed.
        """
        if self.__file is None:
            return None
        self.__fileposition = self.__file.tell()
        if self.__fileposition >= self.__filesize:
            return None
        byte = self.__file.read(1)
        self.__fileposition = self.__file.tell()
        return byte

    def ungetc(self):
        """Step back one byte and return it, leaving the position on it.

        Returns None at the start of the file or when the reader is closed.
        """
        if self.__file is None:
            return None
        self.__fileposition = self.__file.tell()
        if self.__fileposition <= 0:
            return None
        self.__fileposition -= 1
        self.__file.seek(self.__fileposition, os.SEEK_SET)
        byte = self.__file.read(1)
        # read() moved forward again; return to the byte just delivered.
        self.__file.seek(self.__fileposition, os.SEEK_SET)
        return byte

    # uses getc() and ungetc()
    def getline(self):
        """Return the whole line containing the current position.

        The position is first rewound to the start of its line, then the line
        is read up to and including its newline (or to end of file).  On
        return the position is just past the bytes delivered.  Returns None
        when already at end of file or when the reader is closed.
        """
        if self.__file is None:
            return None
        self.__fileposition = self.__file.tell()
        if self.__fileposition >= self.__filesize:
            return None
        # Rewind until the byte behind us is a newline (or we hit offset 0).
        while self.__fileposition > 0:
            if self.ungetc() == b'\n':
                self.getc()  # step forward again, past that terminator
                break
        line = b""
        while True:
            c = self.getc()
            if c is None:
                return line
            line += c
            if c == b'\n':
                # Stop right after the newline.  No look-ahead byte is
                # consumed, so a one-byte final line is not skipped.
                return line

    # uses getc() and ungetc()
    def ungetline(self):
        """Return the current line and move backward, for reverse traversal.

        The position is first advanced to the newline that ends the current
        line (or to end of file).  If the byte just before that point is
        itself a newline, that single newline is returned and the position is
        left on it.  Otherwise the line's bytes are collected backward; when a
        preceding newline is reached it is appended to the END of the result
        and the position is left on that newline.  The first line of the file
        is therefore returned without a newline.  Returns None at offset 0 or
        when the reader is closed.
        """
        if self.__file is None:
            return None
        self.__fileposition = self.__file.tell()
        if self.__fileposition <= 0:
            return None
        # Advance to this line's terminator and rest on it (not past it).
        while self.__fileposition < self.__filesize:
            if self.getc() == b'\n':
                self.ungetc()
                break
        if self.ungetc() == b'\n':
            return b'\n'
        self.getc()  # undo the probe above
        line = b""
        while True:
            c = self.ungetc()
            if c is None:
                return line
            if c == b'\n':
                # Rest on the preceding newline.  No extra byte is consumed,
                # so a one-byte first line is not skipped on the next call.
                return line + c
            line = c + line
def _run_pass(banner, step, out):
    """Write a banner for one test pass, then drain *step* until it yields None.

    banner -- byte string naming the pass (already newline-terminated).
    step   -- zero-argument callable returning a byte string or None.
    out    -- writable object that accepts the byte strings.
    """
    out.write(
        b'----------------------------------\n'
        + banner +
        b'----------------------------------\n')
    while True:
        piece = step()
        if piece is None:
            out.write(b'\n')
            return
        out.write(piece)


def main():
    """Exercise getc/ungetc/getline/ungetline on the file named in argv[1].

    Does nothing unless exactly one command-line argument is given.
    """
    if len(sys.argv) != 2:
        return
    print(sys.argv[1])
    # Flush the text layer before writing raw bytes so the output order holds.
    sys.stdout.flush()
    # Python 3 exposes a binary stream as sys.stdout.buffer; Python 2's
    # sys.stdout already accepts byte strings directly.
    out = getattr(sys.stdout, 'buffer', sys.stdout)
    b = BufRead(sys.argv[1])
    try:
        _run_pass(b'- TESTING GETC \n', b.getc, out)
        _run_pass(b'- TESTING UNGETC \n', b.ungetc, out)
        b.seekstart()
        _run_pass(b'- TESTING GETLINE \n', b.getline, out)
        b.seekend()
        _run_pass(b'- TESTING UNGETLINE \n', b.ungetline, out)
    finally:
        # Release the file even if one of the passes raises.
        b.close()
        out.flush()


if __name__ == "__main__":
    main()
If you do want to use a file pointer directly (I think Mike Graham's suggestion is better though), you can use the file object's seek() method, which lets you set the internal pointer, combined with the read() method, which supports an optional argument specifying how many bytes you'd like to read.
Write a class that reads and buffers input for you, and implement ungetc on it -- something like this perhaps (warning: untested, written while compiling):
class BufRead:
    """Buffered binary reader with push-back.

    Data is pulled from the file in chunks of ``bufsize`` bytes and handed out
    by read(); ungetc() pushes bytes back so that the next read() returns them
    first.  ``ready`` turns False once the file is exhausted or closed, after
    which read() returns None until something is pushed back.
    """

    def __init__(self, filename):
        """Open *filename* for binary reading with an empty buffer."""
        self.filename = filename
        self.fn = open(filename, 'rb')
        # Bytes fetched (or pushed back) but not yet delivered.  Kept as a
        # byte string so it has the same type as what file.read() returns.
        self.buffer = b""
        self.bufsize = 256
        self.ready = True

    def close(self):
        """Close the underlying file; safe to call more than once."""
        if self.fn is not None:
            self.fn.close()
            self.fn = None
        self.ready = False

    def read(self, size=1):
        """Return up to *size* bytes, refilling the buffer as needed.

        Returns None when the reader is not ready (closed, or end of file was
        already reported).  The call that first hits end of file returns
        whatever bytes were still available, possibly an empty byte string.
        """
        if not self.ready:
            return None
        pieces = []
        while size > 0:
            if not self.buffer:
                # Refill; a closed file is treated like end of file.
                if self.fn is not None:
                    self.buffer = self.fn.read(max(self.bufsize, 1))
                if not self.buffer:
                    self.ready = False
                    break
            piece = self.buffer[:size]
            self.buffer = self.buffer[len(piece):]
            size -= len(piece)
            pieces.append(piece)
        return b"".join(pieces)

    def ungetc(self, ch):
        """Push *ch* back so that the next read() returns it first."""
        if not self.buffer:
            self.buffer = ch
        else:
            # Pushed-back data goes in FRONT of the bytes still buffered.
            self.buffer = ch + self.buffer
        self.ready = True
I don't want to do billions of unbuffered single-char file reads, and I wanted a way
to debug the position of the file pointer. Hence, I resolved to return the file position
in addition to a char or line, and to use mmap to map the file to memory (and let mmap
handle paging). I think this will be a bit of a problem if the file is really, really big
(as in larger than the amount of physical memory). That's when mmap would start going into
virtual memory and things could get really slow. For now, it processes a 50 MB file in about 4 min.
import sys, os, string, re, time
from mmap import mmap
class StreamReaderDb:
    """Memory-mapped reader that reports the stream position with each read.

    The whole file behind *stream* is mapped with mmap.  Character and line
    methods return a ``(data, position)`` tuple, where *data* is a byte string
    (``str`` on Python 2, ``bytes`` on Python 3) or None when nothing could be
    read.  After close(), seek-style methods return None and read-style
    methods return ``(None, None)``.
    """

    def __init__(self, stream):
        """Map the file behind *stream* (needs ``fileno()`` and ``name``)."""
        self.__stream = mmap(stream.fileno(), os.path.getsize(stream.name))
        self.__streamPosition = self.__stream.tell()
        # Measure the mapped size by seeking to the end, then restore.
        self.__stream.seek(0, os.SEEK_END)
        self.__streamSize = self.__stream.tell()
        self.__stream.seek(self.__streamPosition, os.SEEK_SET)

    def setStreamPositionDb(self, streamPosition):
        """Record *streamPosition* and move the map's pointer there."""
        if self.__stream is None:
            return None
        self.__streamPosition = streamPosition
        self.__stream.seek(self.__streamPosition, os.SEEK_SET)

    def streamPositionDb(self):
        """Return the last recorded position, or None once closed."""
        if self.__stream is None:
            return None
        return self.__streamPosition

    def streamSize(self):
        """Return the mapped size in bytes, or None once closed."""
        if self.__stream is None:
            return None
        return self.__streamSize

    def close(self):
        """Close the memory map; safe to call more than once."""
        if self.__stream is not None:
            self.__stream.close()
            self.__stream = None

    def seekStart(self):
        """Move to offset 0."""
        if self.__stream is None:
            return None
        self.setStreamPositionDb(0)

    def seekEnd(self):
        """Move onto the last byte of the map (offset ``size - 1``)."""
        if self.__stream is None:
            return None
        self.__stream.seek(-1, os.SEEK_END)
        self.setStreamPositionDb(self.__stream.tell())

    def getcDb(self):
        """Read one byte forward.

        Returns ``(byte, position_after)``, or ``(None, position)`` at end of
        stream.
        """
        if self.__stream is None:
            return None, None
        self.setStreamPositionDb(self.__stream.tell())
        if self.streamPositionDb() >= self.streamSize():
            return None, self.streamPositionDb()
        byte = self.__stream.read(1)
        self.setStreamPositionDb(self.__stream.tell())
        return byte, self.streamPositionDb()

    def unGetcDb(self):
        """Step back one byte and return it, leaving the position on it.

        Returns ``(byte, position)``, or ``(None, 0)`` at the start.
        """
        if self.__stream is None:
            return None, None
        self.setStreamPositionDb(self.__stream.tell())
        if self.streamPositionDb() <= 0:
            return None, self.streamPositionDb()
        self.setStreamPositionDb(self.streamPositionDb() - 1)
        byte = self.__stream.read(1)
        # read() advanced the pointer; go back onto the byte just delivered.
        self.__stream.seek(self.streamPositionDb(), os.SEEK_SET)
        return byte, self.streamPositionDb()

    def seekLineStartDb(self):
        """Rewind to the first byte of the current line and return its offset.

        Returns None when the position is already at end of stream.
        """
        if self.__stream is None:
            return None
        self.setStreamPositionDb(self.__stream.tell())
        if self.streamPositionDb() >= self.streamSize():
            return None
        # Back up to the start of the line
        while self.streamPositionDb() != 0:
            c, fp = self.unGetcDb()
            if c == b'\n':
                # Found the previous terminator; step forward past it.
                c, fp = self.getcDb()
                return fp
        return self.streamPositionDb()

    def seekPrevLineEndDb(self):
        """Rewind onto the nearest newline behind the position.

        Returns that newline's offset, 0 if the start is reached first, or
        None when the position is already at end of stream.
        """
        if self.__stream is None:
            return None
        self.setStreamPositionDb(self.__stream.tell())
        if self.streamPositionDb() >= self.streamSize():
            return None
        while self.streamPositionDb() != 0:
            c, fp = self.unGetcDb()
            if c == b'\n':
                return fp
        return self.streamPositionDb()

    def seekPrevLineStartDb(self):
        """Rewind onto the nearest newline behind, then to that line's start.

        Returns the result of seekLineStartDb() from that newline, 0 if the
        start is reached first, or None when already at end of stream.
        """
        if self.__stream is None:
            return None
        self.setStreamPositionDb(self.__stream.tell())
        if self.streamPositionDb() >= self.streamSize():
            return None
        while self.streamPositionDb() != 0:
            c, fp = self.unGetcDb()
            if c == b'\n':
                return self.seekLineStartDb()
        return self.streamPositionDb()

    def seekLineEndDb(self):
        """Advance onto the newline ending the current line.

        Returns that newline's offset, the stream size if end of stream is
        reached first, or None when the position is 0.
        """
        if self.__stream is None:
            return None
        self.setStreamPositionDb(self.__stream.tell())
        if self.streamPositionDb() <= 0:
            return None
        while self.streamPositionDb() != self.streamSize():
            c, fp = self.getcDb()
            if c == b'\n':
                # Rest on the newline rather than past it.
                c, fp = self.unGetcDb()
                return fp
        return self.streamPositionDb()

    def seekNextLineEndDb(self):
        """Advance just past the next newline.

        Returns the offset after that newline, the stream size if end of
        stream is reached first, or None when the position is 0.
        """
        if self.__stream is None:
            return None
        self.setStreamPositionDb(self.__stream.tell())
        if self.streamPositionDb() <= 0:
            return None
        while self.streamPositionDb() != self.streamSize():
            c, fp = self.getcDb()
            if c == b'\n':
                return fp
        return self.streamPositionDb()

    def seekNextLineStartDb(self):
        """Advance past the next newline, then normalise with seekLineStartDb().

        Returns the result of seekLineStartDb() after that newline (None if
        that lands at end of stream), the stream size if no newline is found,
        or None when the position is 0.
        """
        if self.__stream is None:
            return None
        self.setStreamPositionDb(self.__stream.tell())
        if self.streamPositionDb() <= 0:
            return None
        while self.streamPositionDb() != self.streamSize():
            c, fp = self.getcDb()
            if c == b'\n':
                return self.seekLineStartDb()
        return self.streamPositionDb()

    # uses getc() and ungetc()
    def getLineDb(self):
        """Return ``(line, position)`` for the line containing the position.

        The position is rewound to the start of its line and the line is read
        through its newline (or to end of stream); the returned position is
        just past the bytes delivered.  Returns ``(None, position)`` when
        already at end of stream.
        """
        if self.__stream is None:
            return None, None
        self.setStreamPositionDb(self.__stream.tell())
        if self.seekLineStartDb() is None:
            return None, self.streamPositionDb()
        line = b""
        while True:
            c, fp = self.getcDb()
            if c is None:
                return line, self.streamPositionDb()
            line += c
            if c == b'\n':
                return line, self.streamPositionDb()

    # uses getc() and ungetc()
    def unGetLineDb(self):
        """Return ``(line, position)`` while walking backward through lines.

        The position is first advanced onto the newline ending the current
        line (or to end of stream).  If the byte just before it is itself a
        newline, that single newline is returned.  Otherwise the line's bytes
        are collected backward; a preceding newline, when reached, is appended
        to the END of the result and the position is left on it.  Returns
        ``(None, position)`` when the position is 0.
        """
        if self.__stream is None:
            return None, None
        self.setStreamPositionDb(self.__stream.tell())
        if self.seekLineEndDb() is None:
            return None, self.streamPositionDb()
        c, fp = self.unGetcDb()
        if c == b'\n':
            return c, self.streamPositionDb()
        self.getcDb()  # undo the probe above
        line = b""
        while True:
            c, fp = self.unGetcDb()
            if c is None:
                return line, self.streamPositionDb()
            if c == b'\n':
                return line + c, self.streamPositionDb()
            line = c + line
The question was initially prompted by my need to build a lexical analyzer.
getc() and ungetc() are useful at first (to get the read bugs out the way and
to build the state machine) After the state machine is done,
getc() and ungetc() become a liability as they take too long to read
directly from storage.
When the state machine was complete (debugged any IO problems,
finalized the states), I optimized the lexical analyzer.
Reading the source file in chunks (or pages) into memory and running
the state machine on each page yields the best time result.
I found that considerable time is saved if getc() and ungetc() are not used
to read from the file directly.
精彩评论