I don't understand how to make a job reuse the same output directory to write a different file into it. I have tried commenting and uncommenting the line below, but it still doesn't work; when I comment it out I get the exception that follows. In the code I am trying to run two separate jobs with the same reducer but a different mapper.
EDIT: And no, the output of one job is not the input of the other. The reason I want them in the same folder is that they are both inputs to yet another MapReduce job I want to run.
FileOutputFormat.setOutputPath(job, new Path(args[1]));
11/04/14 13:33:11 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
Exception in thread "main" org.apache.hadoop.mapred.InvalidJobConfException: Output directory not set.
at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:120)
at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:770)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:432)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:447)
at org.myorg.WordCount.main(WordCount.java:123)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    // if (otherArgs.length != 2) {
    //     System.err.println("Usage: wordcount <in> <out>");
    //     System.exit(2);
    // }

    // First job: Mapper1 + IntSumReducer, writing to args[1]
    Job job = new Job(conf, "Job1");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(Mapper1.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.waitForCompletion(true);

    // Rename the first job's output file so the second job's part-r-00000
    // cannot clobber it
    FileSystem hdfs = FileSystem.get(conf);
    Path fromPath = new Path("/user/hadoop/output/part-r-00000");
    Path toPath = new Path("/user/hadoop/output/output1");
    boolean isRenamed = hdfs.rename(fromPath, toPath);
    if (isRenamed) {
        System.out.println("Renamed to /user/hadoop/output/output1!");
    } else {
        System.out.println("Not Renamed!");
    }

    // Second job: Mapper2, same reducer; the output path line is the one
    // I keep commenting and uncommenting
    job = new Job(conf, "Job2");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(Mapper2.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    // FileInputFormat.addInputPath(job, new Path(args[0]));
    // FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
Adding the following to my code causes other errors:
job.setInputFormatClass(FileInputFormat.class);
job.setOutputFormatClass(FileOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
Exception in thread "main" java.lang.RuntimeException: java.lang.InstantiationException
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:115)
at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:768)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:432)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:447)
at org.myorg.WordCount.main(WordCount.java:135)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
Caused by: java.lang.InstantiationException
at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:30)
at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:113)
... 9 more
You have to provide a new Configuration object for the second job. By the way, the InstantiationException in your edit comes from setting the format classes to FileInputFormat and FileOutputFormat directly: both are abstract classes, so Hadoop cannot instantiate them via reflection. Set concrete subclasses instead (these are the defaults anyway, so you can also simply omit both calls):
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
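To make that concrete, here is a minimal sketch of the two-job chain with a fresh Configuration for the second job and a separate output directory per job. The job1/job2 subdirectory names under args[1] are just an illustrative convention of mine (each job needs an output path that does not exist yet); both outputs still end up under one parent folder for your third job:

Configuration conf1 = new Configuration();
Job job1 = new Job(conf1, "Job1");
job1.setJarByClass(WordCount.class);
job1.setMapperClass(Mapper1.class);
job1.setCombinerClass(IntSumReducer.class);
job1.setReducerClass(IntSumReducer.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job1, new Path(args[0]));
FileOutputFormat.setOutputPath(job1, new Path(args[1] + "/job1")); // own subdir
job1.waitForCompletion(true);

Configuration conf2 = new Configuration(); // fresh configuration for the second job
Job job2 = new Job(conf2, "Job2");
job2.setJarByClass(WordCount.class);
job2.setMapperClass(Mapper2.class);
job2.setCombinerClass(IntSumReducer.class);
job2.setReducerClass(IntSumReducer.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job2, new Path(args[0]));
FileOutputFormat.setOutputPath(job2, new Path(args[1] + "/job2")); // sibling dir, no clash
System.exit(job2.waitForCompletion(true) ? 0 : 1);

This way neither job writes a part-r-00000 into the other's directory, and the manual HDFS rename becomes unnecessary.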
Here is a blog post about recursing jobs; it covers much the same ground as what you are doing:
http://codingwiththomas.blogspot.com/2011/04/controlling-hadoop-job-recursion.html
EDIT: By the way, why do you intend to write into a folder that is the output of the previous job, i.e. the input of the new job? That will just result in another exception along the lines of "Output path already exists".
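If you really do mean to reuse an existing directory, a common workaround (a sketch only, and destructive, so make sure that is actually what you want) is to delete it before submitting the job:

// CAUTION: recursively deletes the existing output directory in HDFS
Path out = new Path(args[1]);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(out)) {
    fs.delete(out, true); // true = recursive
}
FileOutputFormat.setOutputPath(job, out);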
The files don't all need to be in the same directory. Your third job can have multiple input paths (directories or files).
See
FileInputFormat.addInputPaths(Job job, String commaSeparatedPaths)
(or the JobConf variant in the old mapred API) and friends...
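For instance, the third job could read both earlier outputs directly, with no renaming at all. The job1/job2 paths below assume the per-job subdirectory layout sketched in the other answer and are purely illustrative:

Job job3 = new Job(new Configuration(), "Job3");
job3.setJarByClass(WordCount.class);
// ... set mapper/reducer/key/value classes as before ...
FileInputFormat.addInputPath(job3, new Path(args[1] + "/job1"));
FileInputFormat.addInputPath(job3, new Path(args[1] + "/job2"));
// or, equivalently, as one comma-separated string:
// FileInputFormat.addInputPaths(job3, args[1] + "/job1," + args[1] + "/job2");
FileOutputFormat.setOutputPath(job3, new Path(args[1] + "/job3"));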