According to the attached image found on yahoo's hadoop tutorial, the order of operations is map > combine > partition which should be followed by reduce
Here is my an example key emmited by the map operation
LongValueSum:geo_US|1311722400|E 1
Assuming there ar开发者_StackOverflow社区e 100 keys of the same type, this should get combined as
geo_US|1311722400|E 100
Then i'd like to partition the keys by the value before the first pipe(|) http://hadoop.apache.org/common/docs/r0.20.2/streaming.html#A+Useful+Partitioner+Class+%28secondary+sort%2C+the+-partitioner+org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner+option%29
geo_US
so here's my streaming command
hadoop jar /usr/local/hadoop/contrib/streaming/hadoop-streaming-0.20.203.0.jar \
-D mapred.reduce.tasks=8 \
-D stream.num.map.output.key.fields=1 \
-D mapred.text.key.partitioner.options=-k1,1 \
-D stream.map.output.field.separator=\| \
-file mapper.py \
-mapper mapper.py \
-file reducer.py \
-reducer reducer.py \
-combiner org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-input input_file \
-output output_path
This is the error I get
java.lang.NumberFormatException: For input string: "1311722400|E 1"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:48)
at java.lang.Long.parseLong(Long.java:419)
at java.lang.Long.parseLong(Long.java:468)
at org.apache.hadoop.mapred.lib.aggregate.LongValueSum.addNextValue(LongValueSum.java:48)
at org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer.reduce(ValueAggregatorReducer.java:59)
at org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer.reduce(ValueAggregatorReducer.java:35)
at org.apache.hadoop.mapred.Task$OldCombinerRunner.combine(Task.java:1349)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1435)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1297)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:436)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:371)
at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
at org.apache.hadoop.mapred.Child.main(Child.java:253)
I looks like the partitioner is running before the combiner. Any thoughts?
There is no guarantee that the Combiner will be run actually for hadoop versions > 0.16.
In hadoop 17, the combiner is not run if a single <K,V>
occupies the entire sort buffer. in Versions > 0.18, the combiner can be run multiple times both in the map and reduce phases.
Basically yours algorithms should not be dependent on whether the Combine function is called, since its meant to be just an optimization. For more information check out the book Haddop, A definitive guide.. found the snippet that talks about Combine functions on google books here
I have checked the "Hadoop Definitive Guide" Chapter 6 Shuffle and Sort. Map output is bufferd in memory first. When the memory exceeds its threshold, map output will be written to disk. Before it writes to disk, data will be partitioned. Within each partition, data will be sorted by key. After that if there is combiner function, combine the sort output.
There may be many spill files on disk, if there at least 3 spill files, the combiner will be run again before the output is written to disk.
At last, all spill files will be merged into one file to reduce number of IO.
In short, for mapper: map --> partition --> sort ---> combiner
and for reduer: copy form mapper --> merge (combiner called if exists) -> reduce
精彩评论