Hadoop reducer string manipulation doesn't work

Hi, text manipulation in the Reduce phase doesn't seem to work correctly. I suspect the problem is in my code rather than in Hadoop itself, but you never know... If you can spot any gotchas, let me know. I've wasted a day trying to figure out what's wrong with this code.

My sample input file, called simple.psv:

12345   abc@bbc.com|m|1975
12346   bbc@cde.com|m|1981

My Mapper and Reducer code:

package simplemapreduce;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;

/**
 *
 * @author
 */
public class Main {


    public static class SimpleMap extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable key, Text inputs,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {

            String inputString = inputs.toString();
            //System.out.println("CRM Map record:"+inputString);
            StringTokenizer tokenizer = new StringTokenizer(inputString);
            Text userid = new Text();
            if (tokenizer.hasMoreTokens()) {
                userid.set(tokenizer.nextToken());
                Text data = new Text();
                if (tokenizer.hasMoreTokens()) {
                    data.set(tokenizer.nextToken());
                } else {
                    data.set("");
                }
                output.collect(userid, data);
            }
        }
    }

    /**
     * A reducer class that just emits its input.
     */
    public static class SimpleReduce extends MapReduceBase implements
            Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {

            while (values.hasNext()) {
                Text txt = values.next();
                String inputString = "<start>" + txt.toString() + "<end>";
                Text out = new Text();
                out.set(inputString);
                //System.out.println(inputString);
                output.collect(key, out);

            }
        }
    }

    public static void main(String[] args) throws IOException {

        if (args.length != 2) {
            System.err.println("Usage: SimpleMapReduce <input path> <output path>");
            System.exit(1);
        }
        JobConf conf = new JobConf(Main.class);
        conf.setJobName("Simple Map reducer");

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        conf.setMapperClass(SimpleMap.class);
        conf.setCombinerClass(SimpleReduce.class);
        conf.setReducerClass(SimpleReduce.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        conf.setNumReduceTasks(1);
        JobClient.runJob(conf);
    }
}

My sample launch script, called simple.sh:

#!/bin/bash

hadoop jar SimpleMapReduce.jar \
  /full/path/to/input/simple.psv  /user/joebloggs/output

Expected output:

12345   <start>abc@bbc.com|m|1975<end>
12346   <start>bbc@cde.com|m|1981<end>

Actual output:

12345   <start><start>abc@bbc.com|m|1975<end><end>
12346   <start><start>bbc@cde.com|m|1981<end><end>

I tested this on Amazon S3 as well as on Linux. If you can spot the problem and let me know what it is, it will really save some hair on my head!


The basic flow of data through the system is:

Input -> Map -> Reduce -> output.

As a performance optimization, a combiner can be added to allow a node (one of the many in the Hadoop cluster) to do a partial aggregation of the data before it is transmitted to the node where the actual reducer runs.

In the word count example it is fine to start with these values:

1 1 1 1 1 1 1 1 1 1

combine them into

3 4 2 1

and then reduce them into the final result

10
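
In code, that works because summing is associative and commutative: partial sums of partial sums still add up to the same total. A minimal sketch of such a combiner-safe reducer, using the same old mapred API as the code above (it additionally needs org.apache.hadoop.io.IntWritable; the class name is just for illustration):

public static class WordCountReduce extends MapReduceBase implements
        Reducer<Text, IntWritable, Text, IntWritable> {

    // Safe to register as BOTH combiner and reducer: applying the sum
    // to already-summed partial results yields the same final count.
    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}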

So the combiner is essentially a performance optimization. If you do not specify a combiner, the values pass through unchanged (i.e. the default behaves like an "identity reducer"). So you can only use the SAME class as both the combiner and the reducer if the dataset remains valid that way. In your case that is not true: your data ends up invalid.

You do:

conf.setCombinerClass(SimpleReduce.class);
conf.setReducerClass(SimpleReduce.class);

So this makes the output of your mapper go through your reduce logic twice. The first pass (the combiner) adds "<start>" & "<end>"; the second pass (the reducer) adds "<start>" & "<end>" again.

Simple solution:

// conf.setCombinerClass(SimpleReduce.class);
conf.setReducerClass(SimpleReduce.class);
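
If you really wanted to keep a combiner here, the operation it performs would have to be safe to apply twice. A hedged sketch (not part of the original code) of a variant that only wraps values that are not already wrapped:

public static class IdempotentWrapReduce extends MapReduceBase implements
        Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterator<Text> values,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        while (values.hasNext()) {
            String s = values.next().toString();
            // Wrap at most once, so running this class as both the
            // combiner and the reducer leaves the data valid.
            if (!s.startsWith("<start>")) {
                s = "<start>" + s + "<end>";
            }
            output.collect(key, new Text(s));
        }
    }
}

That said, a combiner buys you nothing here since no actual aggregation happens, so simply dropping it is the cleaner fix.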

HTH


I had a problem wherein the reducer would not get all the data sent by the mapper. The reducer would only get up to the specific portion that output.collect emits. E.g., for the input data:

12345 abc@bbc.com|m|1975
12346 bbc@cde.com|m|1981

if I say

output.collect(key, mail_id);

then it will not get the next two fields, sex and year of birth.
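
A hedged sketch of what I mean (the field handling here is illustrative, not from the original code): if the mapper splits the record on the pipe delimiter and only collects the first field, the remaining fields never reach the reducer:

// inside the mapper, where data holds e.g. "abc@bbc.com|m|1975"
String[] fields = data.toString().split("\\|");
Text mail_id = new Text(fields[0]);   // "abc@bbc.com"
output.collect(key, mail_id);         // sex and year of birth are lost
// output.collect(key, data);         // would emit the full record instead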

Commenting out

// conf.setCombinerClass(SimpleReduce.class);

solved the problem.

