running another job in hadoop

Developer https://www.devze.com 2023-02-25 19:42 Source: web
I don't understand how to make a second job write a different file into the same output directory as the first. I have tried commenting and uncommenting the line below, but it still doesn't work. When I comment it out, I get the following exception. In the code I am trying to run two separate jobs with the same reducer but different mappers.

EDIT: And no, the output of one job is not the input of the other. I want both outputs in the same folder because they are the inputs to yet another MapReduce job I want to run.

FileOutputFormat.setOutputPath(job, new Path(args[1]));

11/04/14 13:33:11 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
Exception in thread "main" org.apache.hadoop.mapred.InvalidJobConfException: Output directory not set.
    at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:120)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:770)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:432)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:447)
    at org.myorg.WordCount.main(WordCount.java:123)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:156)

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        // if (otherArgs.length != 2) {
        //   System.err.println("Usage: wordcount <in> <out>");
        //   System.exit(2);
        // }
        Job job = new Job(conf, "Job1");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(Mapper1.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);

        FileSystem hdfs = FileSystem.get(conf);
        Path fromPath = new Path("/user/hadoop/output/part-r-00000");
        Path toPath = new Path("/user/hadoop/output/output1");

        // renaming to output1
        boolean isRenamed = hdfs.rename(fromPath, toPath);
        if (isRenamed)
        {
            System.out.println("Renamed to /user/hadoop/output/output1!");
        }
        else
        {
            System.out.println("Not Renamed!");
        }

        job = new Job(conf, "Job2");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(Mapper2.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // FileInputFormat.addInputPath(job, new Path(args[0]));
        // FileOutputFormat.setOutputPath(job, new Path(args[1]));


        System.exit( job.waitForCompletion(true) ? 0 : 1);
    }

Adding the following to my code causes other errors:

    job.setInputFormatClass(FileInputFormat.class);
    job.setOutputFormatClass(FileOutputFormat.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);

Exception in thread "main" java.lang.RuntimeException: java.lang.InstantiationException
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:115)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:768)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:432)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:447)
    at org.myorg.WordCount.main(WordCount.java:135)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
Caused by: java.lang.InstantiationException
    at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:30)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:113)
    ... 9 more
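
A likely explanation for this trace, offered as a hedged note: `FileInputFormat` and `FileOutputFormat` are abstract classes, and the trace shows Hadoop creating the configured format class by reflection, which cannot succeed for an abstract class. The sketch below reproduces that failure in plain Java without Hadoop. `BaseFormat` and `ConcreteFormat` are hypothetical stand-ins (for an abstract format and a concrete one such as `TextInputFormat`), not Hadoop classes.

```java
import java.lang.reflect.Constructor;

public class AbstractInstantiationDemo {
    // Hypothetical stand-in for an abstract base class such as FileInputFormat.
    static abstract class BaseFormat {
        BaseFormat() {}
    }

    // Hypothetical concrete subclass, standing in for e.g. TextInputFormat.
    static class ConcreteFormat extends BaseFormat {
        ConcreteFormat() {}
    }

    // Tries to build an instance reflectively and reports what happened.
    static String tryInstantiate(Class<?> cls) {
        try {
            Constructor<?> ctor = cls.getDeclaredConstructor();
            ctor.setAccessible(true);
            ctor.newInstance();
            return "ok";
        } catch (InstantiationException e) {
            // Thrown when the declaring class is abstract.
            return "InstantiationException";
        } catch (ReflectiveOperationException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryInstantiate(BaseFormat.class));
        System.out.println(tryInstantiate(ConcreteFormat.class));
    }
}
```

If this is indeed the cause, passing a concrete class (for example `TextInputFormat.class` and `TextOutputFormat.class`) to `setInputFormatClass` and `setOutputFormatClass`, or leaving the defaults in place, should avoid the exception.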


You have to provide a new Configuration object for the second job. BTW why aren't you using these methods for your output format?

    job.setInputFormatClass(FileInputFormat.class);
    job.setOutputFormatClass(FileOutputFormat.class);

Here is a blog post about recursive jobs, which is quite similar to what you are doing.
http://codingwiththomas.blogspot.com/2011/04/controlling-hadoop-job-recursion.html

EDIT: By the way, why do you intend to write into a folder that is the output of the previous job, i.e. the input of the new job? This will just result in another exception like: "Output path already exists".


The files don't all need to be in the same directory. Your third job can have multiple input paths (directories or files).

See

FileInputFormat.addInputPaths(JobConf conf, String commaSeparatedPaths)

and friends...
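
As a minimal sketch of this idea, assuming each of the first two jobs writes to its own directory, the helper below builds the comma-separated string that `addInputPaths` expects. The directory names and the `job3` variable are hypothetical, and the Hadoop call is shown only as a comment so the snippet runs without Hadoop on the classpath. With the newer `org.apache.hadoop.mapreduce` API used in the question, the counterpart method takes a `Job` instead of a `JobConf`.

```java
import java.util.Arrays;
import java.util.List;

public class InputPathList {
    // Joins the output directories of earlier jobs into the comma-separated
    // form that FileInputFormat.addInputPaths expects.
    static String commaSeparated(List<String> dirs) {
        return String.join(",", dirs);
    }

    public static void main(String[] args) {
        // Hypothetical directories: one per job, so neither job fails
        // because its output path already exists.
        List<String> outputs = Arrays.asList(
                "/user/hadoop/output1",
                "/user/hadoop/output2");
        String paths = commaSeparated(outputs);
        System.out.println(paths);

        // In the third job's driver (requires Hadoop on the classpath):
        //   FileInputFormat.addInputPaths(job3, paths);
    }
}
```

Giving each job its own output directory also removes the need to rename `part-r-00000` between jobs.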
