开发者

Hadoop order of operations

开发者 https://www.devze.com 2023-03-26 00:55 出处:网络
According to the attached image found on yahoo\'s hadoop tutorial, the order of operations is map > combine > partition which should be followed by reduce

According to the attached image found on yahoo's hadoop tutorial, the order of operations is map > combine > partition which should be followed by reduce

Here is my an example key emmited by the map operation

LongValueSum:geo_US|1311722400|E        1

Assuming there ar开发者_StackOverflow社区e 100 keys of the same type, this should get combined as

geo_US|1311722400|E     100

Then i'd like to partition the keys by the value before the first pipe(|) http://hadoop.apache.org/common/docs/r0.20.2/streaming.html#A+Useful+Partitioner+Class+%28secondary+sort%2C+the+-partitioner+org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner+option%29

geo_US

so here's my streaming command

hadoop jar /usr/local/hadoop/contrib/streaming/hadoop-streaming-0.20.203.0.jar \
-D mapred.reduce.tasks=8 \
-D stream.num.map.output.key.fields=1 \
-D mapred.text.key.partitioner.options=-k1,1 \
-D stream.map.output.field.separator=\| \
-file mapper.py \
-mapper mapper.py \
-file reducer.py \
-reducer reducer.py \
-combiner org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-input input_file \
-output output_path

This is the error I get

java.lang.NumberFormatException: For input string: "1311722400|E    1"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:48)
at java.lang.Long.parseLong(Long.java:419)
at java.lang.Long.parseLong(Long.java:468)
at org.apache.hadoop.mapred.lib.aggregate.LongValueSum.addNextValue(LongValueSum.java:48)
at org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer.reduce(ValueAggregatorReducer.java:59)
at org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer.reduce(ValueAggregatorReducer.java:35)
at org.apache.hadoop.mapred.Task$OldCombinerRunner.combine(Task.java:1349)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1435)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1297)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:436)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:371)
at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
at org.apache.hadoop.mapred.Child.main(Child.java:253)

I looks like the partitioner is running before the combiner. Any thoughts?


There is no guarantee that the Combiner will be run actually for hadoop versions > 0.16. In hadoop 17, the combiner is not run if a single <K,V> occupies the entire sort buffer. in Versions > 0.18, the combiner can be run multiple times both in the map and reduce phases.

Basically yours algorithms should not be dependent on whether the Combine function is called, since its meant to be just an optimization. For more information check out the book Haddop, A definitive guide.. found the snippet that talks about Combine functions on google books here


I have checked the "Hadoop Definitive Guide" Chapter 6 Shuffle and Sort. Map output is bufferd in memory first. When the memory exceeds its threshold, map output will be written to disk. Before it writes to disk, data will be partitioned. Within each partition, data will be sorted by key. After that if there is combiner function, combine the sort output.

There may be many spill files on disk, if there at least 3 spill files, the combiner will be run again before the output is written to disk.

At last, all spill files will be merged into one file to reduce number of IO.

In short, for mapper: map --> partition --> sort ---> combiner

and for reduer: copy form mapper --> merge (combiner called if exists) -> reduce

0

精彩评论

暂无评论...
验证码 换一张
取 消