I'm using a 2 processes Pool to parallel parse several log files,
po = Pool(processes=2)
pool_object = po.apply_async(log_parse, (hostgroup_sender_dir, hostname, host_depot_dir, synced_log, prev_last_pos, get_report_rate), )
(curr_last_pos, remote_report_datetime, report_gen_rate) = pool_object.get()
However it's quite slow on the initial run, ~16min for about twelve ~20Mb files.
There won't be much of a problem in the next iterations, considering I'll parse the logs new bytes each 2 or 3 min, but surely there's room for improvement on how I'm doing it on the first run. Would pre-splitting the logs into several lower sized splices (so that pyparse won't have to allocate the entirety of the log at ounce into memory) speed it up?
I'm still running it on a dual core dev VM, but soon will have to migrate to a quad core physical Server (I'll try to get an extra quad-core CPU) and it may need to be able to manage ~50 logs.
A splice from the log,
log_splice = """
# XX_MAIN (23143) Report at 2011-08-30 20:00:00.003 Type: Periodic #
# Report number 1790 State: Active #
################################################################################
# Running since : 2011-08-12 04:40:06.153 #
# Total execution time : 18 day(s) 15:19:53.850 #
# Last report date : 2011-08-30 19:45:00.002 #
# Time since last periodic report: 0 day(s) 00:15:00.000 #
################################################################################
----------------------------------------------------
| Periodic | Global |
----------------------------|-----------------------|--------------------------|
Simultaneous Accesses | Curr Max Cumulative | Max Cumulative |
--------------------------- | ---- ---- ---------- | ---- ------------- |
Accesses | 1 5 - | 180 - |
- in start/stop state | 1 5 12736 | 180 16314223 |
-------------------------------------------------------------------------------|
Accesses per Second | Max Occurr. Date | Max Occurrence Date |
--------------------------- | ------ -------------- | ------ --------------- |
Accesses per second | 21.00 08-30 19:52:33 | 40.04 08-16 20:19:18 |
-------------------------------------------------------------------------------|
Service Statistics 开发者_如何学Go | Success Total % | Success Total % |
--------------------------- | -------- -------- --- | --------- ---------- --- |
Services accepted accesses | 17926 17927 99 | 21635954 21637230 -98 |
- 98: NF | 7546 7546 100 | 10992492 10992492 100 |
- 99: XFC | 10380 10380 100 | 10643462 10643462 100 |
----------------------------------------------------------------------------- |
Services succ. terminations | 12736 12736 100 | 16311566 16314222 99 |
- 98: NF | 7547 7547 100 | 10991401 10992492 99 |
- 99: XFC | 5189 5189 100 | 5320165 5321730 99 |
----------------------------------------------------------------------------- |
"""
using pyparse,
unparsed_log_data = input_log.read()
#------------------------------------------------------------------------
# Define Grammars
#------------------------------------------------------------------------
integer = Word( nums )
# XX_MAIN ( 4801) Report at 2010-01-25 06:55:00
binary_name = "# XX_MAIN"
pid = "(" + Word(nums) + ")"
report_id = Suppress(binary_name) + Suppress(pid)
# Word as a contiguous set of characters found in the string nums
year = Word(nums, max=4)
month = Word(nums, max=2)
day = Word(nums, max=2)
# 2010-01-25 grammar
yearly_day_bnf = Combine(year + "-" + month + "-" + day)
# 06:55:00. grammar
clock24h_bnf = Combine(Word(nums, max=2) + ":" + Word(nums, max=2) + ":" + Word(nums, max=2) + Suppress("."))
timestamp_bnf = Combine(yearly_day_bnf + White(' ') + clock24h_bnf)("timestamp")
report_bnf = report_id + Suppress("Report at ") + timestamp_bnf
# Service Statistics | Success Total % |
# Services succ. terminations | 40 40 100 | 3494775 3497059 99 |
partial_report_ignore = Suppress(SkipTo("Services succ. terminations", include=True))
succ_term_bnf = Suppress("|") + integer("succTerms") + integer("totalTerms")
terminations_report_bnf = report_bnf + partial_report_ignore + succ_term_bnf
# Apply the BNF to the unparsed data
terms_parsing = terminations_report_bnf.searchString(unparsed_log_data)
I would structure the parser around parsing a single log entry. This accomplishes 2 things:
- it breaks up the problem into easily parallelizable chunks
- it positions your parser to handle the incremental log processing after the initial backlog of log data has been processed
Your parallelizing chunk size is then a nicely packaged single item, and each process can parse the item separately (assuming that you don't need to carry forward any state or elapsed time info from one log message to the next).
EDIT (this question has morphed into more of a topic on pyparsing tuning...)
I've found that it is better to define low-level primitives that are built up using Combine(lots+of+expressions+here)
using a pyparsing Regex expression. This usually applies to expressions like real numbers or timestamps, such as:
# 2010-01-25 grammar
yearly_day_bnf = Combine(year + "-" + month + "-" + day)
yearly_day_bnf = Regex(r"\d{4}-\d{2}-\d{2}")
# 06:55:00. grammar
clock24h_bnf = Combine(Word(nums, max=2) + ":" +
Word(nums, max=2) + ":" +
Word(nums, max=2) + Suppress("."))
clock24h_bnf = Regex(r"\d{2}:\d{2}:\d{2}\.")
clock24h_bnf.setParseAction(lambda tokens:tokens[0][:-1])
timestamp_bnf = Combine(yearly_day_bnf + White(' ') + clock24h_bnf)
timestamp_bnf = Regex(r"\d{4}-\d{2}-\d{2}\s+\d{1,2}:\d{2}:\d{2}")
No need to overdo, though. Things like integer=Word(nums)
are already generating RE's under the covers.
Note that I also removed the results name from timestamp_bnf. I usually leave off the results names from the primitive definitions, and add them as I assemble them into higher-level expressions, so I can use the same primitive multiple times, with different names, like:
summary = ("Started:" + timestamp_bnf("startTime") +
"Ended:" + timestamp_bnf("endTime"))
I find that this also helps me organize my parsed structures.
Moving the results name to the higher expression also leads me to give the field a more descriptive name:
report_bnf = report_id + Suppress("Report at ") + timestamp_bnf("reportTime")
Looking at your grammar, you are not really cracking all of this report info, just extracting the report time from this line:
# XX_MAIN (23143) Report at 2011-08-30 20:00:00.003
and 2 integer fields from this line:
Services succ. terminations | 12736 12736 100 | 16311566 16314222 99 |
Try this instead:
report_bnf = report_id + Suppress("Report at") + timestamp_bnf("reportTime")
succ_term_bnf = (Suppress("Services succ. terminations") + Suppress("|") +
integer("succTerms") + integer("totalTerms"))
log_data_sources_bnf = report_bnf | succ_term_bnf
extractLogData = lambda logentry : sum(log_data_sources_bnf.searchString(logentry))
print extractLogData(log_slice).dump()
Pyparsing will always be slower than RE's, and it may be that a pyparsing parser in your case is just a prototyping stepping stone. I'm sure I can't get you 500X performance with a pyparsing parser, and you may just have to use the RE-based solution to process Mb's worth of log files.
精彩评论