
Accessing a mapper's counter from a reducer

Developer https://www.devze.com 2023-02-19 21:42 Source: Internet
I need to access the counters from my mapper in my reducer. Is this possible? If so how is it done?

As an example: my mapper is:

public class CounterMapper extends Mapper<Text,Text,Text,Text> {

    static enum TestCounters { TEST }

    @Override
    protected void map(Text key, Text value, Context context)
                    throws IOException, InterruptedException {
        context.getCounter(TestCounters.TEST).increment(1);
        context.write(key, value);
    }
}

My reducer is

public class CounterReducer extends Reducer<Text,Text,Text,LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
                        throws IOException, InterruptedException {
        Counter counter = context.getCounter(CounterMapper.TestCounters.TEST);
        long counterValue = counter.getValue();
        context.write(key, new LongWritable(counterValue));
    }
}

counterValue is always 0. Am I doing something wrong or is this just not possible?


In the Reducer's configure(JobConf), you can use the JobConf object to look up the reducer's own job id. With that, your reducer can create its own JobClient -- i.e. a connection to the jobtracker -- and query the counters for this job (or any job for that matter).

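// in the Reducer class (old org.apache.hadoop.mapred API)...
private long mapperCounter;

@Override
public void configure(JobConf conf) {
    try {
        // Open a connection to the jobtracker and look up this job by its id.
        JobClient client = new JobClient(conf);
        RunningJob parentJob =
            client.getJob(JobID.forName(conf.get("mapred.job.id")));
        // MAP_COUNTER_NAME stands for the enum constant the mapper incremented.
        mapperCounter = parentJob.getCounters().getCounter(MAP_COUNTER_NAME);
    } catch (IOException e) {
        throw new RuntimeException("Could not read the job's counters", e);
    }
}

Now you can use mapperCounter inside the reduce() method itself.

The try-catch is needed because the JobClient calls throw IOException, which configure() cannot declare. I'm using the old API, but it shouldn't be hard to adapt for the new API.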

Note that mappers' counters should all be finalized before any reducer starts, so contrary to Justin Thomas's comment, I believe you should get accurate values (as long as the reducers aren't incrementing the same counter!)


Implemented Jeff G's solution on the new API:

    @Override
    public void setup(Context context) throws IOException, InterruptedException{
        Configuration conf = context.getConfiguration();
        Cluster cluster = new Cluster(conf);
        Job currentJob = cluster.getJob(context.getJobID());
        mapperCounter = currentJob.getCounters().findCounter(COUNTER_NAME).getValue();  
    }


The whole point of map/reduce is to parallelize the job. Many mapper and reducer tasks run independently, so a counter read inside one task reflects only that task's own increments, not the job-wide total.

They have a word count example:

http://wiki.apache.org/hadoop/WordCount

You could change the context.write(word,one) to context.write(line,one)


The global counter values are never broadcast back to each mapper or reducer. If you want the # of mapper records to be available to the reducer, you'll need to rely on some external mechanism to do this.
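To make this concrete, here is a small plain-Java model of counter locality. It is not Hadoop code, and the names (CounterLocalityDemo, TaskCounters, aggregate) are hypothetical, chosen only for illustration. It assumes what the answer above describes: each task increments its own private copy of a counter, and only a central tracker sums the copies. That would explain why counterValue reads 0 in the question's reducer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of counter locality; this is NOT Hadoop code.
class CounterLocalityDemo {

    // Each task attempt owns a private set of counters, all starting at 0.
    static class TaskCounters {
        private final Map<String, Long> values = new HashMap<>();

        void increment(String name, long delta) {
            values.merge(name, delta, Long::sum);
        }

        long get(String name) {
            return values.getOrDefault(name, 0L);
        }
    }

    // Only the central tracker sees every task's counters and can sum them.
    static long aggregate(List<TaskCounters> tasks, String name) {
        long total = 0;
        for (TaskCounters task : tasks) {
            total += task.get(name);
        }
        return total;
    }

    public static void main(String[] args) {
        TaskCounters mapper1 = new TaskCounters();
        TaskCounters mapper2 = new TaskCounters();
        TaskCounters reducer = new TaskCounters();

        for (int i = 0; i < 3; i++) mapper1.increment("TEST", 1);
        for (int i = 0; i < 2; i++) mapper2.increment("TEST", 1);

        List<TaskCounters> allTasks = new ArrayList<>();
        allTasks.add(mapper1);
        allTasks.add(mapper2);
        allTasks.add(reducer);

        // The reducer's own copy was never incremented, so it reads 0.
        System.out.println("reducer-local TEST = " + reducer.get("TEST"));
        // The job-wide total exists only on the tracker side.
        System.out.println("job-wide TEST      = " + aggregate(allTasks, "TEST"));
    }
}
```

In this model the reducer can only learn the job-wide total by asking the tracker, which is what the JobClient and Cluster answers do, or by receiving it through the data path.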


I asked this question, but I haven't solved my problem. However, an alternative solution came to mind. In the mapper, the number of words is counted and written to the intermediate output under a minimal key (so that it sorts to the head) in the cleanup() function, which runs at the end of the mapper. In the reducer, the total number of words is calculated by adding up the values at the head. The sample code and part of its output are shown below.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * Created by tolga on 1/26/16.
 */
public class WordCount {
    static enum TestCounters { TEST }
    public static class Map extends Mapper<Object, Text, Text, LongWritable> {
        private final static LongWritable one = new LongWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, one);
                context.getCounter(TestCounters.TEST).increment(1);
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new Text("!"),new LongWritable(context.getCounter(TestCounters.TEST).getValue()));
        }
    }

    public static class Reduce extends Reducer<Text, LongWritable, Text, LongWritable> {

        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0; // long, to match LongWritable and avoid int overflow
            for (LongWritable val : values) {
                sum += val.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "WordCount"); // new Job(conf, name) is deprecated
        job.setJarByClass(WordCount.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}

Text File:

Turgut Özal University is a private university located in Ankara, Turkey. It was established in 2008 by the Turgut Özal Thought and Action Foundation and is named after former Turkish president Turgut Özal.

Intermediate Output

!	33
2008	1
Action	1
Ankara,	1
Foundation	1
It	1
Thought	1
Turgut	1
Turgut	1
Turgut	1

Final Output

!	33
2008	1
Action	1
Ankara,	1
Foundation	1
It	1
Thought	1
Turgut	3
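The reducer above only writes the "!" total as one more output row. As a sketch of how a reducer could go on to use that total, the following plain-Java simulation (not Hadoop code; SentinelKeyDemo, mapSplit and reduce are hypothetical names) relies on the sentinel sorting first and then divides each word's count by it. It assumes a single reducer: with several reducers the "!" key would reach only one partition unless a custom partitioner replicated it. It also assumes ASCII keys, and that no input token is literally "!", since such a token would be merged into the sentinel.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Plain-Java simulation of the sentinel-key trick; this is NOT Hadoop code.
class SentinelKeyDemo {
    static final String SENTINEL = "!"; // sorts before ASCII letters and digits

    // "Map" phase for one input split: emit (word, 1) pairs, then the
    // split's word count under the sentinel key, as cleanup() does above.
    static List<Map.Entry<String, Long>> mapSplit(String split) {
        List<Map.Entry<String, Long>> out = new ArrayList<>();
        long words = 0;
        StringTokenizer tokenizer = new StringTokenizer(split);
        while (tokenizer.hasMoreTokens()) {
            out.add(new AbstractMap.SimpleEntry<>(tokenizer.nextToken(), 1L));
            words++;
        }
        out.add(new AbstractMap.SimpleEntry<>(SENTINEL, words));
        return out;
    }

    // "Shuffle" plus "reduce" for a single reducer: group by key in sorted
    // order, read the sentinel group first, then emit each word's share.
    static Map<String, Double> reduce(List<List<Map.Entry<String, Long>>> mapOutputs) {
        TreeMap<String, Long> grouped = new TreeMap<>();
        for (List<Map.Entry<String, Long>> output : mapOutputs) {
            for (Map.Entry<String, Long> pair : output) {
                grouped.merge(pair.getKey(), pair.getValue(), Long::sum);
            }
        }
        long total = 0;
        Map<String, Double> shares = new LinkedHashMap<>();
        for (Map.Entry<String, Long> group : grouped.entrySet()) {
            if (group.getKey().equals(SENTINEL)) {
                total = group.getValue(); // seen before any word key
            } else {
                shares.put(group.getKey(), (double) group.getValue() / total);
            }
        }
        return shares;
    }

    public static void main(String[] args) {
        List<List<Map.Entry<String, Long>>> outputs = new ArrayList<>();
        outputs.add(mapSplit("a b a"));
        outputs.add(mapSplit("b a"));
        System.out.println(reduce(outputs));
    }
}
```

With the two splits in main, the sentinel sums to 5, so "a" (3 occurrences) and "b" (2 occurrences) come out as shares of 0.6 and 0.4.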


Improvement from itzhaki's answer

findCounter(COUNTER_NAME) is no longer supported - https://hadoop.apache.org/docs/r2.7.0/api/org/apache/hadoop/mapred/Counters.html

@Override
public void setup(Context context) throws IOException, InterruptedException{
    Configuration conf = context.getConfiguration();
    Cluster cluster = new Cluster(conf);
    Job currentJob = cluster.getJob(context.getJobID());
    mapperCounter = currentJob.getCounters().findCounter(GROUP_NAME, COUNTER_NAME).getValue();  
}

GROUP_NAME is the group given when the counter is incremented, e.g.

context.getCounter("com.example.mycode", "MY_COUNTER").increment(1);

then

mapperCounter = currentJob.getCounters().findCounter("com.example.mycode", "MY_COUNTER").getValue();  

Also, one important point: if the counter does not exist, findCounter() creates one with value 0 instead of failing.
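A practical consequence of this get-or-create behaviour is that a misspelled group or counter name does not raise an error; it just reads as 0. The plain-Java model below illustrates the pitfall. It is not Hadoop's implementation, and FindOrCreateDemo with its increment and find methods are hypothetical names used only for this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of get-or-create counter lookup; this is NOT Hadoop code.
class FindOrCreateDemo {
    private final Map<String, Long> counters = new HashMap<>();

    void increment(String group, String name, long delta) {
        counters.merge(group + ":" + name, delta, Long::sum);
    }

    // Mirrors the behaviour described above: a missing counter is created
    // with value 0 instead of being reported as an error.
    long find(String group, String name) {
        return counters.computeIfAbsent(group + ":" + name, key -> 0L);
    }

    public static void main(String[] args) {
        FindOrCreateDemo counters = new FindOrCreateDemo();
        counters.increment("com.example.mycode", "MY_COUNTER", 7);

        System.out.println(counters.find("com.example.mycode", "MY_COUNTER"));
        // A misspelled name does not fail; it silently reads as 0.
        System.out.println(counters.find("com.example.mycode", "MY_COUNTRE"));
    }
}
```

So a value of 0 on its own cannot distinguish "the mapper never incremented this counter" from "the group or name in the lookup is wrong"; keeping both strings in shared constants avoids the second case.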
