Is it possible to write a Hadoop-ready reduce function that can find the longest run of 1s (only the length of the run)?
I'm thinking of something that can run on Python's functools.reduce, but that I would eventually like to run on a Hadoop cluster (by "Hadoop-ready" I mean that the reduction steps can run in any arbitrary order).
The motivation is the search for tandem repeats in biological sequences, as discussed here http://biostar.stackexchange.com/questions/10582/counting-repeat-sequence - finding the longest run of repeats. Sequentially this problem is trivial, but can it be done on big data? Trying to frame it as a map-reduce problem: the map function would map all words of interest (e.g. all occurrences of TGATCT) to 1s and everything else to 0s. The reducer function then just needs to find the longest run of 1s.
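For concreteness, here is a minimal sketch of the map step as I picture it (the function name and the fixed word TGATCT are just placeholders for illustration):

def map_word_hits(seq, word="TGATCT"):
    # One output per word-sized window, stepping by the word length:
    # 1 if the window matches the word of interest, 0 otherwise.
    k = len(word)
    return [1 if seq[i:i + k] == word else 0 for i in range(0, len(seq), k)]

For example, map_word_hits("TGATCTTGATCTAAAAAA") gives [1, 1, 0].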
I tried one approach that seemed doable but found a case where it fails.
The following is skeleton code with tests.
#!/usr/bin/env python
import random

def count_tandem_repeats_reducer(left, right):
    # ... the reducer this question is asking for ...
    pass

def reduce(func, array):
    # Just like functools.reduce, but func is applied at random positions:
    # func takes 2 adjacent elements of the array and returns 1 element;
    # the 2 elements are reduced into 1 until the array is of size 1.
    array = list(array)
    while len(array) > 1:
        i = random.randrange(len(array) - 1)
        array[i:i + 2] = [func(array[i], array[i + 1])]
    return array[0]

def count_tandem_repeats(seq):
    if not seq:
        return 0
    if len(seq) == 1:
        return seq[0]
    return reduce(count_tandem_repeats_reducer, seq)
# Testing
assert count_tandem_repeats([]) == 0
assert count_tandem_repeats([0,0,0]) == 0
assert count_tandem_repeats([1,1]) == 2
assert count_tandem_repeats([1,0,0,0,1,1,1,1,0,0,1]) == 4
assert count_tandem_repeats([0,0,0,1,1,1,0,0]) == 3
assert count_tandem_repeats([0,1,0,1,1,0,1,1,1,0,1,1,1,1,0]) == 4
assert count_tandem_repeats([0,1,0,1,1,0,1,1,1,0,1,1,1,1,0][::-1]) == 4
This doesn't seem like a perfect fit for a set of parallel reducers. One alternative is to implement it as a separate map-reduce task that runs after your original algorithm (the one that converts your sequence to 1s and 0s).
You then implement a custom InputFormat and RecordReader that split your input stream into some arbitrary number of segments, while ensuring that each split happens only at a 1 -> 0 transition. Then in the mapper (if you were implementing the solution in Java, you would have a Mapper class) you maintain a count of the longest run of 1s seen so far. Each mapper outputs the longest run of 1s in its input split, and the reducer just returns the max() of the mappers' outputs.
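A rough Python sketch of that pipeline, since the question is framed in Python (the function names are mine, and in a real job the splitting would live in the custom InputFormat rather than in user code):

def split_at_transitions(bits, n_splits):
    # Cut the 0/1 stream into roughly equal segments, nudging each cut
    # forward until it lands on a 1 -> 0 transition, so that no run of
    # 1s ever straddles two splits.
    size = max(1, len(bits) // n_splits)
    splits, start = [], 0
    while start < len(bits):
        end = min(start + size, len(bits))
        while end < len(bits) and not (bits[end - 1] == 1 and bits[end] == 0):
            end += 1
        splits.append(bits[start:end])
        start = end
    return splits

def longest_run(split):
    # The mapper: longest run of 1s within a single split.
    best = run = 0
    for b in split:
        run = run + 1 if b else 0
        best = max(best, run)
    return best

bits = [1, 0, 0, 0, 1, 1, 1, 1, 0, 0, 1]
print(max(longest_run(s) for s in split_at_transitions(bits, 3)))  # prints 4

The final max() plays the role of the reducer.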
from functools import reduce

def count(seq):
    # Fold the 0/1 sequence into a list of run lengths (a 1 extends the
    # current run, a 0 starts a new one), then take the longest.
    return max(reduce(lambda acc, val: acc[:-1] + [acc[-1] + val] if val else acc + [val],
                      seq, [0]))

print(count([1, 0, 0, 0, 1, 1, 1, 1, 0, 0, 1]))
prints
4
just to show that it can be done
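To make the reduction itself safe to run in any order (the "Hadoop-ready" property the question asks for), one standard trick, sketched below as an illustration rather than taken from the code above, is to reduce over small per-segment summaries instead of bare counts: the run of 1s touching a segment's left edge, the best run anywhere inside it, the run touching its right edge, and the segment's length.

from functools import reduce

def summarize(bit):
    # Lift one 0/1 element into a (prefix, best, suffix, length) summary.
    return (bit, bit, bit, 1)

def combine(left, right):
    # Merge two adjacent segment summaries. The operation is associative,
    # so the reductions can happen in any order, as long as each step
    # merges adjacent segments.
    lp, lb, ls, ln = left
    rp, rb, rs, rn = right
    prefix = lp + rp if lp == ln else lp   # left segment is all 1s
    suffix = ls + rs if rs == rn else rs   # right segment is all 1s
    best = max(lb, rb, ls + rp)            # runs can join at the seam
    return (prefix, best, suffix, ln + rn)

print(reduce(combine, map(summarize, [1, 0, 0, 0, 1, 1, 1, 1, 0, 0, 1]))[1])  # prints 4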