for any info/changes follow me: @nickmilon
mongoUtils.pubsub module¶
publish and/or subscribe to a collection
-
class
mongoUtils.pubsub.
MsgState
[source]¶ Bases:
Hellas.Sparta.EnumLabels
An enum reflecting message state, used by classes Sub and PubSub
-
UKNKOWN
= 0¶ unknown state
-
SENT
= 1¶ message is unprocessed
-
RECEIVED
= 2¶ message has been picked up for processing
-
SUCCES
= 3¶ message processed successfully
-
FAIL
= 4¶ message processed but failed (any int > 10 and < 100 is also considered an error type)
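The state codes above, including the rule that integers between 10 and 100 count as error types, can be sketched with a standard-library enum. This is only an illustration: `enum.IntEnum` is used as a stand-in for `Hellas.Sparta.EnumLabels`, and the helper `is_failure` is a hypothetical name that is not part of mongoUtils.

```python
from enum import IntEnum


class MsgState(IntEnum):
    """Stand-in mirroring the documented values (not the library class)."""
    UKNKOWN = 0   # identifier spelling kept exactly as documented
    SENT = 1
    RECEIVED = 2
    SUCCES = 3
    FAIL = 4


def is_failure(state):
    """Hypothetical helper: FAIL, or any int strictly between 10 and 100."""
    return state == MsgState.FAIL or 10 < state < 100


print(is_failure(MsgState.FAIL))    # True
print(is_failure(42))               # True
print(is_failure(MsgState.SUCCES))  # False
```

Whether the library itself treats the bounds as strict is an assumption taken from the wording "> 10 < 100".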
-
-
class
mongoUtils.pubsub.
Acknowledge
[source]¶ Bases:
Hellas.Sparta.EnumLabels
An enumeration used when requesting acknowledgement of a message; used by class PubSub.
-
NO
= 0¶ requires NO acknowledgement (kind of broadcast message)
-
RECEIPT
= 1¶ acknowledge by setting state after receiving the message
-
RESULTS
= 2¶ acknowledge by setting state and issuing a result message with original as parent
-
-
class
mongoUtils.pubsub.
SubTarget
[source]¶ Bases:
Hellas.Sparta.EnumLabels
An enumeration used when subscribing to messages (it is also possible to specify a string expression); used by class PubSub.
-
ANY
= 1¶ Listen to any targets
-
NAME
= 2¶ Listen only to Messages targeting (by name) this target
-
NAME_OR_ANY
= 3¶ combination of ANY (1) and NAME (2) above
-
NAME_RX
= 4¶ Listen to messages targeting names starting with Name (Regex)
-
-
exception
mongoUtils.pubsub.
MongoUtilsPubSubError
[source]¶ Bases:
mongoUtils.helpers.MongoUtilsError
Base class for all MongoUtils exceptions.
-
class
mongoUtils.pubsub.
Sub
(a_collection, track_field=None, name=None)[source]¶ Bases:
object
Generic class for subscribing to a collection. Useful for implementing task-message queues and oplog tailing (see here). It can also be used with non-capped collections, except that it must use polling instead of a tailing cursor until the collection has at least one document stored. For instance, to view your local oplog, see the example test_SubToCappedOptLog. To replay the oplog, set the database to ‘local’, the collection to ‘oplog.rs’ and track_field to ‘ts’.
Parameters: - a_collection: (obj) a pymongo collection
- track_field: (str, optional) the name of the field on which to base the query (defaults to None)
  - it must be a first-level field, i.e. its name contains no dots
  - its values must be monotonically increasing
  - it must be an indexed field (especially for large collections)
  - if None, the instance will try to get one by examining the documents in the collection
- name: (str) a name for this instance; if not given, defaults to db.name|collection.name
Raises: - MongoUtilsPubSubError if a track_field is not provided and can’t be obtained automatically
- MongoUtilsPubSubError if track_field contains dots
-
name
¶
-
_init_query
(start_from_last=True)[source]¶ the oplog_replay query option needs {‘$gte’ or ‘$gt’: ts}
Parameters: - start_from_last: [True|False|value] (defaults to True)
  - if True, start on the next inserted document
  - if False, start from the first document, i.e. a kind of replay
  - if a number, start on the next document after self._track_field=number
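The three `start_from_last` cases can be sketched as a small pure function that returns a MongoDB-style filter. This is a hedged illustration, not the library's `_init_query`: the function name `init_query` and the `last_value` parameter (the highest value of the tracked field currently stored, which the real class would have to look up itself) are assumptions.

```python
def init_query(track_field, start_from_last=True, last_value=None):
    """Hypothetical stand-in for the start_from_last rules.

    last_value: current highest value of track_field, or None if the
    collection is empty (assumed to be supplied by the caller here).
    """
    if start_from_last is True:
        # Start on the next inserted document.
        if last_value is None:
            return {}
        return {track_field: {"$gt": last_value}}
    if start_from_last is False:
        # Replay from the first document.
        return {}
    # Any other value: start on the next document after that value.
    return {track_field: {"$gt": start_from_last}}


print(init_query("ts", True, last_value=5))   # {'ts': {'$gt': 5}}
print(init_query("ts", False))                # {}
print(init_query("ts", 3))                    # {'ts': {'$gt': 3}}
```

The identity checks (`is True`, `is False`) are deliberate: `0` and `1` are valid numeric starting points and must not be mistaken for booleans.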
-
tail
(*args, **kwargs)¶
-
poll
(query={}, projection=None, start_from_last=True, sleep_secs=1, filter_func=<function <lambda>>, limit=10)[source]¶ Subscribe by polling. Use this instead of a tailing cursor when the collection is not capped or when response time is not critical. The method is thread safe.
Parameters: - limit: (int) number of documents to return in each batch
  - set it according to the use case: a small number (1-10) makes it more responsive, a larger value (100-1000) makes it more efficient
See the tail() method for the other parameters.
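The trade-off between `limit` and `sleep_secs` is easier to see in a toy polling loop. The sketch below runs against an in-memory list rather than MongoDB; `poll_sketch`, `fetch_batch`, `fetch_from_list` and `max_idle_rounds` are hypothetical names introduced only so the example terminates and runs without a server. It does not reproduce the library's implementation.

```python
import time


def poll_sketch(fetch_batch, track_field, start_after=None, limit=10,
                sleep_secs=1, max_idle_rounds=None):
    """Yield documents in batches of at most `limit`, sleeping when idle.

    fetch_batch(after, limit) must return documents whose track_field is
    greater than `after`, in ascending order. With pymongo this could be a
    find() on {track_field: {'$gt': after}}, sorted and limited.
    """
    last, idle = start_after, 0
    while max_idle_rounds is None or idle < max_idle_rounds:
        batch = fetch_batch(last, limit)
        if not batch:
            idle += 1
            time.sleep(sleep_secs)      # nothing new: wait before retrying
            continue
        idle = 0
        for doc in batch:
            last = doc[track_field]     # remember position for next query
            yield doc


# In-memory stand-in for a collection, ordered by the tracked field.
docs = [{"_id": i, "payload": "msg %d" % i} for i in range(1, 8)]


def fetch_from_list(after, limit):
    newer = [d for d in docs if after is None or d["_id"] > after]
    return newer[:limit]


received = list(poll_sketch(fetch_from_list, "_id", limit=3,
                            sleep_secs=0, max_idle_rounds=1))
print([d["_id"] for d in received])   # [1, 2, 3, 4, 5, 6, 7]
```

A smaller `limit` means more round trips but each document is handed over sooner; a larger one amortises the query cost over more documents.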
-
sub
(*args, **kwargs)[source]¶ wrapper around poll and tail; it uses tail if the collection is capped, otherwise poll
-
class
mongoUtils.pubsub.
PubSub
(collection_or_name, db=None, name=None, capped=True, reset=False, size=1073741824, max_docs=None)[source]¶ Bases:
mongoUtils.pubsub.Sub
Generic class for publishing/subscribing to a collection (see here). It can also be used with non-capped collections, except that it must then use polling instead of a tailing cursor.
Warning
If you use this class to tail the oplog, make sure you DO NOT attempt to write to the oplog collection, and that you understand the potential side effects as described here.
Parameters: - collection_or_name: (obj or str) a pymongo collection or a string
- db: (obj optional) a pymongo db instance only needed if collection_or_name is a string
- name: (str) a name for this instance if not given defaults to db.name|collection.name
- capped: (bool optional) set to True to get a capped collection
- reset: (bool) drops & recreates collection and resets id counters if True
- size: (int) capped collection size in bytes
- max_docs: (int) capped collection maximum document count
-
_max_name_len
= 32¶
-
_reserve_name
= ' '¶
-
_dt_frmt_info
= '{} {:%Y-%m-%d %H:%M:%S %f}'¶
-
__init__
(collection_or_name, db=None, name=None, capped=True, reset=False, size=1073741824, max_docs=None)[source]¶
-
_acknowledge_received
(msg)[source]¶ marks the doc as received; the state is checked to make sure that it was not picked up by another client in the meantime
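The check described here is a compare-and-set: the state changes to RECEIVED only if it is still SENT, so only one client can claim a message. The sketch below shows the idea with an in-memory stand-in guarded by a lock. The class `InMemoryMessages` and the field name `state` are assumptions for illustration and do not come from mongoUtils.

```python
import threading

SENT, RECEIVED = 1, 2   # values documented for MsgState


class InMemoryMessages:
    """Toy stand-in for a collection; not part of mongoUtils."""

    def __init__(self):
        self._docs = {}
        self._lock = threading.Lock()

    def insert(self, doc):
        self._docs[doc["_id"]] = doc

    def acknowledge_received(self, msg_id):
        """Set state to RECEIVED only if it is still SENT.

        Returns True if this caller claimed the message, False if it is
        missing or another client got there first. With pymongo, a
        comparable atomic step could be
        find_one_and_update({'_id': msg_id, 'state': SENT},
                            {'$set': {'state': RECEIVED}}).
        """
        with self._lock:
            doc = self._docs.get(msg_id)
            if doc is None or doc["state"] != SENT:
                return False
            doc["state"] = RECEIVED
            return True


messages = InMemoryMessages()
messages.insert({"_id": 1, "state": SENT})
first = messages.acknowledge_received(1)
second = messages.acknowledge_received(1)
print(first, second)   # True False
```

Putting the expected state into the filter, rather than reading and then writing, is what keeps two subscribers from both processing the same message.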
-
_yield_doc
(msg)[source]¶ descendants should check the doc and return None if they don’t want to yield it, otherwise the doc to yield
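The hook pattern described for `_yield_doc` can be sketched with a toy base class. `BaseSubscriber`, its `stream` method and the subclass `JobsOnly` are hypothetical names used only to show how returning None filters a document out; the real PubSub class is not reproduced here.

```python
class BaseSubscriber:
    """Toy base class showing the hook pattern; not the library class."""

    def _yield_doc(self, msg):
        # Default behaviour: yield every document unchanged.
        return msg

    def stream(self, docs):
        for doc in docs:
            doc = self._yield_doc(doc)
            if doc is not None:     # None means "skip this document"
                yield doc


class JobsOnly(BaseSubscriber):
    """Descendant that only lets documents with topic 'jobs' through."""

    def _yield_doc(self, msg):
        return msg if msg.get("topic") == "jobs" else None


incoming = [{"topic": "jobs", "n": 1}, {"topic": "logs", "n": 2},
            {"topic": "jobs", "n": 3}]
print([d["n"] for d in JobsOnly().stream(incoming)])         # [1, 3]
print([d["n"] for d in BaseSubscriber().stream(incoming)])   # [1, 2, 3]
```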
-
pub
(payload, topic='', verb='', target=None, ackn=1, sentBy=None)[source]¶ Parameters: - payload: (dict) message body
- topic: message topic
- verb: message verb
- target: str or None specifies target(s) name
- ackn: requested acknowledgement, see the Acknowledge class
- sentBy: str or None identifies sender (if None defaults to instance name)
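To make the parameters above concrete, here is a sketch of how a message document might be assembled before insertion. The helper `build_message`, the `instance_name` argument and every field name in the returned dict are assumptions for illustration; the source does not show the document layout that `pub` actually stores.

```python
ACK_NO, ACK_RECEIPT, ACK_RESULTS = 0, 1, 2   # values documented for Acknowledge
STATE_SENT = 1                                # value documented for MsgState.SENT


def build_message(payload, topic="", verb="", target=None,
                  ackn=ACK_RECEIPT, sent_by=None,
                  instance_name="db|collection"):
    """Hypothetical helper; the field names below are illustrative only."""
    if not isinstance(payload, dict):
        raise TypeError("payload must be a dict")
    return {
        "payload": payload,
        "topic": topic,
        "verb": verb,
        "target": target,                 # None: no specific target name
        "ackn": ackn,
        # Documented rule: the sender defaults to the instance name.
        "sentBy": sent_by if sent_by is not None else instance_name,
        "state": STATE_SENT,              # assumed initial state
    }


msg = build_message({"n": 1}, topic="jobs", verb="run")
print(msg["sentBy"], msg["ackn"], msg["state"])   # db|collection 1 1
```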
-
tail
(topic=None, verb=None, target=2, projection=None, start_from_last=True, sleep_secs=0.01)[source]¶ subscribe by tail
Parameters: - topic: any arbitrary value specifying a topic or None
- verb: any arbitrary value specifying a verb or None
- target: a value specifying the target listener or None; see the SubTarget class
- projection: a pymongo projection specifying which fields to return or None
- start_from_last: see the Sub.tail() method
- sleep_secs: see the Sub.tail() method
-
poll
(topic=None, verb=None, target=2, projection=None, start_from_last=True, sleep_secs=1, limit=10)[source]¶ subscribe by poll
Parameters: see methods Sub.poll() and PubSub.tail()
-
_tail_adhoc
(*args, **kwargs)[source]¶ bypasses the protocol and tails as defined by the parent class; used for testing