
manipulating iterator in mapreduce

Developer https://www.devze.com 2023-01-11 18:36 Source: web
I am trying to find the sum of any given points using Hadoop. The issue I am having is getting all values for a given key in a single reducer. It looks like this.


Reducer:

public static class Reduce extends MapReduceBase implements
        Reducer<Text, IntWritable, Text, DoubleWritable> {

    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, DoubleWritable> output, Reporter reporter)
            throws IOException {
        Text word = new Text();

        // Outer loop: take one value at a time.
        Iterator<IntWritable> tr = values;
        IntWritable v;
        while (tr.hasNext()) {
            v = tr.next();

            // Inner loop: intended to walk all values again for each v.
            Iterator<IntWritable> td = values;
            while (td.hasNext()) {
                IntWritable u = td.next();
                // IntWritable has no + operator; unwrap the ints with get().
                double sum = u.get() + v.get();
                word.set(u + " + " + v);
                output.collect(word, new DoubleWritable(sum));
            }
        }
    }
}

I am trying to create two copies of the Iterator variable so that, for each single value taken from the first iterator, I can go through all the values of the second iterator (the two while loops above). But the two iterators hold the same value all the time.

I am not sure if this is the right way to do it.


The iterators in the reducer are not as simple as you might think.

The issue is that the total number of items you are iterating through might not fit into memory, which means the iterator may be reading from disk. If you had two independent copies of the iterator, one of them could get far ahead of the other, and the data between the positions of the two iterators could not be discarded.

For simplicity of implementation, Hadoop doesn't support having more than one iterator for the reduce values.

The practical impact of this is that you can't go through the same iterator twice. That isn't nice, but it is the case. If you absolutely know that the number of items will fit into memory, then you can copy all the items into a list as suggested by MrGomez. If you don't know that, you may have to use secondary storage.

The better approach is to redesign your program so that you don't need unbounded storage in the reducer. This can get a bit tricky, but there are standard approaches to the problem.

For your particular problem, your output size grows quadratically with the size of the largest reduce input set. This is usually a really bad idea. In most cases you don't need ALL pairs, just the most important pairs. If you can trim the set of pairs in some way, you may be able to drop the all-pairs requirement entirely.

For instance, if you are trying to find the 100 pairs with the largest sum for each reduce set, you can keep one priority queue with the 100 largest inputs seen so far and another with the 100 largest sums seen so far. For each new input, form its sum with each of the 100 largest numbers seen so far and try to insert those sums into the second queue. Then insert the new input into the first queue and trim both queues back to 100 elements by deleting the smallest values (if necessary). In the close method of the reducer, dump the queue of sums. This approach guarantees that you need only min(n^2, 200) elements of storage, which avoids the n^2 problem, and it avoids a second pass through the input because you keep the 100 largest items seen rather than all items seen.
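A minimal sketch of the two-queue idea in plain Java, kept outside Hadoop so it runs on its own. `TopPairSums`, `add` and `result` are hypothetical names, and `k` stands in for the 100 in the text. Both queues are `java.util.PriorityQueue` min-heaps, so `poll()` evicts the smallest element whenever a queue grows past `k`. Each sum pairs two distinct inputs; a value is never paired with itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical helper: bounded top-k pair sums in a single forward pass. */
public class TopPairSums {
    private final int k;
    // Min-heap holding the k largest inputs seen so far.
    private final PriorityQueue<Integer> topInputs = new PriorityQueue<>();
    // Min-heap holding the k largest pair sums seen so far.
    private final PriorityQueue<Integer> topSums = new PriorityQueue<>();

    public TopPairSums(int k) {
        this.k = k;
    }

    /** Processes one value; total storage never exceeds 2k elements. */
    public void add(int x) {
        // Pair the new input with each of the largest inputs seen before it.
        for (int prev : topInputs) {
            offer(topSums, prev + x);
        }
        // Only then record the new input itself.
        offer(topInputs, x);
    }

    // Inserts v, then drops the smallest element if the heap exceeds k.
    private void offer(PriorityQueue<Integer> heap, int v) {
        heap.add(v);
        if (heap.size() > k) {
            heap.poll();
        }
    }

    /** Returns the retained pair sums, largest first (what a reducer would emit). */
    public List<Integer> result() {
        List<Integer> out = new ArrayList<>(topSums);
        out.sort(Collections.reverseOrder());
        return out;
    }

    public static void main(String[] args) {
        TopPairSums t = new TopPairSums(3);
        for (int v : new int[] {5, 1, 9, 3, 7}) {
            t.add(v);
        }
        System.out.println(t.result()); // [16, 14, 12]
    }
}
```

In a reducer you would call `add(v.get())` for each value during the one pass over the iterator and emit `result()` at the end, as the answer describes.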


I'm not sure exactly what you're trying to accomplish, but I know this much: the behavior of Hadoop's Iterators is a bit strange. Calling Iterator.next() will always return the SAME EXACT instance of IntWritable, with the contents of that instance replaced with the next value. So holding a reference to the IntWritable across calls to Iterator.next() is almost always a mistake. I believe this behavior is by design to reduce the amount of object creation and GC overhead.

One way to get around this is to use WritableUtils.clone() to clone the instance you're trying to preserve across calls to Iterator.next().
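To make the object-reuse behavior concrete without a Hadoop cluster, here is a plain-Java simulation. `MutableInt` and `ReusingIterator` are hypothetical stand-ins for `IntWritable` and Hadoop's value iterator, and the simulation assumes the reuse behavior described above. In real reducer code the copy step would be `WritableUtils.clone(value, conf)`, or simply `new IntWritable(value.get())` for an `IntWritable`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Simulates an iterator that hands back one shared, mutable object. */
public class ObjectReuseDemo {
    // Hypothetical stand-in for IntWritable: a mutable int holder.
    static final class MutableInt {
        int value;

        MutableInt copy() {
            MutableInt c = new MutableInt();
            c.value = value;
            return c;
        }
    }

    // Hypothetical stand-in for the reduce-side iterator: every next()
    // overwrites and returns the SAME MutableInt instance.
    static final class ReusingIterator implements Iterator<MutableInt> {
        private final int[] data;
        private int pos = 0;
        private final MutableInt shared = new MutableInt();

        ReusingIterator(int... data) {
            this.data = data;
        }

        @Override
        public boolean hasNext() {
            return pos < data.length;
        }

        @Override
        public MutableInt next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            shared.value = data[pos++];
            return shared;
        }
    }

    // Reads the current contents of each holder.
    static List<Integer> values(List<MutableInt> holders) {
        List<Integer> out = new ArrayList<>();
        for (MutableInt m : holders) {
            out.add(m.value);
        }
        return out;
    }

    public static void main(String[] args) {
        // Keeping references: every list entry is the same object.
        List<MutableInt> kept = new ArrayList<>();
        Iterator<MutableInt> it = new ReusingIterator(1, 2, 3);
        while (it.hasNext()) {
            kept.add(it.next());
        }
        System.out.println(values(kept)); // [3, 3, 3]

        // Copying each value before the next call preserves it.
        List<MutableInt> copied = new ArrayList<>();
        it = new ReusingIterator(1, 2, 3);
        while (it.hasNext()) {
            copied.add(it.next().copy());
        }
        System.out.println(values(copied)); // [1, 2, 3]
    }
}
```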


Assigning an iterator to a new variable does not copy it. When iterator A is assigned to another iterator variable B, both variables point to the same iterator and the same data, so advancing one advances the other. To get a copy, you would have to "clone" the iterator into a new iterator object.
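A small plain-Java illustration of the aliasing, using an ordinary in-memory `List` rather than Hadoop's reduce values. Note that `java.util.Iterator` itself declares no `clone()` method; for a `List`, the practical way to get an independent cursor is to ask the collection for a fresh iterator, which (per the first answer) Hadoop's reduce values do not support.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Shows that assigning an iterator to a second variable does not copy it. */
public class IteratorAliasDemo {
    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(10, 20, 30);

        Iterator<Integer> a = values.iterator();
        Iterator<Integer> b = a; // b refers to the same iterator object as a

        System.out.println(a.next()); // 10
        System.out.println(b.next()); // 20, because b and a share one cursor
        System.out.println(a == b);   // true

        // An independent cursor requires a new iterator from the collection.
        Iterator<Integer> c = values.iterator();
        System.out.println(c.next()); // 10
    }
}
```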


Going by your previous question, you appear to be stuck on the iterator problem piccolbo described. The formulation of your reducer also indicates you've forgone his proposed algorithms in favor of the naive approach, which will work, albeit suboptimally.

Allow me to clean up your code a bit with my answer:

import java.io.IOException;
import java.util.LinkedList;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Making use of Hadoop's Iterable reduce (the org.apache.hadoop.mapreduce API),
// assuming it's available to you.
//
//  The method signature is:
//
//  protected void reduce(KEYIN key, java.lang.Iterable<VALUEIN> values,
//   org.apache.hadoop.mapreduce.Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>.Context
//   context) throws java.io.IOException, java.lang.InterruptedException
//
// The enclosing class name is illustrative.
public class PairSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    // @Override makes the compiler reject a mismatched signature, which would
    // otherwise silently fall back to the default identity reduce.
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        // I assume you declare this here to save on GC
        Text outKey = new Text();
        IntWritable outVal = new IntWritable();

        // Since you've forgone piccolbo's approach, you'll need to maintain the
        // data structure yourself. Since we always walk the list forward and
        // wish to optimize the insertion speed, we use LinkedList. Calls to
        // IntWritable.get() will give us an int, which we then copy into our list.
        LinkedList<Integer> valueList = new LinkedList<Integer>();

        // Here's why we changed the method signature: use of Java's for-each
        for (IntWritable iw : values) {
            valueList.add(iw.get());
        }

        // And from here, we construct each value pair as an O(n^2) operation
        for (Integer i : valueList) {
            for (Integer j : valueList) {
                outKey.set(i + " + " + j);
                outVal.set(i + j);
                context.write(outKey, outVal);
            }
        }

        // Do note: I've also changed your output value from DoubleWritable to
        // IntWritable, since the sum of two ints is always an int. If your
        // points are Double, supply DoubleWritable instead.
    }
}

This works, but it makes several assumptions that limit performance when constructing your distance matrix, including requiring the combination to be performed in a single reduce operation.
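To put the quadratic output in numbers: the nested loops in this reducer emit one record for every ordered pair (i, j), including i = j, so a key with n values yields n^2 records. The value n = 10^4 below is a worked example chosen for illustration, not measured data.

```latex
\text{records emitted per key} = n^2
\qquad\text{vs.}\qquad
\binom{n}{2} = \frac{n(n-1)}{2}\ \text{distinct unordered pairs}

n = 10^4:\quad n^2 = 10^8,
\qquad \frac{n(n-1)}{2} = \frac{10^4 \cdot 9999}{2} = 49{,}995{,}000
```

Emitting only distinct unordered pairs roughly halves the output, but the growth stays quadratic either way.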

Consider piccolbo's approach if you know the size and dimensionality of your input data set in advance. In the worst case, you can obtain these by walking the lines of input in linear time.

(See this thread for why we can't implement this as a forward iterator.)

