Source code for mongoUtils.mapreduce

"""map reduce operations

.. Note:: | `see mongodb manual <http://docs.mongodb.org/manual/core/map-reduce/>`_
          | optimize by sorting on emit field
            `see notes <http://edgystuff.tumblr.com/post/7624019777/optimizing-map-reduce-with-mongodb>`_
          | but also see `my ticket <https://jira.mongodb.org/browse/SERVER-16544>`_ (probably solved after v3)
          | when mr output is {'inline':1 } MAX output size is 16MB (Max BOSON doc size)
          | when map reduce runs on a replica secondary only output option is 'inline'

"""

from bson.code import Code
from bson import SON

from mongoUtils.helpers import parse_js_default
from mongoUtils.helpers import coll_index_names


[docs]def mr( coll, # a pymongo collection instance fun_map, # js function used for map fun_reduce=None, # js function used for reduce defaults to one counting values query={}, # a pymongo query dictionary to query coll defaults to {} out={"replace": 'mr_tmp'}, # output dict {'replace'|'merge'|'reduce'|'inline':collection_name|'database':db_name} fun_finalize=None, # js function to run on finalize scope={}, # vars available during map-reduce-finalize sort=None, # i.e: sort= { "_id":1 } short dict to sort before map jsMode=False, # True|False (don't convert to Bson between map & reduce if True) verbose=1 # if 1 includes timing info on output if 2,3 more details ): """simplified generic Map Reduce `see MongoDB Map Reduce <http://docs.mongodb.org/manual/reference/command/mapReduce/>`_ :Parameters: - coll (object) a pymongo collection instance - fun_map js function used for map - fun_reduce js function used for reduce defaults to a function that increments value count - query a pymongo query dictionary to query collection, defaults to {} - out a dictionary for output specification {replace|merge|reduce|:collection_name|db:db_name} also can specify {'inline':1} for in memory operation (with some limitations) defaults to {"replace": 'mr_tmp'} - scope vars available during map-reduce-finalize - sort dictionary to sort before map i.e: sort= { "_id":1 } - jsMode True|False (don't convert to Bson between map & reduce if True) should be False if we expect more than 500K distinct results - db' (optional): database_name if no db is specified output collection will be in same db as input coll :Returns: tuple (results collection or results list if out={"inline" :1}, MR response statistics) :Example: see :func:`group_counts` function """ def mr_cmd(): """returns the actual command from output parameter""" return [i for i in ['replace', 'merge', 'reduce', 'inline'] if i in list(out.keys())][0] command = mr_cmd() out_db = out.get('db', None) out = SON([(command, out[command]), ('nonAtomic', out.get('nonAtomic', False))]) if out_db: out.update(SON([('db', out_db)])) # out = SON([(command, out[command]), ('db', out.get('db')), # ('nonAtomic', out.get('nonAtomic', False))]) # nonAtomic not allowed on replace fun_map = Code(fun_map, {}) if fun_reduce is None: fun_reduce = parse_js_default('MapReduce.js', 'GroupCountsReduce') fun_reduce = Code(fun_reduce, {}) if sort: sort = SON(sort) if verbose > 1: frmt = "Map Reduce {}\n\ collection={coll!s:}\n\ query= {query!s:}\n\ sort= {sort!s:}\n\ scope= {scope!s}\n\ out= {out!s:}\n\ jsMode= {jsMode!s:}\n\ map= {fun_map!s:}\n\ reduce= {fun_reduce!s:}\n\ finalize= {fun_finalize!s:}\n" print(frmt.format('Starting...', **locals())) r = coll.map_reduce(fun_map, fun_reduce, out=out, query=query, finalize=fun_finalize, scope=scope, sort=sort, full_response=True, jsMode=jsMode) if verbose > 0: frmt = "Map Reduce {}\n\ ok= {ok:}\n\ millisecs = {timeMillis:,d}\n\ counts= {counts!s:}\n\ out= {!s:}\n" print(frmt.format('End', out, **r)) if command == 'inline': # if 'results' in list(r.keys()): # @note: results is a list if inline else a coll results = r['results'] del r['results'] else: db_out = out.get('db', coll.database.name) # if db not specified it is the collection_db results = coll.database.client[db_out][out[command]] return results, r
[docs]def group_counts( collection, field_name="_id", query={}, out={"replace": 'mr_tmp'}, sort=None, jsMode=False, verbose=1): """group values of field using map reduce :Parameters: see :func:`mr` function :Example: >>> from pymongo import MongoClient;from mongoUtils.configuration import testDbConStr # import MongoClient >>> db = MongoClient(testDbConStr).get_default_database() # get test database >>> col, res = group_counts(db.muTest_tweets_users, 'lang', out={"replace": "muTest_mr"}) # execute MR >>> res # check MR statistics {u'counts': {u'input': 997, u'reduce': 72, u'emit': 997, u'output': 21}, u'timeMillis': 256, u'ok': 1.0, u'result': u'del_1'} >>> for i in col.find(sort=[('value',-1)]): print i # print MR results {'_id': 'en', 'value': 352.0} {'_id': 'ja', 'value': 283.0} {'_id': 'es', 'value': 100.0} >>> """ FunMap = parse_js_default('MapReduce.js', 'GroupCountsMap', field_name) return mr(collection, FunMap, query=query, out=out, sort=sort, verbose=verbose, jsMode=jsMode)
[docs]def mr2( operation, # one of 'Orphans' or 'Join' col_a, col_a_key, col_b, col_b_key, col_a_query=None, col_b_query=None, db=None, out=None, sort_on_key_fields=False, jsMode=False, verbose=3): """A kind of sets operation on 2 collections Map Reduce two collection objects (col_a, col_b) on a common field (col_a_key, col_b_key) allowing queries (col_a_query, col_b_query) :Parameters: - col_a, col_b: pymongo collection objects - col_a_key col_b_key: name of fields to run MR for col_a & col_b - col_a_query, col_b_query optional queries to run on respective collections - db optional db name to use for results (use 'local' to avoid replication on results) - out: optional output collection name defaults to mr_operation - sort_on_key_fields: (bool) tries to sort on key if key has an index this is supposed to speed up MR - jsMode (True or False) (see mongo documentation) :Returns: a tuple(results_collection collection, MapReduce1 statistics, MapReduce2 statistics) :results_collection: - if operation == 'Orphans': - {'_id': 'XXXXX', 'value': {'A': 2.0, 'sum': 3.0, 'B': 1.0}} - value.a = count of documents in a - value.b count of documents in b, - sum = count of documents in both A+B - to get documents non existing in col_a: - >>> resultCollection.find({'value.a':0}) - to get documents non existing in col_b: - >>> resultCollection.find({'value.b':0}) - to get documents existing in both col_a and col_b - >>> resultCollection.find({'value.a':{'$gt':0}, 'value.a':{'$gt': 0}}) - to check for unique in both collections - >>> resultCollection.find({'value.sum':2}) - if operation = 'Join': - performs a join between 2 collections - {'_id': 'XXXXX', 'value': {'a': document from col_a,'b': document from col_b}} - if a document is missing from a collection its corresponding value is None - .. Warning:: document's value will also be None if key exists but is None so res[0].find({'value.b': None}} means either didn't exist in collection b or its value is None :Example: >>> from pymongo import MongoClient;from mongoUtils.configuration import testDbConStr # import MongoClient >>> db = MongoClient(testDbConStr).get_default_database() # get test database >>> res , stats1, stats2 = mr2('Orphans', db.muTest_tweets, 'user.screen_name', # execute col_b = db.muTest_tweets_users, col_b_key='screen_name', col_b_query= {'screen_name': {'$ne':'Albert000G'}}, verbose=0) >>> res.find({'value.b':0}).count() # not found in b 1 >>> res.find({'value.b':0})[0] {u'_id': u'Albert000G', u'value': {u'a': 1.0, u'sum': 1.0, u'b': 0.0}} >>> res = mr.mr2('Join', db.muTest_tweets, 'user.screen_name', # execute a Join col_b = db.muTest_tweets_users, col_b_key='screen_name', col_a_query= {}, col_b_query= {'screen_name':{'$ne':'Albert000G'}}, verbose=3) >>> f = res[0].find({'value.b': None}) # check missing in b >>> for i in f: print i['value']['a']['user']['screen_name'] Albert000G """ if out is None: out = 'mr2_' + operation map_js = parse_js_default('MapReduce.js', operation + 'Map') reduce_js = parse_js_default('MapReduce.js', operation + 'Reduce') mr_a = mr( col_a, map_js % (col_a_key), fun_reduce=reduce_js, query=col_a_query, out={'db': db, 'replace': out} if db else {'replace': out}, scope={'phase': 1}, sort={col_a_key: 1} if sort_on_key_fields and col_a_key in coll_index_names(col_a) else None, jsMode=jsMode, verbose=verbose ) mr_b = mr( col_b, map_js % (col_b_key), fun_reduce=reduce_js, query=col_b_query, out={'db': db, 'reduce': out} if db else {'reduce': out}, scope={'phase': 2}, sort={col_b_key: 1} if sort_on_key_fields and col_b_key in coll_index_names(col_b) else None, jsMode=jsMode, verbose=verbose ) return mr_b[0], mr_a[1], mr_b[1]