您的位置:首页 > 大数据

MongoDB 的 MapReduce 大数据统计统计挖掘

2014-02-27 18:35 323 查看
MongoDB虽然不像我们常用的mysql,sqlserver,oracle等关系型数据库有group by函数那样方便分组,但是MongoDB要实现分组也有3个办法:* Mongodb三种分组方式:* 1、group(先筛选再分组,不支持分片,对数据量有所限制,效率不高)* 2、mapreduce(基于js引擎,单线程执行,效率较低,适合用做后台统计等)* 3、aggregate(推荐) (如果你的PHP的mongodb驱动版本需>=1.3.0,推荐你使用aggregate,性能要高很多,并且使用上要简单些,不过1.3的目前还不支持账户认证模式,可以通过http://pecl.php.net/package/mongo查看更新日志和Bug)下面就来看下mapreduce方式:Mongodb官网对MapReduce介绍:Map/reduce in MongoDB is useful for batch processing of data and aggregation operations. It is similar in spirit to using something like Hadoop with all input coming from a collection and output going to a collection. Often, in a situation where you would have used GROUP BY in SQL, map/reduce is the right tool in MongoDB.大致意思是:Mongodb中的Map/reduce主要是用来对数据进行批量处理和聚合操作,有点类似于使用Hadoop对集合数据进行处理,所有输入数据都是从集合中获取,而MapReduce后输出的数据也都会写入到集合中。通常类似于我们在SQL中使用Group By语句一样。使用MapReduce要实现两个函数:Map和Reduce。Map函数调用emit(key,value)遍历集合中所有的记录,将key与value传给Reduce函数进行处理。Map函数和Reduce函数是使用Javascript编写的,并可以通过db.runCommand或mapreduce命令来执行MapReduce操作。MapReduce命令如下:[javascript] view plaincopydb.runCommand({ mapreduce : <collection>,map : <mapfunction>,reduce : <reducefunction>[, query : <query filter object>][, sort : <sort the query. useful for optimization>][, limit : <number of objects to return from collection>][, out : <output-collection name>][, keeptemp: <true|false>][, finalize : <finalizefunction>][, scope : <object where fields go into javascript global scope >][, verbose : true]});参数说明:mapreduce:要操作的目标集合map:映射函数(生成键值对序列,作为Reduce函数的参数)reduce:统计函数query:目标记录过滤sort:对目标记录排序limit:限制目标记录数量out:统计结果存放集合(如果不指定则使用临时集合,在客户端断开后自动删除)keeptemp:是否保留临时集合finalize:最终处理函数(对reduce返回结果执行最终整理后存入结果集合)scope:向map、reduce、finalize导入外部变量verbose:显示详细的时间统计信息map函数map函数调用当前对象,并处里对象的属性,传值给reduce,map方法使用this来操作当前对象,最少调用一次emit(key,value)方法来向reduce提供参数,其中emit的key为最终数据的id。reduce函数接收一个值和数组,根据需要对数组进行合并分组等处理,reduce的key就是emit(key,value)的key,value_array是同个key对应的多个value数组。Finalize函数此函数为可选函数,可在执行完map和reduce后执行,对最后的数据进行统一处理。看完基本介绍,我们再来看一个实例:已知集合feed,测试数据如下:[javascript] view plaincopy{"_id": ObjectId("50ccb3f91e937e2927000004"),"feed_type": 1,"to_user": 234,"time_line": "2012-12-16 01:26:00"}{"_id": ObjectId("50ccb3ef1e937e0727000004"),"feed_type": 8,"to_user": 123,"time_line": "2012-12-16 01:26:00"}{"_id": ObjectId("50ccb3e31e937e0a27000003"),"feed_type": 1,"to_user": 123,"time_line": "2012-12-16 01:26:00"}{"_id": ObjectId("50ccb3d31e937e0927000001"),"feed_type": 1,"to_user": 123,"time_line": "2012-12-16 01:26:00"}我们按动态类型feed_type和用户to_user进行分组统计,实现结果:
feed_typeto_usercout
12341
81231
11232
实现代码:[php] view plaincopy//编写map函数$map = 'function() {var key = {to_user:this.to_user,feed_type:this.feed_type};var value = {count:1};emit(key,value);} ';//reduce 函数$reduce = 'function(key, values) {var ret = {count:0};for(var i in values) {ret.count += 1;}return ret;}';//查询条件$query = null; //本实例中没有查询条件,设置为null[php] view plaincopy$mongo = new Mongo('mongodb://root:root@127.0.0.1: 28017/'); //链接mongodb,账号和密码为root,root$instance = $mongo->selectDB("testdb");//执行此命令后,会创建feed_temp_res的临时集合,并将统计后的数据放在该集合中$cmd = $instance->command(array('mapreduce' => 'feed','map' => $map,'reduce' => $reduce,'query' => $query,'out' => 'feed_temp_res'));//查询临时集合中的统计数据,验证统计结果是否和预期结果一致$cursor = $instance->selectCollection('feed_temp_res')->find();$result = array();try {while ($cursor->hasNext()){$result[] = $cursor->getNext();}}catch (MongoConnectionException $e){echo $e->getMessage();}catch (MongoCursorTimeoutException $e){echo $e->getMessage();}catch(Exception $e){echo $e->getMessage();}//testvar_dump($result);下面是输出的结果,和预期结果一致[javascript] view plaincopy{"_id": {"to_user": 234,"feed_type": 1},"value": {"count": 1}}{"_id": {"to_user": 123,"feed_type": 8},"value": {"count": 1}}{"_id": {"to_user": 123,"feed_type": 1},"value": {"count": 2}}以上只是简单的统计实现,你可以实现复杂的条件统计编写复杂的reduce函数,可以增加查询条件,排序等等。附上mapReduce数据库处理函数(简单封装)[php] view plaincopy/*** mapReduce分组** @param string $table_name 表名(要操作的目标集合名)* @param string $map 映射函数(生成键值对序列,作为 reduce 函数参数)* @param string $reduce 统计处理函数* @param array $query 过滤条件 如:array('uid'=>123)* @param array $sort 排序* @param number $limit 限制的目标记录数* @param string $out 统计结果存放集合 (不指定则使用tmp_mr_res_$table_name, 1.8以上版本需指定)* @param bool $keeptemp 是否保留临时集合* @param string $finalize 最终处理函数 (对reduce返回结果进行最终整理后存入结果集合)* @param string $scope 向 map、reduce、finalize 导入外部js变量* @param bool $jsMode 是否减少执行过程中BSON和JS的转换,默认true(注:false时 BSON-->JS-->map-->BSON-->JS-->reduce-->BSON,可处理非常大的mapreduce,//true时BSON-->js-->map-->reduce-->BSON)* @param bool $verbose 是否产生更加详细的服务器日志* @param bool $returnresult 是否返回新的结果集* @param array &$cmdresult 返回mp命令执行结果 array("errmsg"=>"","code"=>13606,"ok"=>0) ok=1表示执行命令成功* @return*/function mapReduce($table_name,$map,$reduce,$query=null,$sort=null,$limit=0,$out='',$keeptemp=true,$finalize=null,$scope=null,$jsMode=true,$verbose=true,$returnresult=true,&$cmdresult){if(empty($table_name) || empty($map) || empty($reduce)){return null;}$map = new MongoCode($map);$reduce = new MongoCode($reduce);if(empty($out)){$out = 'tmp_mr_res_'.$table_name;}$cmd = array('mapreduce' => $table_name,'map' => $map,'reduce' => $reduce,'out' =>$out);if(!empty($query) && is_array($query)){array_push($cmd, array('query'=>$query));}if(!empty($sort) && is_array($sort)){array_push($cmd, array('sort'=>$query));}if(!empty($limit) && is_int($limit) && $limit>0){array_push($cmd, array('limit'=>$limit));}if(!empty($keeptemp) && is_bool($keeptemp)){array_push($cmd, array('keeptemp'=>$keeptemp));}if(!empty($finalize)){$finalize = new Mongocode($finalize);array_push($cmd, array('finalize'=>$finalize));}if(!empty($scope)){array_push($cmd, array('scope'=>$scope));}if(!empty($jsMode) && is_bool($jsMode)){array_push($cmd, array('jsMode'=>$jsMode));}if(!empty($verbose) && is_bool($verbose)){array_push($cmd, array('verbose'=>$verbose));}$dbname = $this->curr_db_name;$cmdresult = $this->mongo->$dbname->command($cmd);if($returnresult){if($cmdresult && $cmdresult['ok']==1){$result = $this->find($out, array());}}if($keeptemp==false){//删除集合$this->mongo->$dbname->dropCollection($out);}return $result;}-------------------------------------------------------------------------------------------MongoDB 的 MapReduce 相当于 Mysql 中的"group by", 所以在 MongoDB 上使用 Map/Reduce 进行并行"统计"很容易。使用 MapReduce 要实现两个函数 Map 函数和 Reduce 函数,Map 函数调用 emit(key, value), 遍历collection 中所有记录, key 与 value 传递给 Reduce 函数进行处理。 函数和 Reduce 将 Map 函数可以使用 JavaScript 来实现,可以通db.runCommand 或 mapReduce 命令来执行一个 MapReduce的操作。MapReduce函数的参数列表如下
db.runCommand(
{ mapreduce : <collection>,
map : <mapfunction>,
reduce : <reducefunction>
[, query : <query filter object>]
[, sort : <sort the query.  useful for optimization>]
[, limit : <number of objects to return from collection>]
[, out : <output-collection name>]
[, keeptemp: <true|false>]
[, finalize : <finalizefunction>]
[, scope : <object where fields go into javascript global scope >]
[, verbose : true]
}
);
或者这么写:
db.collection.mapReduce(
<map>,
<reduce>,
{
<out>,
<query>,
<sort>,
<limit>,
<keytemp>,
<finalize>,
<scope>,
<jsMode>,
<verbose>
}
)
mapreduce:指定要进行mapreduce处理的collectionmap:map函数reduce:reduce函数out:输出结果的collection的名字,不指定会默认创建一个随机名字的collection(如果使用了out选项,就不必指定keeptemp:true了,因为已经隐含在其中了)query:一个筛选条件,只有满足条件的文档才会调用map函数。(query。limit,sort可以随意组合)sort:和limit结合的sort排序参数(也是在发往map函数前给文档排序),可以优化分组机制limit:发往map函数的文档数量的上限(要是没有limit,单独使用sort的用处不大)keytemp:true或false,表明结果输出到的collection是否是临时的,如果想在连接关闭后仍然保留这个集合,就要指定keeptemp为true,如果你用的是MongoDB的mongo客户端连接,那必须exit后才会删除。如果是脚本执行,脚本退出或调用close会自动删除结果collectionfinalize:是函数,它会在执行完map、reduce后再对key和value进行一次计算并返回一个最终结果,这是处理过程的最后一步,所以finalize就是一个计算平均数,剪裁数组,清除多余信息的恰当时机scope:javascript代码中要用到的变量,在这里定义的变量在map,reduce,finalize函数中可见verbose:用于调试的详细输出选项,如果想看MpaReduce的运行过程,可以设置其为true。也可以print把map,reduce,finalize过程中的信息输出到服务器日志上。其中重点的几个参数说明:

1、Map

Map 函数必须调用 emit(key, value) 返回键值对,使用 this 访问当前待处理的 Document。m = function() { emit(this.classid, 1) }value 可以使用 JSON Object 传递 (支持多个属性值)。例如:emit(this.classid, {count:1})

2 Reduce

Reduce 函数接收的参数类似 Group 效果,将 Map 返回的键值序列组合成 { key, [value1, value2, value3, value...] } 传递给 reduce。r = function(key, values) {var x = 0;values.forEach(function(v) { x += v });return x;}

3 Result

res = db.runCommand({mapreduce:"students",map:m,reduce:r,out:"students_res"});mapReduce() 将结果存储在 "students_res" 表中。

8.4 Finalize

利用 finalize() 我们可以对 reduce() 的结果做进一步处理。f = function(key, value) { return {classid:key, count:value}; }列名变与 “classid”和”count”了,这样的列表更容易理解。

8.5 Options

可以添加更多的控制细节 。添加query、sort等。实例:

                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: