My program follows an iterative map/reduce approach, and it needs to stop once certain conditions are met. Is there any way I can set a global variable that is distributed across all map/reduce tasks, and then check whether that variable has reached the condition for completion?
Something like this:
while (condition != true) {
    Configuration conf = getConf();
    Job job = new Job(conf, "Dijkstra Graph Search");
    job.setJarByClass(GraphSearch.class);
    job.setMapperClass(DijkstraMap.class);
    job.setReducerClass(DijkstraReduce.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);
}
Here, condition is a global variable that is modified during or after each map/reduce execution.
Each time you run a map-reduce job, you can examine the state of the output, the values contained in the counters, etc., and decide at the node that controls the iteration whether you want one more iteration or not. I guess I don't understand where the need for global state comes from in your scenario.
More generally, there are two main ways to share state between executing nodes (although it should be noted that sharing state is best avoided, since it limits scalability):
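The driver-side check can be sketched without any shared global state. What follows is a minimal sketch in plain Java, not a Hadoop program: IterationDriver, runUntilConverged, and the LongSupplier that stands in for one map/reduce pass are all names made up for illustration. In a real driver the supplier would presumably build and submit the Job, call job.waitForCompletion(true), and return a counter value such as job.getCounters().findCounter(...).getValue(), assuming the reducer increments that counter whenever it improves a node's distance.

```java
import java.util.function.LongSupplier;

class IterationDriver {

    /**
     * Runs passes until one of them reports zero updates, or until
     * maxIterations passes have been executed. Returns the number of passes run.
     *
     * runOnePass is a hypothetical stand-in for "submit one job, wait for it,
     * and return the value of an 'updated nodes' counter".
     */
    static int runUntilConverged(LongSupplier runOnePass, int maxIterations) {
        int iterations = 0;
        while (iterations < maxIterations) {
            long updated = runOnePass.getAsLong();
            iterations++;
            if (updated == 0) {
                break; // nothing changed in this pass, so another pass is pointless
            }
        }
        return iterations;
    }

    public static void main(String[] args) {
        // Simulated counter values for three consecutive passes.
        long[] simulated = {5, 2, 0};
        int[] next = {0};
        int passes = runUntilConverged(() -> simulated[next[0]++], 10);
        System.out.println("Stopped after " + passes + " passes");
    }
}
```

The maxIterations cap is an added safety net, so that a job that never reports zero updates cannot loop forever.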
- Write a file to HDFS that other nodes can read (make sure the file gets cleaned up when the job exits, and that speculative execution won't cause weird failures).
- Use ZooKeeper to store some data in dedicated ZK tree nodes.
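For the file-based option, one way to keep readers from seeing a half-written file, and to keep duplicate task attempts from interleaving their writes, is to write to a private temporary file and then rename it into place. The sketch below uses java.nio.file on a local filesystem as a stand-in for HDFS. SharedStateFile, publish, read, and cleanUp are hypothetical names, and porting this to org.apache.hadoop.fs.FileSystem is an assumption rather than tested code (in particular, rename behaviour when the target already exists may differ on HDFS and would need checking).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

class SharedStateFile {

    /** Writes value to a temp file unique to this attempt, then renames it over stateFile. */
    static void publish(Path stateFile, String attemptId, String value) {
        Path tmp = stateFile.resolveSibling(stateFile.getFileName() + "." + attemptId + ".tmp");
        try {
            Files.write(tmp, value.getBytes(StandardCharsets.UTF_8));
            // On POSIX filesystems an atomic move replaces an existing target;
            // elsewhere the behaviour is implementation specific.
            Files.move(tmp, stateFile, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the published value, or defaultValue if nothing has been published yet. */
    static String read(Path stateFile, String defaultValue) {
        try {
            return new String(Files.readAllBytes(stateFile), StandardCharsets.UTF_8);
        } catch (NoSuchFileException e) {
            return defaultValue;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Removes the state file when the job is done; a missing file is not an error. */
    static void cleanUp(Path stateFile) {
        try {
            Files.deleteIfExists(stateFile);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Including the attempt ID in the temporary file name means two attempts of the same task never write to the same temporary path; whichever rename happens last wins.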
You can use Configuration.set(String name, String value) to set a value you will be able to access in your Mappers/Reducers/etc:
In your driver:
conf.set("my.dijkstra.parameter", "value");
And, for example, in your mapper (old mapred API):
public void configure(JobConf job) {
    // Read the value that the driver stored in the job configuration.
    myParam = job.get("my.dijkstra.parameter");
}
But this is unlikely to help you inspect the output of previous jobs to decide whether to start one more iteration, because the value is not pushed back to the driver after the job has run.
You can also use Hadoop's DistributedCache to store files that will be distributed to all nodes. This is a bit better than simply storing something on HDFS if the value you are going to pass this way is small.
Of course, counters can also be used for this purpose, but they don't look too reliable as a basis for decisions in the algorithm. It looks like in some cases they can be incremented twice (if a task was executed more than once, e.g. because of a failure or speculative execution), though I am not sure.
This is how it works in Hadoop 2.0:
In your driver:
conf.set("my.dijkstra.parameter", "value");
And in your Mapper:
protected void setup(Context context) throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    strProp = conf.get("my.dijkstra.parameter");
    // and then you can use it
}
You can use Cascading to organize multiple Hadoop jobs. Specify an HDFS path where you want to keep the global state variable, and initialize it with dummy contents. On each iteration, read the current contents of this HDFS path, delete those contents, perform any number of map/reduce steps, and finally perform a global reduce that updates the global state variable. Depending on the nature of your task, you may need to disable speculative execution and allow for many retries.