Incremental MongoDB Map/Reduce with an adaptive query

Developer https://www.devze.com 2023-04-13 05:30 Source: Internet
I have an issue with data I want to aggregate incrementally.

I have a lot of devices (stored in the device collection) that emit measurements at irregular intervals; the measurements are stored in the record collection. Each record has a timestamp_utc, which is the time the measurement was taken, not the time it was stored. The two can be very different, because devices send their measurements in batches.

What I want is to run batch aggregations (Map/Reduce) at several time scales, for every device, incrementally. For instance, I want one collection holding the average of each measure over every 5 minutes, another over every 30 minutes, ... another over every day, and so on. I don't want to reprocess the whole data set every time, only the new entries.

Example: the record collection contains:

{ _id : {device1, time : ISODate(2011-10-12T13:50:01Z)}, value : { meas1 : 2, meas2 : 4}},
{ _id : {device1, time : ISODate(2011-10-12T13:51:01Z)}, value : { meas1 : 1, meas2 : 6}},
{ _id : {device2, time : ISODate(2011-10-12T13:49:01Z)}, value : { meas1 : 3, meas2 : 7}},
{ _id : {device2, time : ISODate(2011-10-12T13:50:01Z)}, value : { meas1 : 4, meas2 : 8}},
{ _id : {device2, time : ISODate(2011-10-12T13:51:01Z)}, value : { meas1 : 5, meas2 : 9}},

After data aggregation on scale1 (every 5 min) I will have something like

{ _id : {device1, time : ISODate(2011-10-12T13:50:00Z)}, value : { meas1 : 1.5, meas2 : 5}},
{ _id : {device2, time : ISODate(2011-10-12T13:45:00Z)}, value : { meas1 : 3, meas2 : 7}},
{ _id : {device2, time : ISODate(2011-10-12T13:50:00Z)}, value : { meas1 : 4.5, meas2 : 8.5}},

And so on for every scale. Generating this data the first time is not a big deal; it is a very ordinary map/reduce operation:

db.record.mapReduce(map, reduce, {finalize : finalize, out : { merge : "recordscale1" }});
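The question does not show map, reduce or finalize, so here is a minimal sketch of what they could look like for scale1, under some assumptions: the device key inside _id is called device (the sample documents do not name it), BUCKET_MS would have to reach the server through the scope option of mapReduce, and emit/runLocally are only a local stand-in so the functions can be tried without a database. The value carries sums and a count rather than an average, because partial averages cannot be combined correctly by reduce.

```javascript
var BUCKET_MS = 5 * 60 * 1000; // scale1: 5 minutes, in milliseconds

// Local stand-in for the emit() that MongoDB provides inside map.
var emitted = [];
function emit(key, value) { emitted.push({ key: key, value: value }); }

// map: called with `this` bound to one record.
// "device" is an assumed field name inside _id.
function map() {
  var t = this._id.time.getTime();
  var bucket = new Date(t - (t % BUCKET_MS)); // floor to the bucket start
  emit({ device: this._id.device, time: bucket },
       { sum1: this.value.meas1, sum2: this.value.meas2, count: 1 });
}

// reduce: adds partial sums and counts; output has the same shape as
// the emitted values, so it can safely be applied more than once.
function reduce(key, values) {
  var out = { sum1: 0, sum2: 0, count: 0 };
  values.forEach(function (v) {
    out.sum1 += v.sum1;
    out.sum2 += v.sum2;
    out.count += v.count;
  });
  return out;
}

// finalize: derives the averages but keeps sums and count alongside them.
function finalize(key, v) {
  v.meas1 = v.sum1 / v.count;
  v.meas2 = v.sum2 / v.count;
  return v;
}

// Hypothetical local harness: group emitted values by key, then
// reduce and finalize each group, roughly as the server would.
function runLocally(docs) {
  emitted = [];
  docs.forEach(function (doc) { map.call(doc); });
  var groups = {};
  emitted.forEach(function (e) {
    var k = JSON.stringify(e.key);
    (groups[k] = groups[k] || { key: e.key, values: [] }).values.push(e.value);
  });
  return Object.keys(groups).map(function (k) {
    var g = groups[k];
    return { _id: g.key, value: finalize(g.key, reduce(g.key, g.values)) };
  });
}
```

Run over the five sample records, runLocally gives three buckets whose meas1/meas2 match the averages listed in the question (1.5/5, 3/7 and 4.5/8.5). The extra sum1, sum2 and count fields are a design choice of this sketch, not something the question asks for; they are what lets a later run add new records to an existing bucket.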

The problem comes when I want to do an incremental map/reduce. I would like to do something like this:

db.record.mapReduce(map, reduce, {query : { "_id.time_utc" : { $gte : timeMin } }, finalize : finalize, out : { merge : "recordscale1" }});

The problem is that timeMin is different for each device: it should correspond to the last entry (sorted by time_utc) inserted in the output collection for that device.

I've tried a lot of techniques without success (using insertion_time, ...).

Does anybody have an idea how I can handle this?

I want to avoid running one map/reduce per device, since that would be too slow.


In the end I added a new field holding the inserted timestamp.

For each map/reduce call, I get the last processed timestamp from the reduced collection, then call map/reduce with a query on the inserted timestamp, so that no record is processed twice.
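To make the idea concrete, here is a small local simulation of that watermark approach. It is only a sketch under assumptions: inserted_at is a hypothetical name for the new field, device is an assumed key inside _id, and state, selectNew, foldInto, nextWatermark and runIncrement are illustrative helpers, not MongoDB APIs. Against a real server the equivalent call would presumably use a query such as { inserted_at : { $gt : watermark } }. One thing to watch: with out : { merge : ... } a bucket that already exists is overwritten by the result computed from the new records only, so the sketch folds new records into the existing totals, which is what out : { reduce : "recordscale1" } is meant for.

```javascript
var BUCKET_MS = 5 * 60 * 1000; // scale1: 5 minutes, in milliseconds

// Keep only records stored after the previous run. inserted_at is the
// assumed "inserted timestamp" field, not the measurement time.
function selectNew(records, watermark) {
  return records.filter(function (r) { return r.inserted_at > watermark; });
}

// Fold new records into the existing per-bucket totals, combining old
// and new partial results instead of replacing the bucket.
function foldInto(buckets, records) {
  records.forEach(function (r) {
    var t = r._id.time.getTime();
    var start = new Date(t - (t % BUCKET_MS)); // bucket start time
    var key = r._id.device + "@" + start.toISOString();
    var b = buckets[key] || (buckets[key] = { sum1: 0, sum2: 0, count: 0 });
    b.sum1 += r.value.meas1;
    b.sum2 += r.value.meas2;
    b.count += 1;
  });
  return buckets;
}

// Next watermark: the largest inserted_at seen so far.
function nextWatermark(records, watermark) {
  return records.reduce(function (max, r) {
    return r.inserted_at > max ? r.inserted_at : max;
  }, watermark);
}

// One incremental run; returns how many records were processed.
function runIncrement(state, records) {
  var fresh = selectNew(records, state.watermark);
  foldInto(state.buckets, fresh);
  state.watermark = nextWatermark(fresh, state.watermark);
  return fresh.length;
}
```

Starting from state = { watermark : new Date(0), buckets : {} }, each call to runIncrement only touches records inserted since the previous call. Because the filter is on the insertion time, a measurement that arrives late still lands in its correct (older) bucket, and a single watermark works for all devices at once. Averages are then sum1 / count and sum2 / count per bucket.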

If anyone has a better idea, I'm interested ;-)
