Oozie running own MapReduce workflow issue

Developer https://www.devze.com 2023-03-10 06:01 Source: Internet

Not sure if anyone has run into this issue. I am trying to use Oozie to run a simple MapReduce job that searches an HDFS location for a string value and, if it finds it, outputs the matching record. When I submit the job, Oozie executes it successfully, but I don't get the expected output. It seems that my mapper was never invoked, since it did not filter out any records: the output file just contains all the input records. I have put System.out.println calls all over the code, and I don't see any of them in the task log files. I have checked the config and the job results, and I see neither the counter I added nor any of the System.out output. The job config, though, states that my mapper class was executed. This does not make any sense to me. Below is a snippet of my Mapper code:

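package poc;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// New-API (org.apache.hadoop.mapreduce) mapper that emits only the input lines
// containing the configured search string.
public class SearchForValueMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // Configuration key read in setup(); it must match the property name set in workflow.xml.
    private static final String SEARCH_FOR_STRING = "poc.search.string";

    enum SearchCounters {
        NUMBER_OF_MATCHES
    }

    private final NullWritable nullValue = NullWritable.get();
    private final Text outputLine = new Text();
    private String searchString = null;

    @Override
    protected void setup(Context context) {
        searchString = context.getConfiguration().get(SEARCH_FOR_STRING);
        System.out.println("Searching for: [" + searchString + "]");
        // increment(0) creates the counter without changing its value.
        context.getCounter(SearchCounters.NUMBER_OF_MATCHES).increment(0);
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String inputLine = value.toString();
        System.out.println("key: " + key + " value: " + inputLine
                + " searchString: [" + searchString + "]");

        // Emit the line (with a null value) only when it contains the search string.
        if (inputLine.contains(searchString)) {
            context.getCounter(SearchCounters.NUMBER_OF_MATCHES).increment(1);
            outputLine.set(inputLine);
            context.write(outputLine, nullValue);
        }
    }
}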
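To look at the filtering step in isolation, here is a Hadoop-free sketch of what map() does, assuming a plain java.util.Map can stand in for the job Configuration. The class SearchFilterSketch and its filter method are hypothetical names used only for illustration, and the code needs Java 9+ for List.of/Map.of. It also makes one point visible: String.contains(null) throws a NullPointerException, so if the mapper really ran with poc.search.string unset (the workflow.xml sets com.disney.search.string instead), the task would be expected to fail rather than pass every record through. That fits the suspicion that the mapper never ran at all.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hadoop-free sketch of the mapper's filtering step. A plain Map stands in for
// org.apache.hadoop.conf.Configuration; class and method names are hypothetical.
public class SearchFilterSketch {

    static final String SEARCH_FOR_STRING = "poc.search.string";

    // Returns the lines that contain the configured search string. Fails fast when the
    // key is missing instead of letting String.contains(null) throw a NullPointerException.
    static List<String> filter(List<String> lines, Map<String, String> conf) {
        String searchString = conf.get(SEARCH_FOR_STRING);
        if (searchString == null) {
            throw new IllegalStateException(
                    "Configuration key '" + SEARCH_FOR_STRING + "' is not set");
        }
        List<String> matches = new ArrayList<>();
        for (String line : lines) {
            if (line.contains(searchString)) {
                matches.add(line);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> input = List.of("alpha", "beta needle", "gamma");

        // Key matches the one the mapper reads: only the matching line is kept.
        System.out.println(filter(input, Map.of(SEARCH_FOR_STRING, "needle")));

        // Key differs from the one the mapper reads: the lookup returns null.
        try {
            filter(input, Map.of("com.disney.search.string", "needle"));
        } catch (IllegalStateException e) {
            System.out.println("Error: " + e.getMessage());
        }
    }
}
```

Running main prints [beta needle] for the matching key and an error message for the mismatched one.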

And here is my workflow.xml

<workflow-app xmlns="uri:oozie:workflow:0.1" name="search-wf">
    <start to="mr-node"/>
    <action name="mr-node">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/${wf:user()}/${outputDir}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapreduce.map.class</name>
                    <value>poc.SearchForValueMapper</value>
                </property>
                <property>
                    <name>mapreduce.reducer.class</name>
                    <value>poc.SearchForValueReducer</value>
                </property>
                <property>
                    <name>mapred.map.tasks</name>
                    <value>100</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>/user/${wf:user()}/${inputDir}</value>
                </property>
                <property>
                    <name>com.disney.search.string</name>
                    <value>${searchString}</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>/user/${wf:user()}/${outputDir}</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
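A commonly cited cause of exactly this symptom, offered as a hedged pointer rather than a confirmed diagnosis: Oozie's map-reduce action submits the job through the old org.apache.hadoop.mapred API unless mapred.mapper.new-api and mapred.reducer.new-api are set to true. In that case mapreduce.map.class is ignored and the old API's mapper class defaults to IdentityMapper, which writes every input record out unchanged. Separately, in Hadoop 0.20/1.x the new-API reducer property is mapreduce.reduce.class rather than mapreduce.reducer.class, and the search-string property in the workflow (com.disney.search.string) differs from the key the mapper reads (poc.search.string). Depending on what the reducer emits, mapred.output.key.class and mapred.output.value.class may also need to be set, since the mapper writes Text/NullWritable. The sketch below is a hypothetical lint (WorkflowConfigCheck is not part of Oozie or Hadoop) that checks a property map for the first three points; it calls no Oozie or Hadoop API and only encodes the assumptions above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical lint for the <configuration> properties of the map-reduce action.
// It encodes assumptions about Oozie/Hadoop property names; it does not call their APIs.
public class WorkflowConfigCheck {

    static List<String> check(Map<String, String> props, String keyReadByMapper) {
        List<String> warnings = new ArrayList<>();

        // Assumption: without these flags Oozie configures the job through the old mapred API,
        // so mapreduce.map.class is ignored and the default identity mapper runs.
        for (String flag : List.of("mapred.mapper.new-api", "mapred.reducer.new-api")) {
            if (!"true".equals(props.get(flag))) {
                warnings.add(flag + " is not set to true");
            }
        }

        // Assumption: the new-API reducer key in Hadoop 0.20/1.x is mapreduce.reduce.class.
        if (props.containsKey("mapreduce.reducer.class")) {
            warnings.add("mapreduce.reducer.class should probably be mapreduce.reduce.class");
        }

        // The property name must match the key the mapper reads in setup().
        if (!props.containsKey(keyReadByMapper)) {
            warnings.add("no property named " + keyReadByMapper);
        }
        return warnings;
    }

    public static void main(String[] args) {
        // A subset of the properties from the workflow.xml (EL expressions kept as literal strings).
        Map<String, String> props = Map.of(
                "mapreduce.map.class", "poc.SearchForValueMapper",
                "mapreduce.reducer.class", "poc.SearchForValueReducer",
                "com.disney.search.string", "${searchString}");
        check(props, "poc.search.string").forEach(System.out::println);
    }
}
```

Run against those properties, main prints four warnings, one per line; a configuration with both new-api flags set to true, mapreduce.reduce.class, and a poc.search.string property produces none.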