I have a 9GB text file of tweets in the following format:
T 'time and date'
U 'name of user in the form of a URL'
W Actual tweet
There are 6,000,000 users and more than 60,000,000 tweets in total. I read three lines at a time using itertools.izip_longest() and then, according to the user name, write the tweet into that user's file. But it's taking way too long (26 hours and counting). How can this be made faster?
Posting the code for completeness:
import itertools
import re
from urlparse import urlparse
s='the existing folder which will have all the files'
with open('path to file') as f:
    for line1,line2,line3 in itertools.izip_longest(*[f]*3):
        if(line1!='\n' and line2!='\n' and line3!='\n'):
            line1=line1.split('\t')
            line2=line2.split('\t')
            line3=line3.split('\t')
            if(not(re.search(r'No Post Title',line1[1]))):
                url=urlparse(line3[1].strip('\n')).path.strip('/')
                if(url==''):
                    file=open(s+'junk','a')
                    file.write(line1[1])
                    file.close()
                else:
                    file=open(s+url,'a')
                    file.write(line1[1])
                    file.close()
My aim is to use topic modeling on the small texts (as in, running LDA on all the tweets of one user, thus requiring a separate file for each user), but it's taking way too much time.
UPDATE: I followed the suggestions by user S.Lott and used the following code:
import re
from urlparse import urlparse
import os
def getUser(result):
    result=result.split('\n')
    u,w=result[0],result[1]
    path=urlparse(u).path.strip('/')
    if(path==''):
        f=open('path to junk','a')
        f.write('its Junk !!')
        f.close()
    else:
        result="{0}\n{1}\n{2}\n".format(u,w,path)
        writeIntoFile(result)
def writeIntoFile(result):
    tweet=result.split('\n')
    users = {}
    directory='path to directory'
    u, w, user = tweet[0],tweet[1],tweet[2]
    if user not in users :
        if(os.path.isfile(some_directory+user)):
            if(len(users)>64):
                lru,aFile,u=min(users.values())
                aFile.close()
                users.pop(u)
            users[user]=open(some_directory+user,'a')
            users[user].write(w+'\n')
            #users[user].flush
        elif (not(os.path.isfile(some_directory+user))):
            if len(users)>64:
                lru,aFile,u=min(users.values())
                aFile.close()
                users.pop(u)
            users[user]=open(some_directory+user,'w')
            users[user].write(w+'\n')
    for u in users:
        users[u].close()
import sys
s=open(sys.argv[1],'r')
tweet={}
for l in s:
    r_type,content=l.split('\t')
    if r_type in tweet:
        u,w=tweet.get('U',''),tweet.get('W','')
        if(not(re.search(r'No Post Title',u))):
            result="{0}{1}".format(u,w)
            getUser(result)
        tweet={}
    tweet[r_type]=content
Obviously, it is pretty much a mirror of what he suggested and kindly shared. Initially it was very fast, but then it got slower. I have posted the updated code so that I can get some more suggestions on how it could be made faster. When I read from sys.stdin, there was an import error that I could not resolve. So, to save time and get on with it, I simply used this, hoping that it works and does so correctly. Thanks.
This is why your OS has multiprocessing pipelines.
collapse.py sometweetfile | filter.py | user_id.py | user_split.py -d some_directory
collapse.py
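import sys
# Input path comes from the command line: collapse.py sometweetfile
with open(sys.argv[1], "r") as theFile:
    tweet = {}
    for line in theFile:
        if not line.strip():
            continue  # skip blank separator lines
        # Drop the newline so each tweet ends up on a single output line.
        rec_type, content = line.rstrip('\n').split('\t', 1)
        if rec_type in tweet:
            # A repeated record type means a new tweet has started.
            t, u, w = tweet.get('T',''), tweet.get('U',''), tweet.get('W','')
            sys.stdout.write( "{0}\t{1}\t{2}\n".format( t, u, w ) )
            tweet = {}
        tweet[rec_type] = content
    # Emit the final tweet, if any.
    if tweet:
        t, u, w = tweet.get('T',''), tweet.get('U',''), tweet.get('W','')
        sys.stdout.write( "{0}\t{1}\t{2}\n".format( t, u, w ) )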
filter.py
import sys
for tweet in sys.stdin:
    t, u, w = tweet.split('\t')
    if 'No Post Title' in t:
        continue
    sys.stdout.write( tweet )
user_id.py
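import sys
from urlparse import urlparse  # Python 2; on Python 3: from urllib.parse import urlparse
for tweet in sys.stdin:
    t, u, w = tweet.rstrip('\n').split('\t')
    # The user name is the path component of the U (URL) field.
    path = urlparse(u).path.strip('/')
    result = "{0}\t{1}\t{2}\t{3}\n".format( t, u, w, path )
    sys.stdout.write( result )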
user_split.py
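import os
import sys
some_directory = sys.argv[2]  # the value passed after -d
users = {}
for tweet in sys.stdin:
    t, u, w, user = tweet.rstrip('\n').split('\t')
    user = user or 'junk'  # empty URL path, as in the question
    if user not in users:
        # May run afoul of open file limits...
        users[user] = open(os.path.join(some_directory, user), "w")
    users[user].write( tweet )
    users[user].flush()  # flush() takes no arguments
for u in users:
    users[u].close()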
Wow, you say. What a lot of code.
Yes. But. It spreads out among ALL the processing cores you own and it all runs concurrently. Also, when you connect stdout to stdin through a pipe, it's really only a shared buffer: there's no physical I/O occurring.
It's amazingly fast to do things this way. That's why the *Nix operating systems work that way. This is what you need to do for real speed.
The LRU algorithm, FWIW.
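# tolu ("time of last use") must be set to 0 before the loop starts.
if user not in users:
    # Only keep a limited number of files open
    if len(users) > 64: # or whatever your OS limit is.
        # Each entry is [tolu, file, user]; the smallest tolu is the least recently used.
        lru, aFile, u = min( users.values(), key=lambda entry: entry[0] )
        aFile.close()
        users.pop(u)
    # Append mode, so reopening an evicted user's file keeps its earlier tweets.
    users[user]= [ tolu, open(some_directory+user,"a"), user ]
tolu += 1
users[user][1].write( tweet )
users[user][1].flush() # may not be necessary
users[user][0]= tolu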
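The same idea can be packaged as a small class. This is only a sketch, not part of the answer above: the name LRUFileCache and its max_open parameter are made up for illustration, and it uses collections.OrderedDict to track recency instead of a tolu counter.

```python
import os
from collections import OrderedDict


class LRUFileCache(object):
    """Keep at most `max_open` append-mode file handles open at once."""

    def __init__(self, directory, max_open=64):
        self.directory = directory
        self.max_open = max_open
        self.handles = OrderedDict()  # user -> open file, least recent first

    def write(self, user, text):
        handle = self.handles.pop(user, None)
        if handle is None:
            if len(self.handles) >= self.max_open:
                # Evict the least recently used handle (front of the dict).
                _, oldest = self.handles.popitem(last=False)
                oldest.close()
            # Append mode, so a reopened file keeps its earlier tweets.
            handle = open(os.path.join(self.directory, user), "a")
        # Re-inserting moves the user to the most-recently-used end.
        self.handles[user] = handle
        handle.write(text)

    def close(self):
        for handle in self.handles.values():
            handle.close()
        self.handles.clear()
```

In user_split.py this would replace the users dict: call cache.write(user, tweet) inside the loop and cache.close() after it.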
You spend most of the time in I/O. Solutions:
- make larger I/O operations, i.e. read into a buffer of, say, 512K and do not write until you have at least 256K buffered
- avoid opening and closing files as much as possible
- use several threads to read from the file, i.e. split the file into chunks and give each thread its own chunk to work on
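As a rough illustration of the first point, Python's built-in open() accepts a buffering argument that sets the buffer size in bytes. The function name copy_lines and the two size parameters below are made up for this sketch; whether larger buffers actually help depends on the platform defaults, so measure before relying on it.

```python
def copy_lines(src_path, dst_path, read_buffer=512 * 1024, write_buffer=256 * 1024):
    """Copy non-blank lines using large read/write buffers; return the line count."""
    count = 0
    with open(src_path, "r", buffering=read_buffer) as src:
        with open(dst_path, "w", buffering=write_buffer) as dst:
            for line in src:
                if line.strip():  # drop blank separator lines
                    dst.write(line)
                    count += 1
    return count
```

Both files are opened exactly once, which also covers the second point about avoiding repeated open and close calls.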
For such a mass of information, I would use a database (MySQL, PostgreSQL, SQLite, etc.). They are optimized for the kind of things you are doing.
Thus, instead of appending to a file, you could simply add a line to a table (either the junk or the "good" table), with the URL and the associated data (the same URL can be on multiple lines). This would definitely speed up the writing part.
With the current approach, time is lost because the input file is read from one location on your hard drive while you write on many different locations: the head of the hard drive moves back and forth physically, which is slow. Also, creating new files takes time. If you could mostly read from the input file and let a database handle the data caching and disk writing optimization, the processing would undoubtedly be faster.
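A minimal sketch of that approach using SQLite through Python's standard sqlite3 module. The table name tweets, its column names, the batch size, and the helper names load_tweets and tweets_for are all assumptions made for illustration; records is assumed to yield (user, time, text) tuples already parsed from the input file.

```python
import sqlite3


def load_tweets(db_path, records, batch_size=10000):
    """Insert (user, time, text) tuples in batches; return the open connection."""
    conn = sqlite3.connect(db_path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS tweets (user_name TEXT, posted_at TEXT, body TEXT)"
    )
    batch = []
    for record in records:
        batch.append(record)
        if len(batch) >= batch_size:
            conn.executemany("INSERT INTO tweets VALUES (?, ?, ?)", batch)
            conn.commit()
            batch = []
    if batch:
        conn.executemany("INSERT INTO tweets VALUES (?, ?, ?)", batch)
        conn.commit()
    # Build the index after the bulk load so per-user lookups are fast.
    conn.execute("CREATE INDEX IF NOT EXISTS tweets_user ON tweets (user_name)")
    conn.commit()
    return conn


def tweets_for(conn, user):
    """Return all tweet bodies of one user, in insertion order."""
    rows = conn.execute(
        "SELECT body FROM tweets WHERE user_name = ? ORDER BY rowid", (user,)
    )
    return [row[0] for row in rows]
```

With this layout there is no need for one file per user: the topic-modeling step can call tweets_for(conn, user) for each user instead.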
Not sure if this is going to be faster, just an idea. Your file looks like a CSV with tabs as delimiters. Have you tried creating a CSV reader?
import csv
reader = csv.reader(open('bigfile'), 'excel-tab')
for line in reader:
    process_line(line)  # line is a list of the tab-separated fields
EDIT: Calling csv.field_size_limit(new_limit) is pointless here.
You could try creating a dict with the format {url: [lines...]}, and only writing each file at the end. I suspect that repeatedly opening and closing files is a lot of overhead. How many lines are you writing per file on average? If basically each line is getting its own file then there's not much you can do, except change that requirement :)
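A sketch of the dict idea, with one change that is an assumption on my part: since 9GB may not fit in memory, the dict is flushed to disk whenever the buffered text passes a threshold rather than only at the very end. The name split_by_user and the max_buffered_chars parameter are made up for illustration; records is assumed to yield (user, text) pairs.

```python
import os
from collections import defaultdict


def split_by_user(records, directory, max_buffered_chars=256 * 1024 * 1024):
    """Group text by user in memory, appending to per-user files in bulk.

    Returns the number of times the in-memory dict was flushed to disk.
    """
    pending = defaultdict(list)  # user -> list of buffered lines
    buffered = 0
    flushes = 0

    def flush():
        # One open/write/close per user per flush, instead of one per tweet.
        for user, chunks in pending.items():
            with open(os.path.join(directory, user), "a") as out:
                out.write("".join(chunks))
        pending.clear()

    for user, text in records:
        pending[user].append(text)
        buffered += len(text)
        if buffered >= max_buffered_chars:
            flush()
            buffered = 0
            flushes += 1
    if pending:
        flush()
        flushes += 1
    return flushes
```

The larger the threshold, the more tweets of the same user land in a single write, so it is worth setting it as high as available memory allows.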
On my system at least, almost all of the running time would be spent closing files. Sequential reading and writing is fast, so you can very well make several passes over the data. Here's what I'd do:
- Split the file into as many files as you can open at once, in such a way that
  - a user's tweets go to the same file
  - you keep track of which files contain more than one user
- Go on splitting the output files, until every file contains only one user
If you can write to 200 files in parallel, after two passes over all the data you'd have 40000 files containing 150 users on average, so after the third pass you'd probably be nearly done.
Here's some code that assumes the file has been preprocessed according to S.Lott's answer (collapse, filter, user_id). Note that it will delete the input file along with other intermediate files.
import os
MAX_FILES = 200  # how many files you can keep open at once
todo = ['source']
counter = 0
while todo:
    infilename = todo.pop()
    infile = open(infilename)
    users = {}
    files = []
    filenames = []
    for tweet in infile:
        t, u, w, user = tweet.split('\t')
        if user not in users:
            # Assign users to output files round-robin, in order of first appearance.
            users[user] = len(users) % MAX_FILES
            if len(files) < MAX_FILES:
                filenames.append(str(counter))
                files.append(open(filenames[-1], 'w'))
                counter += 1
        files[users[user]].write(tweet)
    for f in files:
        f.close()
    if len(users) > MAX_FILES:
        # Only the first len(users)-MAX_FILES files hold more than one user.
        todo += filenames[:len(users)-MAX_FILES]
    infile.close()
    os.remove(infilename)
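The pass count estimated above can be checked with a few lines. This is just the arithmetic, assuming users are spread evenly over the output files (which the round-robin assignment in the code does); passes_needed is a made-up helper name.

```python
def passes_needed(num_users, max_files):
    """Smallest number of passes k such that max_files ** k >= num_users."""
    passes = 0
    groups = 1  # number of output files after `passes` passes
    while groups < num_users:
        groups *= max_files
        passes += 1
    return passes


# 6,000,000 users, 200 files open at once:
# after 2 passes there are 200 ** 2 = 40000 files with 150 users each,
# and a third pass separates those 150 users.
print(passes_needed(6000000, 200))
```

Each pass reads and writes all of the data once, sequentially, so three passes over 9GB is still far less work than opening and closing a file per tweet.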
I think writing line by line is a run-time killer when working with such huge data. You can accelerate it significantly with vectorized operations, i.e. read/write a bunch of lines at once as in this answer here
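One way to read and write a bunch of lines at once is to pull them from the file in fixed-size batches with itertools.islice and write each batch with writelines(). This is a sketch only; read_batches and batch_size are made-up names, and since Python file objects already buffer their I/O, the gain may be small.

```python
from itertools import islice


def read_batches(lines, batch_size=100000):
    """Yield lists of up to batch_size lines from any iterable of lines."""
    iterator = iter(lines)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch
```

Typical use would be: for batch in read_batches(infile): outfile.writelines(batch), with any per-line processing applied to the whole batch before writing.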