How to set output writer in MapReduce

Developer https://www.devze.com 2023-04-03 15:48 Source: Internet

I'm trying out the MapReduce framework from http://code.google.com/p/appengine-mapreduce/ and have modified the demo application slightly so that it uses mapreduce.input_readers.DatastoreInputReader instead of mapreduce.input_readers.BlobstoreZipInputReader.

I've set up 2 pipeline classes:

class IndexPipeline(base_handler.PipelineBase):
    def run(self):
        output = yield mapreduce_pipeline.MapreducePipeline(
            "index",
            "main.index_map",  # added higher up in code
            "main.index_reduce",  # added higher up in code
            "mapreduce.input_readers.DatastoreInputReader",
            mapper_params={
                "entity_kind": "model.SearchRecords",
            },
            shards=16)
        yield StoreOutput("Index", output)

class StoreOutput(base_handler.PipelineBase):
    def run(self, mr_type, encoded_key):
        logging.info("output is %s %s" % (mr_type, str(encoded_key)))
        if encoded_key:
            key = db.Key(encoded=encoded_key)
            m = db.get(key)

            yield op.db.Put(m)

And run it with:

pipeline = IndexPipeline()
pipeline.start()

But I keep getting this error:

Handler yielded two: ['a'] , but no output writer is set.

I've tried to find where in the source the output writer is set, but without success. The only thing I found is that an output_writer_class should be set somewhere.

Does anyone know how to set this?

On a side note, the encoded_key argument in StoreOutput always seems to be None.


The output writer must be passed as an argument to mapreduce_pipeline.MapreducePipeline (see its docstring):

class MapreducePipeline(base_handler.PipelineBase):
  """Pipeline to execute MapReduce jobs.

  Args:
    job_name: job name as string.
    mapper_spec: specification of mapper to use.
    reducer_spec: specification of reducer to use.
    input_reader_spec: specification of input reader to read data from.
    output_writer_spec: specification of output writer to save reduce output to.
    mapper_params: parameters to use for mapper phase.
    reducer_params: parameters to use for reduce phase.
    shards: number of shards to use as int.
    combiner_spec: Optional. Specification of a combine function. If not
      supplied, no combine step will take place. The combine function takes a
      key, list of values and list of previously combined results. It yields
      combined values that might be processed by another combiner call, but will
      eventually end up in reducer. The combiner output key is assumed to be the
      same as the input key.

  Returns:
    filenames from output writer.
  """
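
Applied to the pipeline from the question, the call might look roughly like the sketch below. It only builds the arguments, so it runs without the App Engine SDK; the actual yield is shown in a comment. The writer name mapreduce.output_writers.BlobstoreOutputWriter is an assumption (it is the one I remember from the library's demo app), so check the output_writers module of your version. The helper index_pipeline_kwargs is hypothetical and exists only for illustration.

```python
# Sketch only: collects the arguments for mapreduce_pipeline.MapreducePipeline
# as keywords, using the parameter names listed in the docstring above.

# Assumption: this writer exists in your version of the library.
ASSUMED_WRITER = "mapreduce.output_writers.BlobstoreOutputWriter"


def index_pipeline_kwargs(output_writer_spec):
    """Hypothetical helper: returns the keyword arguments for the index job."""
    return {
        "job_name": "index",
        "mapper_spec": "main.index_map",
        "reducer_spec": "main.index_reduce",
        "input_reader_spec": "mapreduce.input_readers.DatastoreInputReader",
        # This is the argument missing from the call in the question.
        "output_writer_spec": output_writer_spec,
        "mapper_params": {"entity_kind": "model.SearchRecords"},
        "shards": 16,
    }


kwargs = index_pipeline_kwargs(ASSUMED_WRITER)

# Inside IndexPipeline.run this would become:
#     output = yield mapreduce_pipeline.MapreducePipeline(**kwargs)
#     yield StoreOutput("Index", output)
```

Note that, according to the docstring, the pipeline returns filenames from the output writer, so StoreOutput would probably need to treat its second argument as a list of filenames rather than as an encoded datastore key. A particular writer may also expect entries in reducer_params; that depends on the writer you choose.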
