'''
Created on Jul 30, 2015
@author: milon
'''
import threading
import argparse
import datetime
from time import sleep
import random
from mongoUtils.pubsub import Sub, PubSub, MsgState, Acknowledge, PubSubStats
from mongoUtils.client import muClient
from mongoUtils.configuration import testDbConStr
from Hellas.Sparta import DotDot
[docs]def printif(*args):
if __name__ == "__main__":
print (args)
[docs]class PubSubController():
_topics = ['red', 'green', 'blue']
_verbs = ['paint', 'mix']
_ackns = [Acknowledge.NO, Acknowledge.RECEIPT, Acknowledge.RESULTS]
[docs] def __init__(self, collection_name, db, max_jobs=100):
self._continue_jf = True
self._continue_mon = True
self.dt_start = datetime.datetime.utcnow()
self.tailjobs_done = None
self.tailjobs_posts = None
self._collection_name = collection_name
self._db = db
self.pubsub = PubSub(collection_name, db=db, name='controller',
capped=True, reset=True, size=2 ** 30, max_docs=10 ** 6)
@classmethod
def _rnd_topic(cls):
return cls._topics[random.randint(0, len(cls._topics)-1)]
@classmethod
def _rnd_verb(cls):
return cls._verbs[random.randint(0, len(cls._verbs)-1)]
@classmethod
def _rnd_ackn(cls):
return cls._ackns[random.randint(0, len(cls._ackns)-1)]
[docs] def monitor(self, seconds=30):
pubsubstats = PubSubStats(self.pubsub._collection)
while self._continue_mon:
seconds_run = (datetime.datetime.utcnow() - self.dt_start).total_seconds()
if seconds_run >= seconds:
self.stop()
sleep(5)
for i in pubsubstats.job_status(fields_list=['status.state'])():
if 'status_state' in i['_id'].keys():
i['_id']['status_state'] = MsgState.value_name(i['_id']['status_state'])
printif(i)
for i in pubsubstats.responce_stats()():
printif(i)
#{'seconds': 11.902, 'messages': 10000, 'msgPerSec': 840.1949252226517}
printif("msgs: {msgs:4,d} seconds: {seconds:6,.2f} msgsPerSec: {msgsPerSec:6,.2f}".format(**pubsubstats.mesgs_persec()))
return pubsubstats
[docs] def job_factory(self, topic=None, verb=None, target=None, ackn=Acknowledge.RECEIPT, max_jobs=1000):
cnt = 0
while self._continue_jf and (max_jobs is None or cnt < max_jobs):
cnt += 1
r = self.pubsub.pub(payload={'cnt': cnt},
topic=self._rnd_topic() if topic is None else topic,
verb=self._rnd_verb() if verb is None else verb,
target=target, ackn=self._rnd_ackn() if ackn is None else ackn)
sleep(0)
[docs] def monitor_jobs_posts(self, topic=None, verb=None, target=None, start_from_last=True, max_jobs=None):
printif("on start pending jobs{:10,}".format(self.pubsub._collection.count()))
self.tailjobs_posts = self.pubsub.tail(topic=topic, verb=verb, target=None,
projection=None, start_from_last=False, sleep_secs=0)
cnt = 0
for job in self.tailjobs_posts:
cnt += 1
if max_jobs is not None and cnt == max_jobs:
self.pubsub.stop()
if job['ackn'] == Acknowledge.RESULTS:
self.pubsub.acknowledge_done(job, MsgState.SUCCES)
sleep(0)
self.pubsub.restart()
[docs] def monitor_jobs_done(self):
pass
self.tailjobs_done = self.pubsub.poll(topic=None, verb=None, target=None, projection=None, start_from_last=False, sleep_secs=0.1)
for job in self.tailjobs_done:
print "{}\n monitor_jobs_done {}\n{} ".format("-" * 50, self.pubsub.msg_info(job) , "_" * 50)
[docs] def stop(self):
printif("stopping now")
self._continue_jf = False
self._continue_mon = False
sleep(5)
self.pubsub.stop()
printif("stopped")
[docs]def thread_start(target, name, daemon=True, args=(), kwargs={}):
thr = threading.Thread(target=target, name=name, args=args, kwargs=kwargs)
if daemon is True:
thr.daemon = True
thr.start()
# threads_list.append(thr)
return thr
[docs]def run(connection, collection_name, topic=None, verb=None, target=None, start_from_last=True, max_jobs=1000, ms=20):
client = muClient(connection)
assert client.db is not None
pbs_controller = PubSubController(collection_name, client.db)
reckwargs = {'topic': topic, 'verb': verb, 'target': target}
thread_start(pbs_controller.monitor_jobs_posts, 'jmonitor_jobs_posts', daemon=True, kwargs=reckwargs)
thread_start(pbs_controller.job_factory, 'job_factory', daemon=True, kwargs={'max_jobs': max_jobs})
return pbs_controller.monitor(ms)
[docs]def ps_tests(testname, connection, collection_name='PubSubSimulate'):
def speed():
"""non threading, tests for speed checks both writing / reading speed
"""
max_jobs = 10000
res = DotDot()
client = muClient(connection)
pbs_controller = PubSubController(collection_name, client.db)
dt_start_pub = datetime.datetime.utcnow()
pbs_controller.job_factory(topic='foo', verb='bar', target=None, max_jobs=max_jobs)
dt_start_sub = datetime.datetime.utcnow()
pbs_controller.monitor_jobs_posts(topic='foo', verb='bar', target=None, start_from_last=False, max_jobs=max_jobs)
dt_end = datetime.datetime.utcnow()
res.secondsToPub = (dt_start_sub-dt_start_pub).total_seconds()
res.secondsToSub = (dt_end - dt_start_sub).total_seconds()
res.msgsPerSecPub = int(max_jobs / res.secondsToPub)
res.msgsPerSecSub = int(max_jobs / res.secondsToSub)
printif(res)
pbs_controller.pubsub.reset()
return res
def query():
"""non threading, test
"""
max_jobs = 10000
res = DotDot()
client = muClient(connection)
pbs_controller = PubSubController(collection_name, client.db)
pbs_controller.job_factory(topic=None, verb=None, target=None, max_jobs=max_jobs)
pbs_controller.monitor_jobs_posts(topic='red', verb='paint', target=None, start_from_last=False, max_jobs=max_jobs)
printif(res)
pbs_controller.pubsub.reset()
return res
if testname == 'speed':
return speed()
elif testname == 'speedThread':
res = run(connection, collection_name, topic=None, verb=None, target=None,
start_from_last=False, max_jobs=2000, ms=10)
res = res.mesgs_persec()
printif("res", res)
return res
elif testname == 'query':
res = run(connection, collection_name, topic='red', verb='paint', target=None,
start_from_last=False, max_jobs=2000, ms=10)
[docs]def parse_args():
parser = argparse.ArgumentParser(description="tail a mongodb collection")
parser.add_argument('-connection', type=str, default=testDbConStr,
help='a db connection string\
as defined in http://docs.mongodb.org/manual/reference/connection-string/\
that included a database name defaults to testDbConStr in configuration file'
)
parser.add_argument('-collection', type=str, help='collection name', default='PubSubSimulate')
parser.add_argument('-topic', type=str, help='topic to subscribe', default=None)
parser.add_argument('-verb', type=str, help='verb to subscribe', default=None)
parser.add_argument('-target', type=str, help='target to subscribe', default=None)
parser.add_argument('-max_jobs', type=int, help='max_number of Messages', default=1000)
parser.add_argument('-test', type=str, help='test name', default=None,
choices=['speed', 'speedThread', 'query'])
return parser.parse_args()
[docs]def main():
args = parse_args()
printif("starting PubSubBench", vars(args))
if args.test is None:
run(args.connection, args.collection, topic=args.topic, verb=args.verb, target=args.target, max_jobs=args.max_jobs)
else:
ps_tests(args.test, args.connection, args.collection)
if __name__ == "__main__":
main()