您的位置:首页 > 数据库 > Mongodb

MongoDB聚合(Map-Reduce)(二)

2017-04-26 15:55 316 查看
MapReduce要实现两个函数:Map和Reduce。Map函数调用emit(key,value)遍历一个或多个集合中所有的记录,进行分组(group by),然后将key与value传给Reduce函数进行处理,输出结果。

(1)MapReduce使用自定义JavaScript函数执行map和reduce操作,所以是基于js引擎,单线程执行,效率不高,比Aggregation复杂,适合用做后台统计等。
(2)MapReduce支持分片操作,可以进行拆分,分发到不同的机器上执行(多服务器并行做数据集合处理),然后再将不同的机器处理的结果汇集起 来,输出结果,。
(3)MapReduce能执行单一聚合的所有操作count、distinct、group,但group 在当数据量非常大的时候,处理能力就不太好,先筛选再分组,不支持 分片,对数据量有所限制,效率不高。

MapReduce语法】

先对MapReduce语法认识一下,各个参数有什么作用,接下去的操作理解起来会比较容易。

[sql] view plain copy







<span style="font-size:18px;"> db.collection.mapReduce(

<map>,

<reduce>,

{

out: <collection>,

query: <document>,

sort: <document>,

limit: <number>,

finalize: <function>,

scope: <document>,

jsMode: <boolean>,

verbose: <boolean>,

bypassDocumentValidation: <boolean>

}

)</span>

参数说明:
map:是JavaScript 函数,负责将每一个输入文档转换为零或多个文档,通过key进行分组,生成键值对序列,作为 reduce 函数参数。
reduce:是JavaScript 函数,对map操作的输出做合并的化简的操作(将key-values变成key-value,也就是把values数组变成一个单一的值value)。
out:reduce执行完,存放的集合,如果不指定集合,则使用默认的临时集合,在MapReduce的连接关闭后自动就被删除了。
out: { <action>: <collectionName>
[, db: <dbName>]
[, sharded: <boolean> ]
[, nonAtomic: <boolean> ] }
query:过滤的条件,对符合条件的文档执行map函数。(query。limit,sort可以随意组合)。
sort :对文档进行排序,sort和limit结合的sort排序参数(也是在发往map函数前给文档排序),可以优化分组机制。
limit :发往map函数的文档数量的上限(要是没有limit,单独使用sort的用处不大)。
finalize:可以对reduce输出结果再一次修改,跟group的finalize一样,不过MapReduce没有group的4MB文档的输出限制。
scope:向map、reduce、finalize导入外部变量。
verbose:是否包括结果信息中的时间信息,默认为fasle。
"timing" : {
"mapTime" : 0,
"emitLoop" : 2,
"reduceTime" : 0,
"mode" : "mixed",
"total" : 0
}

map函数】
map是JavaScript 函数,负责将每一个输入文档转换为零或多个文档,通过key进行分组,生成键值对序列,作为 reduce 函数参数。
function() {
emit(key, value);
}

key对文档进行分组,value是要统计的数据,value可以是JSON对象(emit只能容纳MongoDB的最大BSON文件大小的一半)。我们对订单的详细统计每个产品类型卖出了多少个。我们先通过 pnumber进行分组,然后在对 quantity相加 相当于select pnumber,sum(quantity) from item group by pnumber

[sql] view plain copy







<span style="font-size:18px;">db.item.insert( [

{

"quantity" : 2,

"price" : 5.0,

"pnumber" : "p003"

},{

"quantity" : 2,

"price" : 8.0,

"pnumber" : "p002"

},{

"quantity" : 1,

"price" : 4.0,

"pnumber" : "p002"

},{

"quantity" : 2,

"price" : 4.0,

"pnumber" : "p001"

},{

"quantity" : 4,

"price" : 10.0,

"pnumber" : "p003"

},{

"quantity" : 10,

"price" : 20.0,

"pnumber" : "p001"

},{

"quantity" : 10,

"price" : 20.0,

"pnumber" : "p003"

},{

"quantity" : 5,

"price" : 10.0,

"pnumber" : "p002"

}

])

> var map = function() { emit(this.pnumber,this.quantity)}

> var reduce=function(key,values){return {'pumber':key,'quantity':Array.sum(values)}}

> db.item.mapReduce( map,

reduce,

{ out: "map_reduce_data" }

)

> db.map_reduce_data.find()

{ "_id" : "p001", "value" : { "pumber" : "p001", "quantity" : 12 } }

{ "_id" : "p002", "value" : { "pumber" : "p002", "quantity" : 8 } }

{ "_id" : "p003", "value" : { "pumber" : "p003", "quantity" : 20 } }</span>

【query过滤的条件】

对符合条件的文档将会执行map函数。(query。limit,sort可以随意组合), 我们对订单的详细的每次每种产品卖出的数量要大于5的并统计每个产品类型卖出了多少个。我们先通过 pnumber进行分组,然后在对 quantity相加 相当于select pnumber,sum(quantity) from item where quantity>5 group by pnumber

[sql] view plain copy







<span style="font-size:18px;">> var map = function() { emit(this.pnumber,this.quantity)}

> var reduce=function(key,values){return {'pumber':key,'quantity':Array.sum(values)}}

> db.item.mapReduce( map,

reduce,

{ query:{'quantity':{$gt:5}},

out: "map_reduce_data" } )

{

"result" : "map_reduce_data",

"timeMillis" : 5,

"counts" : {

"input" : 2,

"emit" : 2,

"reduce" : 0,

"output" : 2

},

"ok" : 1

}

> db.map_reduce_data.find()

{ "_id" : "p001", "value" : 10 }

{ "_id" : "p003", "value" : 10 }</span>

【value是JSON对象】

value可以是JSON格式,我们对订单的详细统计每个产品类型出现次数。我们先通过 pnumber进行分组,然后在对 quantity相加 相当于select pnumber,count(*) from item group by pnumber。

[sql] view plain copy







<span style="font-size:18px;"> >var map = function() {emit(this.pnumber,{count:1});}

> var reduce=function(key,values){

var count=0;

values.forEach(function(val){ count+=val.count;});

return {'pumber':key,"count":count};

}

> db.item.mapReduce( map,

reduce,

{ out: "map_reduce_data" }

)

{

"result" : "map_reduce_data",

"timeMillis" : 6,

"counts" : {

"input" : 10,

"emit" : 10,

"reduce" : 3,

"output" : 3

},

"ok" : 1

}

> db.map_reduce_data.find()

{ "_id" : "p001", "value" : { "pumber" : "p001", "count" : 2 } }

{ "_id" : "p002", "value" : { "pumber" : "p002", "count" : 3 } }

{ "_id" : "p003", "value" : { "pumber" : "p003", "count" : 5 } }</span>

【emit多次的循环】

可以对emit多次的循环,可以根据输入文档的项目字段中的元素的数量(键,值)多次调用:
function() {
this.items.forEach(function(item){ emit(key, value); });
}

我们对统计订单中对应的产品销售了多少个,我们先通过 pnumber进行分组,然后在对 quantity相加。

[sql] view plain copy







<span style="font-size:18px;">

db.orders.insert( [

{

"onumber" : "001",

"item" : [{

"quantity" : 2,

"price" : 5.0,

"pnumber" : "p003"

},{

"quantity" : 2,

"price" : 8.0,

"pnumber" : "p002"

}]

},{

"onumber" : "002",

"item" : [{

"quantity" : 1,

"price" : 4.0,

"pnumber" : "p002"

},{

"quantity" : 2,

"price" : 4.0,

"pnumber" : "p001"

},{

"quantity" : 4,

"price" : 10.0,

"pnumber" : "p003"

}]

},{

"onumber" : "003",

"item" : [{

"quantity" : 10,

"price" : 20.0,

"pnumber" : "p001"

},{

"quantity" : 10,

"price" : 20.0,

"pnumber" : "p003"

}]

},{

"onumber" : "004",

"item" : [{

"quantity" : 5,

"price" : 10.0,

"pnumber" : "p002"

}]

}

])

> var map = function() { this.item.forEach(function(it){ emit(it.pnumber,it.quantity); })}

> var reduce=function(key,values){return {'pumber':key,'quantity':Array.sum(values)}}

> db.orders.mapReduce( map,

reduce,

{ out: "map_reduce_data" } )

{

"result" : "map_reduce_data",

"timeMillis" : 51,

"counts" : {

"input" : 4,

"emit" : 8,

"reduce" : 3,

"output" : 3

},

"ok" : 1

}

> db.map_reduce_data.find()

{ "_id" : "p001", "value" : { "pumber" : "p001", "quantity" : 12 } }

{ "_id" : "p002", "value" : { "pumber" : "p002", "quantity" : 8 } }

{ "_id" : "p003", "value" : { "pumber" : "p003", "quantity" : 16 } }</span>

> var map = function() { this.item.forEach(function(it){ emit(it.pnumber,it.quantity); })}
也可以这样写
> var map = function() { for(var i=0;i<this.item.length;i++){ var it=item[i]; emit(it.pnumber,it.quantity); }}

【reduce函数

reduce是JavaScript 函数,对map操作的输出做合并的化简的操作(将key-values变成key-value,也就是把values数组变成一个单一的值value)。
function(key, values) {
...
return result;
}

values:值参数是一个数组,返回对象的类型必须与由map函数发出的值的类型相同。
reduce函数应该交换:即如果中元素的顺序不影响reduce函数的输出。
reduce( key, [ A, B ] ) == reduce( key, [ B, A ] )

对map操作的输出做合并的化简的操作(将key-values变成key-value,也就是把values数组变成一个单一的值value),我们对订单的详细统计每个产品类型卖出了多少个和每种产品出现次数。我们先通过 pnumber进行分组,然后在对 quantity相加 相当于select pnumber,count(*),sum(quantity) from item group by pnumber

[sql] view plain copy







<span style="font-size:18px;">>var map = function() {

var value={count:1, quantity:this.quantity};

emit(this.pnumber,value);

}

>var reduce=function(key,values){

var reducedVal = { count: 0, quantity: 0 };

for (var i = 0; i < values.length; i++) {

reducedVal.count += values[i].count;

reducedVal.quantity += values[i].quantity;

}

return reducedVal;

}

{

"result" : "map_reduce_data",

"timeMillis" : 7,

"counts" : {

"input" : 10,

"emit" : 10,

"reduce" : 3,

"output" : 3

},

"ok" : 1

}

> db.map_reduce_data.find()

{ "_id" : "p001", "value" : { "count" : 2, "quantity" : 12 } }

{ "_id" : "p002", "value" : { "count" : 3, "quantity" : 8 } }

{ "_id" : "p003", "value" : { "count" : 5, "quantity" : 20 } }</span>

【MapReduce 执行结果信息】

我们执行MapReduce输出结果时,有打印MapReduce信息

[sql] view plain copy







<span style="font-size:18px;">

{

"result" : "map_reduce_data",

"timeMillis" : 7,

"counts" : {

"input" : 10,

"emit" : 10,

"reduce" : 3,

"output" : 3

},

"ok" : 1

}

</span>

result:reduce执行完,存放的集合,如果不指定集合,则使用默认的临时集合,在MapReduce的连接关闭后自动就被删除了。我们这边有指定集合的名称map_reduce_data。
timeMillis:执行MapReduce所花费的时间(毫秒)。
input:满足条件被发送到map函数的文档个数。
emit:在map函数中emit被调用的次数,也就是所有集合中的数据总量。
ouput:输出到集合中的结果文档个数。
ok:是否成功,成功为1。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: