Mongodb Mapreduce 初窥

本文涉及的产品
云数据库 MongoDB,通用型 2核4GB
简介:

去年年底,开始接触并学习Mapreduce模型。因为工作上的关系,最近开始研究Mongodb,其中对其新特性(2010年四月)reduce模型实现产生的兴趣,因为特别留意了一下。当然网上关于该方面的内容并不是很多,且多为EN文,所以我想有必要将学习使用过程中的一些问题作一下记录并加以整理,因为就有了此文。

       废话不多说了,开始正文吧!

       目前支持Mongodb的C#客户端应该就是Samuel Corder 开源的这个项目了,链接:http://github.com/samus/mongodb-csharp

       其中在它的源码包中的MongoDB.Net-Tests目录下有对TestMapReduce和TestMapReduceBuilder相应测试用例,因为我本地没安装NUnit,所以接下来的内容我是在一个新建的web项目中直接Copy其中的部分代码做的测试(注:有关Mapreduce模型的内容请查阅相关资料)。

     

      首先我们要先加载测试数据,这里我们以DNT中的在线用户列表的(结构)作为依据,批量倒入10条记录,代码如下:

 

Mongo db  =   new  Mongo( " Servers=10.0.4.66:27017;ConnectTimeout=300000;ConnectionLifetime=300000;MinimumPoolSize=25;MaximumPoolSize=25;Pooled=true " );         

db.Connect(); 

Database test 
=  db.GetDatabase( " test " );
IMongoCollection things 
=  test[ " things " ];

for  ( int  i  =   1 ; i  <=   10 ;i ++ )
          {    
              Document record 
=   new  Document();
              record[
" _id " =  i;               
              record[
" userid " =  i;   
              record[
" ip " =   " 10.0.7. "   +  i;
              record[
" username " =   " 用户 "   +  i;
              record[
" nickname " =   " 用户 "   +  i;
              record[
" password " =   "" ;
              record[
" groupid " =  i; // 下面将就该字段使用MAPREDUCE方式进行分组统计
              record[ " olimg " =   "" ;
              record[
" adminid " =   0 ;
              record[
" invisible " =   0 ;
              record[
" action " =   0 ;
              record[
" lastactivity " =   1 ;
              record[
" lastposttime " =  DateTime.Now.ToString();
              record[
" lastpostpmtime " =  DateTime.Now.ToString();
              record[
" lastsearchtime " =  DateTime.Now.ToString();
              record[
" lastupdatetime " =   " 1212313221231231213321 " ;
              record[
" forumid " =   0 ;
              record[
" forumname " =   "" ;
              record[
" titleid " =   0 ;
              record[
" title " =   "" ;
              record[
" verifycode " =   "" ;
              record[
" newpms " =   0 ;
              record[
" newnotices " =   0 ;
              things.Insert(record);             

          } 

      db.Disconnect(); 

   

        假定目前我们有这样一个需求,就是找出该表中用户组(groupid)字段为5的用户数,当然这里我们不会使用普通的查询方法,而是使用MAPREDUCE方式,其工作过程分为两个阶段:map阶段和reduce阶段。每个阶段都有键/值对作为输入和输出,并且它们的类型可由程序员选择。下面是其实现方式:

        首先是map方法:    

  string  mapfunction  =   " function() {  if(this.groupid==5) {emit({groupid : 5}, 1);} } " ;

  

        然后是reduce方法:    

  string  reducefunction  =   " function(key, current ){ "   +
                                
"    var count = 0; "   +
                                
"    for(var i in current) { "   +
                                
"        count+=current[i]; "   +
                                
"    } "   +
                                
"    return count; "   +
                              
" }; " ;

  

         最后我们使用下面代码实现对上面MAP,REDUCE的相应代码绑定和MapReduce类的声明:    

MapReduce mr  =  mrcol.MapReduce();
    mr.Map 
=   new  Code(mapfunction);
    mr.Reduce 
=   new  Code(reducefunction4);
    mr.Execute();
    
foreach  (Document doc  in  mr.Documents)
    {
           
int  groupCount  =  Convert.ToInt32(doc[ " value " ]);
    }

    mr.Dispose();

  

       运行上面代码,显示结果如下: 

     mongodb_mapreduce_1

      

     

      当前上面监视窗口中的"id:"{"groupid":5},即是mapfunction中的定义,当然如果要统计所有用户组(10个用户组)中各自的用户数,只把将mapfunction改写成:

      string mapfunction = "function() { emit(this.groupid, 1); }";

 

    这样,它就会按当前用户所属的groupid来作为键(确保不重复),凡是同一组的用户就作为输出进行发送(emit),emit可以理解为调用reduce方法,这里参数为1[即累加1操作])。

 

     目前我在网上打到mongodb示例基本上都是围绕分组统计功能展开的。

     当然就其传参和返回值都可以使用类似元组的方式,记得上面的“emit({groupid : 5}, 1)”代码吗?返回值这里也可以使用下面的方式:  

string  reducefunction  =   " function(key, current ){ "   +
                                 
"    var count = 0; "   +
                                 
"    for(var i in current) { "   +
                                 
"        count+=current[i]; "   +
                                 
"    } "   +
                                 
"    return { groupcount : count }; "   +    // 注意这里的返回方式
                                " }; "

 

      返回类型变了,取值的方式也要发生变成:

int  groupCount  =   int .Parse(((Document)doc[ " value " ])[ " groupcount " ].ToString()); 

 

      当然,上面的MapReduce 类的声明使用方式过于拘谨,下面使用链式调用的方式:

using  (MapReduceBuilder mrb  =  mrcol.MapReduceBuilder().Map(mapfunction).Reduce(reducefunction)) 
{
        
using  (MapReduce mr  =  mrb.Execute())
        {
                   
foreach  (Document doc  in  mr.Documents)
                   {
                       
int  groupCount  =   int .Parse(((Document)doc[ " value " ])[ " groupcount " ].ToString());
                   }
        }

  

       返回的结果与之前的一样,呵呵。

       另外,mongodb还支持更加复杂的数据结构,比如官司方给的下面这个数据结构示例:

          mrcol.Insert( new  Document().Append( " _id " 1 ).Append( " tags " new  String[]{ " dog " " cat " }));
          mrcol.Insert(
new  Document().Append( " _id " 2 ).Append( " tags " new  String[]{ " dog " }));
          mrcol.Insert(
new  Document().Append( " _id " 3 ).Append( " tags " new  String[]{ " mouse " " cat " " dog " }));
          mrcol.Insert(
new  Document().Append( " _id " 4 ).Append( " tags " new  String[]{})); 

  

       可以看出tags字段(这里暂且这么说,呵呵),就是一个字符串数组,而下面的mapreduce方法将会统计里面单词dog,cat,mouse的出现次数:

string  mapfunction  =   " function(){\n "   +
                           
"    this.tags.forEach(\n "   +
                           
"        function(z){\n "   +
                           
"            emit( z , { count : 1 } );\n "   +
                           
"        });\n "   +
                           
" }; " ;
string  reducefunction  =   " function( key , values ){\n "   +
                               
"     var total = 0;\n "   +
                               
"     for ( var i=0; i<values.length; i++ )\n "   +
                               
"         total += values[i].count;\n "   +
                               
"     return { count : total };\n "   +
                               
" }; "

  

      对于如何对(含)日期型数据的键进行分组统计,下面的这个链接中有详细说明(统计每天用户的访问量):

       Counting Unique Items with Map-Reduce 

 

      下面这个链接就是官方给出示例的文档链接页面,其中包括更加复杂的mapreduce示例:

       http://www.mongodb.org/display/DOCS/MapReduce

  

      当然目前对于Mapreduce模式,Mongodb使用一个单独的进程来跑的,这主要是因为JavaScript 引擎的限制。目前开发团队正在设计解决这一问题。原文:

       As of right now, MapReduce jobs on a single mongod process are single threaded. This is due to a design limitation in current JavaScript engines. We are looking into alternatives to solve this issue, but for now if you want to parallelize your MapReduce jobs, you will need to either use sharding or do the aggregation client-side in your code.

 

       另外就是到现在对于MONGODB那一端是如果把输入数据划分成等长的小数据发送到MapReduce(Hadoop把这一操作称为input split,即输入切片),因为这一点对于并发运行的作业进行负载平衡很重要,而在 Hadoop中一个理想的切片大小往往是一个HDFS块的大小,默认是64 MB(Hadoop权威指南(中文版))。

 

       除了上面所提到了,在MONGODB的mapreduce模型中,还支持map输出的临时结果集的持久化,而这一特色还在文档中专门作了如下说明:

Note on Permanent Collections

Even when a permanent collection name is specified, a temporary collection name will be used during processing. At map/reduce completion, the temporary collection will be renamed to the permanent name atomically. Thus, one can perform a map/reduce job periodically with the same target collection name without worrying about a temporary state of incomplete data. This is very useful when generating statistical output collections on a regular basis.

 

     而如果想要持久化该临时集合,只要将mapreduce实例的Keptemp属性设为true,同时使用Out属性(方法)指定输出的集合名称即可。

 

     当然就目前我测试时结果来看,在单台机器上做这种模型测试就效率上是得不尝失的(执行周期太长),特别是数据量特别大(比如3000w以上),所以应用(或运行)场景的选择很重要。

     上面所说的示例比较简单,都是在单一reduce任务中的执行场景,如下图:

   singlereduce

  

     而实际的生产环境要比上图复杂许多,比如多reduce任务情况,在Hadoop中,如果运行多个reduce任务,map任务会对其输出进行分区,为每个reduce任务创建一个分区(partition)。每个分区包含许多键(及其关联的值),但每个键的记录都在同一个分区中。分区可以通过用户定义的partitioner来控制。如下图:

 

      multireducer

 

     鉴于目前网上mongodb相关文档内容并不多,所以这里暂不多做讨论了。



本文转自 daizhenjun 51CTO博客,原文链接:http://blog.51cto.com/daizhj/337440,如需转载请自行联系原作者

相关实践学习
MongoDB数据库入门
MongoDB数据库入门实验。
快速掌握 MongoDB 数据库
本课程主要讲解MongoDB数据库的基本知识,包括MongoDB数据库的安装、配置、服务的启动、数据的CRUD操作函数使用、MongoDB索引的使用(唯一索引、地理索引、过期索引、全文索引等)、MapReduce操作实现、用户管理、Java对MongoDB的操作支持(基于2.x驱动与3.x驱动的完全讲解)。 通过学习此课程,读者将具备MongoDB数据库的开发能力,并且能够使用MongoDB进行项目开发。 &nbsp; 相关的阿里云产品:云数据库 MongoDB版 云数据库MongoDB版支持ReplicaSet和Sharding两种部署架构,具备安全审计,时间点备份等多项企业能力。在互联网、物联网、游戏、金融等领域被广泛采用。 云数据库MongoDB版(ApsaraDB for MongoDB)完全兼容MongoDB协议,基于飞天分布式系统和高可靠存储引擎,提供多节点高可用架构、弹性扩容、容灾、备份回滚、性能优化等解决方案。 产品详情: https://www.aliyun.com/product/mongodb
相关文章
|
分布式计算 NoSQL JavaScript
初识Mongodb之MapReduce操作篇
初识Mongodb之MapReduce操作篇
293 0
初识Mongodb之MapReduce操作篇
|
分布式计算 JavaScript NoSQL
|
分布式计算 JavaScript NoSQL
|
分布式计算 NoSQL 测试技术
|
Java 数据格式 XML