for any info/changes follow me: @nickmilon

mongoUtils.pubsub module

publish and/or subscribe to a collection

class mongoUtils.pubsub.MsgState[source]

Bases: Hellas.Sparta.EnumLabels

An enum reflecting message state, used by classes Sub and PubSub


unknown state

SENT = 1

message is unprocessed


message has been picked up for Processing


message processed successfully

FAIL = 4

message was processed but failed (any int greater than 10 and less than 100 is also treated as an error type)
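The failure rule above can be sketched as a small helper. This is only an illustration, not the library's code: `is_error_state` is a hypothetical name, and the only facts taken from the enum are SENT = 1, FAIL = 4 and the open range 10 < state < 100.

```python
SENT = 1   # message is unprocessed (value from the enum above)
FAIL = 4   # message processed but failed (value from the enum above)


def is_error_state(state):
    """Return True if `state` denotes a failure.

    Hypothetical helper: FAIL itself, or any int strictly between
    10 and 100, counts as an error, as described above.
    """
    return state == FAIL or 10 < state < 100
```

A caller could use such a check to decide whether a message needs to be retried or reported.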

class mongoUtils.pubsub.Acknowledge[source]

Bases: Hellas.Sparta.EnumLabels

An enumeration used by class PubSub when requesting acknowledgement of a message.

NO = 0

requires NO acknowledgement (kind of broadcast message)


acknowledge by setting state after receiving the message


acknowledge by setting state and issuing a result message with original as parent

class mongoUtils.pubsub.SubTarget[source]

Bases: Hellas.Sparta.EnumLabels

An enumeration used by class PubSub when subscribing to messages. It is also possible to specify a string expression instead.

ANY = 1

Listen to any targets

NAME = 2

Listen only to Messages targeting (by name) this target


combination of 1 or 2 above


Listen to messages targeting names starting with Name (Regex)

exception mongoUtils.pubsub.MongoUtilsPubSubError[source]

Bases: mongoUtils.helpers.MongoUtilsError

Base class for all MongoUtils exceptions.

class mongoUtils.pubsub.Sub(a_collection, track_field=None, name=None)[source]

Bases: object

Generic class for subscribing to a collection, useful for implementing task/message queues and oplog tailing. It can also be used with non-capped collections, except that it must then use polling instead of a tailing cursor until the collection has at least one document stored.

For an example of viewing your local oplog, see test_SubToCappedOptLog. To replay the oplog, set database to ‘local’, collection to ‘’ and track_field to ‘ts’.

  • a_collection: (obj) a pymongo collection

  • track_field: (str) optional the name of field to base the query (defaults to None)
    • it must be a first level field i.e. contains no dots
    • its values must be monotonically increasing
    • it must be an indexed field (especially for large collections)
    • if None the instance will try to get one by examining the documents in collection
  • name: (str) a name for this instance if not given defaults to|

  • MongoUtilsPubSubError if a track_field is not provided and can’t be obtained automatically
  • MongoUtilsPubSubError if track field contains dots
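The constraints on track_field listed above can be sketched as a validation step. This is a minimal sketch under stated assumptions: `TrackFieldError` is a hypothetical stand-in for MongoUtilsPubSubError, `check_track_field` is a hypothetical name, and the guessing rule (first top-level numeric field of a sample document) is invented for illustration, since the text does not say how the library derives the field.

```python
class TrackFieldError(Exception):
    """Hypothetical stand-in for MongoUtilsPubSubError."""


def check_track_field(track_field, sample_doc=None):
    """Validate a track field name, or guess one from a sample document.

    The guessing rule (first top-level key holding a number) is an
    assumption made for this sketch only.
    """
    if track_field is None:
        for key, value in (sample_doc or {}).items():
            # bool is a subclass of int, so exclude it explicitly
            if isinstance(value, (int, float)) and not isinstance(value, bool):
                return key
        raise TrackFieldError("track_field not given and none could be derived")
    if "." in track_field:
        raise TrackFieldError("track_field must be a first-level field (no dots)")
    return track_field
```

For example, `check_track_field("ts")` passes the name through, while `check_track_field("meta.ts")` raises because the name contains a dot.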
__init__(a_collection, track_field=None, name=None)[source]

oplog_replay query option needs {‘$gte’ or ‘$gt’: ts}

  • start_from_last: [True|False|value] (defaults to True)
    • on the next inserted document if True
    • from the first document if False, i.e. a kind of replay
    • on the next document after self._track_field=number if a number
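The three start_from_last cases can be sketched as a function that builds the starting filter. This is an illustration only: `start_query` and its `last_value` argument are hypothetical, and the use of `$gt` follows the oplog_replay note above rather than the library's actual code.

```python
def start_query(track_field, start_from_last, last_value=None):
    """Build the filter that decides where a subscription starts.

    last_value: track-field value of the newest stored document
    (hypothetical argument; a real subscriber would look it up itself).
    """
    if start_from_last is True:
        # start on the next inserted document
        if last_value is None:
            return {}  # empty collection: nothing to skip
        return {track_field: {"$gt": last_value}}
    if start_from_last is False:
        # replay from the first document
        return {}
    # any other value: start after the document holding that value
    return {track_field: {"$gt": start_from_last}}
```

The identity checks (`is True`, `is False`) matter here because a numeric start value such as 0 or 1 must not be mistaken for a boolean.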
tail(*args, **kwargs)
poll(query={}, projection=None, start_from_last=True, sleep_secs=1, filter_func=<function <lambda>>, limit=10)[source]

Subscribe by polling. When the collection is not capped, or when response time is not critical, polling can be used instead of a tailing cursor. The method is thread safe.

  • limit: (int) number of documents to return in each batch
    • set it according to the use case: a small number (1-10) makes the subscription more responsive, a larger value (100-1000) makes it more efficient
  • see tail() method for other parameters
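The batching behaviour described above can be sketched as a generator. This is a sketch under stated assumptions, not the library's implementation: `poll_batches`, the `fetch(query, limit)` callable (a stand-in for a sorted, limited collection query) and `max_idle_polls` (which only exists so the sketch terminates) are all hypothetical.

```python
import time


def poll_batches(fetch, track_field, last_value, limit=10,
                 sleep_secs=1, max_idle_polls=1):
    """Yield documents in batches of at most `limit`, oldest first.

    fetch(query, limit): hypothetical callable returning a list of
    documents matching `query`, sorted by `track_field`.
    max_idle_polls: hypothetical; stop after this many empty polls.
    """
    idle = 0
    while idle < max_idle_polls:
        batch = fetch({track_field: {"$gt": last_value}}, limit)
        if not batch:
            # nothing new: wait before asking again
            idle += 1
            time.sleep(sleep_secs)
            continue
        idle = 0
        for doc in batch:
            # remember how far we got so the next query resumes from here
            last_value = doc[track_field]
            yield doc
```

With a small limit each round trip returns quickly, so new documents are seen sooner. With a large limit fewer queries are issued for the same number of documents.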

sub(*args, **kwargs)[source]

wrapper around poll and tail; it uses tail if the collection is capped, else poll


called when tail exits; useful only for debugging, e.g. to check cursor state


stops subscription

class mongoUtils.pubsub.PubSub(collection_or_name, db=None, name=None, capped=True, reset=False, size=1073741824, max_docs=None)[source]

Bases: mongoUtils.pubsub.Sub

generic class for Publishing/Subscribing to a collection

It can also be used with non-capped collections, except that it must use polling instead of a tailing cursor.


If you use this class to tail the oplog, make sure you DO NOT attempt to write to the oplog collection, and that you understand the potential side effects as described here

See also

more info here and here

  • collection_or_name: (obj or str) a pymongo collection or a string
  • db: (obj optional) a pymongo db instance only needed if collection_or_name is a string
  • name: (str) a name for this instance if not given defaults to|
  • capped: (bool optional) set to True to get a capped collection
  • reset: (bool) drops & recreates collection and resets id counters if True
  • size: (int) capped collection size in bytes
  • max_docs: (int) capped collection max documents count
_max_name_len = 32
_reserve_name = ' '
_dt_frmt_info = '{} {:%Y-%m-%d %H:%M:%S %f}'
__init__(collection_or_name, db=None, name=None, capped=True, reset=False, size=1073741824, max_docs=None)[source]

drops collection and resets sequence generator


marks doc as received; the state is checked to make sure that the doc was not picked up by another client in the meantime
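The check-then-mark step can be sketched with an in-memory store. This is an illustration under stated assumptions: `try_receive` is a hypothetical name, a dict plus a lock stands in for the collection, and the value 2 for the "picked up" state is assumed (only SENT = 1 and FAIL = 4 are spelled out in the enum above).

```python
import threading

SENT = 1      # from the MsgState enum above
RECEIVED = 2  # assumed name and value for "picked up for processing"

_lock = threading.Lock()


def try_receive(store, msg_id):
    """Mark a message as received only if it is still in the SENT state.

    store: dict mapping message id -> message dict (in-memory stand-in
    for the collection). Returns the message if this caller claimed it,
    else None.
    """
    with _lock:  # stands in for the atomicity of a single-document update
        msg = store.get(msg_id)
        if msg is None or msg["state"] != SENT:
            return None  # missing, or already picked up by another client
        msg["state"] = RECEIVED
        return msg
```

Against MongoDB the same check-and-set would typically be a single conditional update, for instance pymongo's `find_one_and_update` with the expected state in the filter, so that only one of several competing clients gets the document back.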


descendants should check the doc and return None if they do not want to yield it, else return the doc to yield

acknowledge_done(msg, state=3)[source]
pub(payload, topic='', verb='', target=None, ackn=1, sentBy=None)[source]
  • payload: (dict) message body
  • topic: message topic
  • verb: message verb
  • target: str or None specifies target(s) name
  • ackn: requested acknowledgement, see the Acknowledge class
  • sentBy: str or None identifies sender (if None defaults to instance name)
tail(topic=None, verb=None, target=2, projection=None, start_from_last=True, sleep_secs=0.01)[source]

subscribe by tail

  • topic: any arbitrary value specifying a topic or None
  • verb: any arbitrary value specifying a verb or None
  • target: a value specifying target listener or None see SubTarget class
  • projection: a pymongo projection specifying which fields to return or None
  • start_from_last: see Sub.tail() method
  • sleep_secs: see Sub.tail() method
poll(topic=None, verb=None, target=2, projection=None, start_from_last=True, sleep_secs=1, limit=10)[source]

subscribe by poll

Parameters: see methods Sub.poll() and PubSub.tail()
_tail_adhoc(*args, **kwargs)[source]

bypasses the protocol and tails as defined by the parent class - used for testing

_poll_adhoc(*args, **kwargs)[source]

bypasses the protocol and polls as defined by the parent class - used for testing

classmethod msg_info(msg)[source]

returns dictionary with human readable info about a message

class mongoUtils.pubsub.PubSubStats(collection)[source]

Bases: object

job_status(name=None, match=None, fields_list=['address.topic', 'status.state'])[source]
responce_stats(name=None, match={}, group=None)[source]
mesgs_persec(query={'status.state': {'$gt': 1}})[source]