Source code for mongoUtils.aggregation

"""aggregation operations"""

from mongoUtils.helpers import pp_doc
from pymongo.command_cursor import CommandCursor
from bson.son import SON


[docs]class Aggregation(object): """**a helper for constructing aggregation pipelines** see: `aggregation framework <http://docs.mongodb.org/manual/reference/aggregation/>`_ supports all `aggregation operators <http://docs.mongodb.org/manual/reference/operator/aggregation/>`_ :param obj collection: a pymongo collection object :param list pipeline: (optional) an initial pipeline list :param dict kwargs: (optional) `any arguments <http://docs.mongodb.org/manual/reference/operator/aggregation/>`_ :returns: an aggregation object :Example: >>> from pymongo import MongoClient;from mongoUtils.configuration import testDbConStr # import MongoClient >>> db = MongoClient(testDbConStr).get_default_database() # get test database >>> aggr_obj = Aggregation(db.muTest_tweets_users, allowDiskUse=True) # select users collection >>> aggr_obj.help() # ask for help ['project', 'match', 'redact', 'limit', .... ] # available operators >>> aggr_obj.match({'lang': 'en'}) # match English speaking >>> aggr_obj.group({'_id': None, "avg_followers": {"$avg": "$followers_count"}}) # get average followers >>> print(aggr_obj.code(False)) # print pipeline [{"$match": {"lang": "en"}},{"$group": {"avg_followers": {"$avg": "$followers_count"},"_id": null}}] >>> next(aggr_obj()) # execute and get results {u'avg_followers': 2943.8210227272725, u'_id': None}) # results """ # executes aggregation _operators = 'project match redact limit skip sort unwind group out geoNear'.split(' ') _frmt_str = "{}\nstage#= {:2d}, operation={}"
[docs] def __init__(self, collection, pipeline=None, **kwargs): def _makefun(name): setattr(self, name, lambda value, position=None: self.add('$' + name, value, position)) self._collection = collection self._kwargs = kwargs self._pll = pipeline or [] # pipeline list for item in self._operators: # auto build functions for operators _makefun(item)
@classmethod
[docs] def construct_fields(cls, fields_list=[]): """a constructor for fields """ return SON([(i.replace('.', '_'), '$'+i) for i in fields_list])
@classmethod
[docs] def construct_stats(cls, fields_lst, _id=None, stats=['avg', 'max', 'min'], incl_count=True): """a constructor helper for group statistics :Parameters: - fields_lst: (list) list of field names - stats: (list) list of statistics - incl_count: (Bool) includes a count if True :Example: >>> specs_stats(['foo']) {'max_foo': {'$max': '$foo'}, '_id': None, 'avg_foo': {'$avg': '$foo'}, 'min_foo': {'$min': '$foo'}} """ frmt_field_stats = "{}_{}" res = {} for field in fields_lst: res.update({frmt_field_stats.format(i, field): {'$'+i: '$'+field} for i in stats}) if incl_count: res.update({'count': {'$sum': 1}}) res.update({'_id': _id}) return res
@property def pipeline(self): """returns the pipeline (a list)""" return self._pll @classmethod
[docs] def help(cls, what='operators'): """returns list of available operators""" print(cls._operators)
[docs] def add(self, operator, value, position=None): """adds an operation at specified position in pipeline""" if position is None: position = len(self._pll) self._pll.insert(position, {operator: value})
[docs] def search(self, operator, count=1): """returns (position, operator""" cnt = 0 for i, item in enumerate(self.pipeline): if list(item.keys())[0] == operator: cnt += 1 if cnt == count: return (i, item)
[docs] def save(self, file_pathname): """save pipeline list to file""" with open(file_pathname, 'w') as f: return f.write(self.code(verbose=False))
[docs] def remove(self, position): """remove an element from pipeline list given its position""" return self._ppl.remove(position)
[docs] def code(self, verbose=True): return pp_doc(self.pipeline, 4, sort_keys=False, verbose=verbose)
[docs] def clear(self): self._ppl = []
[docs] def __call__(self, print_n=None, **kwargs): """perform the aggregation when called >>> Aggregation_object() for kwargs see: `aggregate <http://api.mongodb.org/python/current/api/pymongo/collection.html>`_ :Parameters: - print_n: - True: will print results and will return None - None: will cancel result printing - int: will print top n documents - kwargs: if any of kwargs are specified override any arguments provided on instance initialization. """ tmp_kw = self._kwargs.copy() tmp_kw.update(kwargs) rt = self._collection.aggregate(self.pipeline, **tmp_kw) if print_n is not None: print (self._frmt_str.format("--" * 40, len(self.pipeline), str(self.pipeline[-1]))) if isinstance(rt, CommandCursor): for cnt, doc in enumerate(rt): print (doc) if print_n is not True and cnt+2 > print_n: break return None else: print (rt) return rt
[docs]class AggrCounts(Aggregation): """ constructs a group count aggregation pipeline based on :class:`~Aggregation` class :param obj collection: a pymongo collection object :param str field: field name :param dict match: a query match expression, defaults to None :param dict sort: a sort expression defaults to {'count': -1} :param dict kwargs: optional arguments to pass to parent :class:`~Aggregation` :Example: >>> from pymongo import MongoClient;from mongoUtils.configuration import testDbConStr # import MongoClient >>> db = MongoClient(testDbConStr).get_default_database() # get test database >>> AggrCounts(db.muTest_tweets_users, "lang", sort={'count': -1})(verbose=True) # counts by language {u'count': 352, u'_id': u'en'} {u'count': 283, u'_id': u'ja'} {u'count': 100, u'_id': u'es'} ... """
[docs] def __init__(self, collection, field, match=None, sort={'count': -1}, **kwargs): super(AggrCounts, self).__init__(collection, **kwargs) if match is not None: self.match(match) self.group({'_id': '$'+field, 'count': {'$sum': 1}}) if sort is not None: self.sort(sort)