Need help implementing this algorithm with Hadoop MapReduce

I have an algorithm that goes through a large data set, reads some text files, and searches for specific terms in those lines. I have it implemented in Java, but I didn't want to post the code so that it doesn't look like I am searching for someone to implement it for me, but it is true, I really need a lot of help! This was not planned for my project, but the data set turned out to be huge, so my teacher told me I have to do it like this.

EDIT (I did not clarify this in the previous version): The data set I have is on a Hadoop cluster, and I should make a MapReduce implementation of it.

I was reading about MapReduce and thought that I would first do the standard implementation and then it would be more or less easy to do it with MapReduce. But that didn't happen, since the algorithm is quite stupid and nothing special, and MapReduce... I can't wrap my mind around it.

So here, in short, is the pseudocode of my algorithm:

LIST termList   (there is a method that creates this list from a Lucene index)
FOLDER topFolder

INPUT topFolder
IF it is folder and not empty
    list files (there are 30 sub folders inside)
    FOR EACH sub folder
        GET file "CheckedFile.txt"
        analyze(CheckedFile)
    ENDFOR
END IF


Method ANALYZE(CheckedFile)

read CheckedFile
WHILE CheckedFile has next line
    GET line
    FOR (loops through termList)
        GET third word from line
        IF third word = term from list
            append whole line to string buffer
        ENDIF
    ENDFOR
END WHILE
OUTPUT string buffer to file

Also, as you can see, each time "analyze" is called, a new file has to be created, and I understood that it is difficult for MapReduce to write to many output files?

I understand the MapReduce intuition, and my example seems perfectly suited for MapReduce, but when it comes to actually doing it, obviously I do not know enough and I am STUCK!

Please, please help.


You can just use an empty reducer, and partition your job to run a single mapper per file. Each mapper will create its own output file in your output folder.
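
To make that concrete, here is a minimal map-only sketch against the org.apache.hadoop.mapreduce API. The class names, the hard-coded terms, and loading the term list in setup() are my own assumptions, not the asker's code; also note that the default TextInputFormat may split a large file across several mappers, so guaranteeing exactly one mapper per file would additionally need a non-splittable input format.

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TermFilterJob {

  public static class TermFilterMapper
      extends Mapper<LongWritable, Text, Text, NullWritable> {

    private final Set<String> terms = new HashSet<String>();

    @Override
    protected void setup(Context context) {
      //Hypothetical: load your Lucene-derived term list here instead,
      //e.g. from a file shipped to each node via the DistributedCache
      terms.add("terma");
      terms.add("termb");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] tokens = value.toString().split(" ");
      //Emit the whole line when its third word is in the term list
      if (tokens.length >= 3 && terms.contains(tokens[2]))
        context.write(value, NullWritable.get());
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "term filter");
    job.setJarByClass(TermFilterJob.class);
    job.setMapperClass(TermFilterMapper.class);
    job.setNumReduceTasks(0); //map-only: mappers write part files directly
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

With zero reduce tasks, each mapper's output goes straight to its own part-m-NNNNN file in the output folder, which gives you the per-mapper output files described above.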


Map Reduce is easily implemented using some nice Java 6 concurrency features, especially Future, Callable and ExecutorService.

I created a Callable that will analyse a file in the way you specified:

import java.io.File;
import java.io.FileNotFoundException;
import java.util.List;
import java.util.Scanner;
import java.util.concurrent.Callable;

public class FileAnalyser implements Callable<String> {

  private Scanner scanner;
  private List<String> termList;

  public FileAnalyser(String filename, List<String> termList) throws FileNotFoundException {
    this.termList = termList;
    scanner = new Scanner(new File(filename));
  }

  @Override
  public String call() throws Exception {
    StringBuilder buffer = new StringBuilder();
    while (scanner.hasNextLine()) {
      String line = scanner.nextLine();
      String[] tokens = line.split(" ");
      //Keep the whole line when its third word is in the term list
      if ((tokens.length >= 3) && (inTermList(tokens[2])))
        buffer.append(line).append('\n'); //keep matches on separate lines
    }
    scanner.close();
    return buffer.toString();
  }

  private boolean inTermList(String term) {
    return termList.contains(term);
  }
}

We need to create a new Callable for each file found and submit it to the executor service. The result of the submission is a Future, which we can use later to obtain the result of the file parse.

import java.io.File;
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Analyser {

  private static final int THREAD_COUNT = 10;

  public static void main(String[] args) throws Exception {

    //All callables will be submitted to this executor service
    //Play around with THREAD_COUNT for optimum performance
    ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);

    //Store all futures in this list so we can refer to them easily
    List<Future<String>> futureList = new ArrayList<Future<String>>();

    //Some random term list, I don't know what you're using.
    List<String> termList = new ArrayList<String>();
    termList.add("terma");
    termList.add("termb");

    //For each file you find, create a new FileAnalyser callable and submit
    //it to the executor service. Add the future to the list so we can
    //check back on the result later. Adapt the file discovery below to
    //your 30-subfolder layout; "input" is just a placeholder.
    for (File file : new File("input").listFiles()) {
      try {
        Callable<String> worker = new FileAnalyser(file.getPath(), termList);
        Future<String> future = executor.submit(worker);
        futureList.add(future);
      }
      catch (FileNotFoundException fnfe) {
        //If the file doesn't exist at this point we can probably ignore,
        //but I'll leave that for you to decide.
        System.err.println("Unable to create future for " + file.getPath());
        fnfe.printStackTrace(System.err);
      }
    }

    //No more tasks will be submitted, so let the threads die once idle
    executor.shutdown();

    //You may want to wait at this point, until all threads have finished
    //You could maybe loop through each future until isDone() holds true
    //for each of them.

    //Loop over all finished futures and do something with the result
    //from each; get() blocks until that task has completed
    for (Future<String> current : futureList) {
      String result = current.get();
      //Do something with the result from this future
    }
  }
}

My example here is far from complete, and far from efficient. I haven't considered the sample size; if it's really huge, you could keep looping over the futureList, removing elements that have finished, with something similar to:

while (!futureList.isEmpty()) {
  for (Future<String> current : futureList) {
    if (current.isDone()) {
      String result = current.get();
      //Do something with result
      futureList.remove(current);
      break; //We have modified the list during iteration, best break out of the for-loop
    }
  }
}

Alternatively, you could implement a producer-consumer type setup, where the producer submits callables to the executor service and produces a future, and the consumer takes the result of the future and discards the future.

This would maybe require the producer and consumer to be threads themselves, and a synchronized list for adding/removing futures.
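
As a rough sketch of that producer-consumer idea: java.util.concurrent already packages the handoff in ExecutorCompletionService, which queues each Future as its task finishes, so the consumer just takes the next completed one. The file names and term list below are placeholders, and this reuses the FileAnalyser class from above.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionExample {

  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(10);
    CompletionService<String> completionService =
        new ExecutorCompletionService<String>(executor);

    //Placeholder inputs; use your real file discovery and term list
    List<String> filenames = Arrays.asList("a.txt", "b.txt");
    List<String> termList = Arrays.asList("terma", "termb");

    //Producer side: submit one FileAnalyser per file
    int submitted = 0;
    for (String filename : filenames) {
      completionService.submit(new FileAnalyser(filename, termList));
      submitted++;
    }

    //Consumer side: take() blocks until the next task finishes,
    //handing results back in completion order, not submission order
    for (int i = 0; i < submitted; i++) {
      String result = completionService.take().get();
      //Do something with result
    }
    executor.shutdown();
  }
}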

Any questions please ask.
