开发者

Run Hadoop job without using JobConf

开发者 https://www.devze.com 2022-12-18 05:59 出处:网络
I can\'t find a single example of submitting a Hadoop job that does not use the deprecated JobConf class.JobClient, which hasn\'t been deprecated, still开发者_开发技巧 only supports methods that take

I can't find a single example of submitting a Hadoop job that does not use the deprecated JobConf class. JobClient, which hasn't been deprecated, still开发者_开发技巧 only supports methods that take a JobConf parameter.

Can someone please point me at an example of Java code submitting a Hadoop map/reduce job using only the Configuration class (not JobConf), and using the mapreduce.lib.input package instead of mapred.input?


Hope this helpful

import java.io.File;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapReduceExample extends Configured implements Tool {

    static class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        public MyMapper(){

        }

        protected void map(
                LongWritable key,
                Text value,
                org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, LongWritable, Text>.Context context)
                throws java.io.IOException, InterruptedException {
            context.getCounter("mygroup", "jeff").increment(1);
            context.write(key, value);
        };
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job();
        job.setMapperClass(MyMapper.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        FileUtils.deleteDirectory(new File("data/output"));
        args = new String[] { "data/input", "data/output" };
        ToolRunner.run(new MapReduceExample(), args);
    }
}


I believe this tutorial illustrates removing the deprecated JobConf class using Hadoop 0.20.1.


This is a nice example with downloadable code: http://sonerbalkir.blogspot.com/2010/01/new-hadoop-api-020x.html It's also over two years old and there is no official documentation discussing the new API. Sad.


In the previous API there were three ways of submitting the job and one of them is by submitting the job and getting a reference to the RunningJob and getting an id of the RunningJob.

submitJob(JobConf) : only submits the job, then poll the returned handle to the RunningJob to query status and make scheduling decisions.

How can one use the new Api and get a reference to the RunningJob and get an id of the runningJob as none of the api's return a reference to RunningJob

http://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/Job.html

thanks


Try to use Configuration and Job. Here is an example:

(Replace your Mapper, Combiner, Reducer classes and other configuration)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {
  public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration conf = new Configuration();
    if(args.length != 2) {
      System.err.println("Usage: <in> <out>");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "Word Count");

    // set jar
    job.setJarByClass(WordCount.class);

    // set Mapper, Combiner, Reducer
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);

    /* Optional, set customer defined Partioner:
     * job.setPartitionerClass(MyPartioner.class);
     */

    // set output key
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // set input and output path
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // by default, Hadoop use TextInputFormat and TextOutputFormat
    // any customer defined input and output class must implement InputFormat/OutputFormat interface
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
0

精彩评论

暂无评论...
验证码 换一张
取 消