"""
module clients
"""
import logging
from twtPyCurl.py.utilities import DotDot
from twtPyCurl.twt.constants import TWT_URL_MEDIA_UPLOAD, TWT_URL_API_REST, TWT_URL_API_STREAM
from twtPyCurl.py.requests import (simplejson, pycurl, Client, ClientStream,
ErrorRq, ErrorRqCurl, ErrorRqHttp, format_header)
from time import sleep
from twtPyCurl.twt.endpoints import EndPointsRest, EndPointsStream
# Module-level logger named after this module; attaching handlers is left to
# the application (a NullHandler could be added here if ever needed):
# LOG.addHandler(logging.NullHandler())
LOG = logging.getLogger(name=__name__)
# Emitted once at import time so that module loading can be traced.
LOG.debug("loading module: " + __name__)
def backoff(seconds):  # default backoff method
    """Block the calling thread for ``seconds`` seconds.

    Default back-off primitive; :meth:`ClientTwtStream.wait_seconds` calls it
    with the computed delay.

    :param float seconds: how long to sleep, in seconds
    :return: ``None`` (whatever :func:`time.sleep` returns)
    """
    return sleep(seconds)
class ErrorTwtStreamDisconnectReq(ErrorRq):
    """Error raised when a stream disconnect request can not be recovered from.

    Raised from :meth:`ClientTwtStream.on_request_error_curl` with the
    disconnect code and message.

    :param int error_number: disconnect code
    :param str msg: human readable description, it is also logged at ERROR level
    """

    def __init__(self, error_number, msg):
        # log at construction time so the disconnect reason is recorded
        # even if a caller swallows the exception
        LOG.error(msg)
        super(ErrorTwtStreamDisconnectReq, self).__init__(error_number, msg)
class ErrorTwtMissingParameters(ErrorRq):
    """Error raised when required request parameters have not been supplied.

    :param list parm_names_lst: names of the missing parameters
    """

    def __init__(self, parm_names_lst):
        # str() each name so a non-string entry can't turn the error report
        # itself into a TypeError
        names = " ".join(str(name) for name in parm_names_lst)
        msg = "Missing parameters {}".format(names)
        super(ErrorTwtMissingParameters, self).__init__(msg)
class ErrorRqHttpTwt(ErrorRq):
    """HTTP error that carries twitter's own error description when available.

    The base class is initialised with a dictionary holding ``status`` (the
    HTTP status of the response) and, when the response data contains a non
    empty ``errors`` list, ``msg`` and ``code`` taken from its first entry.

    :param response: response object exposing ``status_http`` and ``data``
    """

    def __init__(self, response):
        """ i.e:
        {'errors': [{'message': 'Query parameters are missing.', 'code': 25}]}
        {'errors': [{'message': 'Invalid or expired token.', 'code': 89}]}
        """
        data = response.data
        # membership test directly on the mapping (no temporary key list);
        # hasattr guard keeps non-mapping payloads on the status-only path
        has_errors = hasattr(data, 'keys') and "errors" in data
        errors = data['errors'] if has_errors else None
        if isinstance(errors, (list, tuple)) and errors:
            first = errors[0]
            rt = {'status': response.status_http, 'msg': first['message'],
                  'code': first['code']}
        else:
            # no usable error entry: report the HTTP status only instead of
            # failing with IndexError while building the exception
            rt = {'status': response.status_http}
        super(ErrorRqHttpTwt, self).__init__(rt)
class ClientTwtRest(Client):
    """client for Twitter `REST <https://dev.twitter.com/rest/public>`_ API

    examples require a credentials.json in user's home directory see :class:`~.CredentialsProviderFile`

    :param Credentials credentials: an instance of :class:`~.Credentials`
    :param dict kwargs: for acceptable kwargs see :class:`~.Client`

    :example:
        :ref:`check here <example-rest>`
    """

    def __init__(self, credentials, **kwargs):
        # composition with an endpoints object this allows to:
        # 1) call it using dot notation 2) validate endpoints
        self._endpoints = EndPointsRest(parent=self)
        super(ClientTwtRest, self).__init__(credentials=credentials, **kwargs)
        self.api = self._endpoints

    def request_ep(self, end_point, method='GET', parms=None, multipart=False):
        """request end point

        :param str end_point: twitter REST end point shortcut ie 'users/search'
        :param str method: request method one of GET or POST (defaults to GET)
        :param dict parms: parameters dictionary to pass to twitter
            (defaults to an empty dictionary)
        :param bool multipart: passed through unchanged to :meth:`request`
        :return: whatever :meth:`request` returns

        .. Warning:: doesn't check end_point's validity will raise a twitter API error if not valid
        """
        # a fresh dict per call: a shared mutable default could leak
        # parameters from one request into the next
        if parms is None:
            parms = {}
        frmt_str = TWT_URL_MEDIA_UPLOAD if end_point == "media/upload" else TWT_URL_API_REST
        if end_point == "statuses/update":
            # parms['status'] = parms['status'].encode('utf-8')
            parms = self._request_ep_media(parms)  # check for media
        return self.request(frmt_str.format(end_point), method, parms, multipart)

    def on_request_error_http(self, err):
        """we got an http error, if error < 500
        twitter's error message is in data i.e: {'errors': [{'message': 'Invalid or expired token.', 'code': 89}]}

        :param int err: HTTP status code
        :raises ErrorRqHttpTwt: when ``err`` < 500 (response data is parsed as JSON first)
        :raises ErrorRqHttp: otherwise
        """
        if err < 500:
            self.response.data = simplejson.loads(self.response.data)
            raise ErrorRqHttpTwt(self.response)
        else:
            raise ErrorRqHttp(err, self.response)

    def on_request_end(self):
        """replace the raw response data with its parsed JSON value"""
        self.response.data = simplejson.loads(self.response.data)

    def help(self, *args, **kwargs):
        """delegate help to be handled by endpoints object"""
        return self._endpoints._help(*args, **kwargs)

    def _twtUploadMP(self, file_or_path):
        """POST ``file_or_path`` as a multipart ``media`` form file to the media upload url"""
        return self.request(
            TWT_URL_MEDIA_UPLOAD,
            method='POST', parms={'media': (pycurl.FORM_FILE, file_or_path)}, multipart=True)

    def _adHocCmd_(self, element, *args, **kwargs):
        """this makes the trick of issuing requests against endpoints using dot notation syntax
        it is provided only for ease of use when issuing requests from command line.
        Applications should not use dot notation but instead call :func:`request_ep` method

        :param element: endpoint element, its string form is split on the endpoints delimiter
        :param args: first positional argument is the id for end points whose path ends in ``/id``
        :param kwargs: request parameters
        :return: result of :meth:`request_ep` or False if the end point does not validate
        :raises ErrorTwtMissingParameters: when an ``/id`` end point is called without an id
        """
        dic_keys = str(element).split(self._endpoints.delimiter)[1:]  # Note get rid of root
        rt = self._endpoints.get_value_validate(dic_keys)
        if not rt:
            return False
        if rt.path.endswith("/id"):
            if not args:
                raise ErrorTwtMissingParameters(['id'])
            # substitute only the trailing 'id' placeholder; str.replace would
            # also rewrite any other 'id' substring occurring in the path
            endpoint = rt.path[:-len("id")] + str(args[0])
        else:
            endpoint = rt.path
        return self.request_ep(endpoint, rt.method, kwargs)
class ClientTwtStream(ClientStream):
    """*A client for twitter stream API*

    disconnect can be initiated by a message to disconnect from twitter
    or by the program by setting request_abort property to a tuple (code,message)

    :param Credentials credentials: an instance of :class:`~.Credentials`
    :param int stats_every: print statistics every n data packets, defaults to 1
        (0 is documented as disabling statistics)
    :param dict kwargs: for acceptable kwargs see :class:`~.Client` and :class:`~.ClientStream`

    :example:
        :ref:`check here <example-stream>`
    """
    # some strings for formating statistics #############################################
    format_stream_stats = ClientStream.format_stream_stats + "{t_data:14,d}|{t_msgs:8,d}|"
    format_stream_stats_header = format_header(format_stream_stats)
    # ####################################################################################

    def __init__(self, credentials=None, stats_every=1, **kwargs):
        self._reset_retry()
        self._endpoints = EndPointsStream(parent=self)  # class composition with endpoints object
        # delegate to endpoints could be done automatically but that would be too hackish
        self.stream = self._endpoints.stream
        self.sitestream = self._endpoints.sitestream
        self.userstream = self._endpoints.userstream
        self.name = kwargs.get('name')  # ancestor class will set it again but we need it now
        super(ClientTwtStream, self).__init__(credentials=credentials, stats_every=stats_every, **kwargs)
        self.counters.update({'t_data': 0, 't_msgs': 0})

    def _handle_init_end(self):
        """set ``curl_low_speed`` to (1, 60)"""
        # NOTE(review): presumably (LOW_SPEED_LIMIT, LOW_SPEED_TIME) - confirm in ancestor class
        self.curl_low_speed = (1, 60)

    def on_request_error_curl(self, err):
        """default error handling, for curl (connection) Errors override method for any special handling
        `see curl error codes <http://curl.haxx.se/libcurl/c/libcurl-errors.html>`_
        and `twitter streaming message types <https://dev.twitter.com/streaming/overview/messages-types>`_
        return True to retry request raise an exception or return False to abort
        remember! after 1st unsuccessful retry probably the error will be E_COULDNT_CONNECT

        :param err: sequence whose first item is the curl error number and second the message
        """
        LOG.debug("on_request_error_curl:" + str(err))
        if err[0] == pycurl.E_PARTIAL_FILE and self._state.retries_curl < 4:
            # err (18, 'transfer closed with outstanding read data remaining')
            # usually happens in streams due to network/server temporary failure
            # possible remedy curl_setopt($curl, CURLOPT_HTTPHEADER, array('Expect:'))?
            if self.wait_on_nw_error(self._state.retries_curl) is not False:
                self._log_retry("pycurl", err[0], err[1], self._state.retries_curl)
                return True
        elif err[0] == pycurl.E_OPERATION_TIMEDOUT and err[1].startswith('Operation too slow'):
            # timed out as defined in LOW_SPEED_LIMIT LOW_SPEED_TIME
            # check the message too because err 28 can come also from Operation timed out after
            if self._state.retries_curl < 4 and self.wait_on_nw_error(self._state.retries_curl) is not False:
                self._log_retry("curl", err[0], err[1], self._state.retries_curl)
                return True
        elif err[0] == pycurl.E_COULDNT_CONNECT and self._state.retries_curl > 0 and self._state.retries_curl < 4:
            # we check retries_curl > 0 to make sure it is a reconnect attempt initiated by an other curl error
            if self.wait_on_nw_error(self._state.retries_curl) is not False:
                self._log_retry("curl", err[0], err[1], self._state.retries_curl)
                return True
        elif err[0] == pycurl.E_WRITE_ERROR and self._request_abort[0] is not None:
            code, msg = self.request_abort[1:]
            if code <= 12:  # https://dev.twitter.com/streaming/overview/messages-types
                if code in [2, 4, 7]:  # danger duplicate stream or something
                    # NOTE(review): presumably self._raise raises by itself (defined in ancestor) - confirm
                    self._raise(ErrorTwtStreamDisconnectReq, code, msg)
                elif code in [1, 10, 11, 12]:  # twitter malfunction
                    if self.wait_on_nw_error(self._state.retries_curl) is not False:
                        # try to reconnect
                        self._log_retry("twt_disconnect_req", code, msg, self._state.retries_curl)
                        return True
                    else:
                        raise self._raise(ErrorTwtStreamDisconnectReq, code + 100, "can't recover: " + str(msg))
                else:
                    raise self._raise(ErrorTwtStreamDisconnectReq, code, "we don't handle:" + str(msg))
            elif code == 1001:  # by convention > 1000 comes from our side
                return False  # disconnect gracefully
            raise self._raise(ErrorTwtStreamDisconnectReq, code, "we don't handle:" + str(msg))
        # anything not handled (or not retried) above ends up as a curl error
        self._raise(ErrorRqCurl, err[0], err[1])

    def on_request_error_http(self, err):
        """default error handling, for HTTP Errors override method for any special handling
        return True to retry request
        raise an exception or return False to abort

        :param int err: HTTP status code
        """
        LOG.debug("on_request_error_http:" + str(err))
        if err in [500, 502, 503, 504] and self._state.retries_http < 4:
            if self.wait_on_http_error(self._state.retries_http):
                self._log_retry("http", err, "", self._state.retries_http)
                return True
        self.response.data = self.resp_buffer
        self._raise(ErrorRqHttp, err, self.response)

    def _log_retry(self, error_type, err_num, err_msg, cur_try):
        """log (DEBUG) an automatic recovery attempt

        :param str error_type: short label of the error's origin
        :param err_num: error number
        :param err_msg: error message
        :param int cur_try: current retry count
        """
        # '!s' instead of ':s' - self.name can be None (it comes from
        # kwargs.get('name')) and ':s' raises TypeError for non strings on Python 3
        frmt = '{!s} -auto recovering {error_type} error num = {err_num!s} {err_msg}, retries{cur_try:2d}'
        LOG.debug(frmt.format(self.name, error_type=error_type, err_num=err_num,
                              err_msg=err_msg, cur_try=cur_try))

    @classmethod
    def wait_seconds(cls, try_cnt, initial, maximum, tries_max=5, exponential=False):
        '''see: https://dev.twitter.com/streaming/overview/connecting

        :Parameters:
            - try_cnt successive retries count starting with 0
            - initial float (seconds or fraction)
            - maximum float (seconds or fraction)
            - tries_max no waiting is done once try_cnt exceeds this value
            - exponential back off exponentially if True else linearly
        :Returns:
            False or backoff value
        '''
        if try_cnt > tries_max:
            return False
        if exponential:
            delay = initial ** try_cnt
        else:
            # linear back off yields 0 for the first try (try_cnt == 0);
            # callers must test the result with 'is not False'
            delay = initial * try_cnt
        vl = min(delay, maximum)
        backoff(vl)
        return vl

    @classmethod
    def wait_on_nw_error(cls, current_try):
        """linear back off for network errors: 0.25 seconds per try up to 16 seconds"""
        return cls.wait_seconds(current_try, 0.25, 16)

    @classmethod
    def wait_on_http_error(cls, current_try):
        """exponential back off for http errors: 5 ** current_try seconds up to 320 seconds"""
        return cls.wait_seconds(current_try, 5, 320, exponential=True)

    @classmethod
    def wait_on_http_420(cls, current_try):
        """linear back off: 60 seconds per try up to 600 seconds"""
        return cls.wait_seconds(current_try, 60, 600)

    def on_data_default(self, data):
        """this is where actual stream data comes after chunks are merged,
        if we don't specify an on_data_cb function on class initialization

        :param data: JSON text of a single stream item
        """
        # LOG.debug("on_data_default " + str(data))
        jdata = simplejson.loads(data)
        if self._last_req.subdomain == 'stream':  # it is a statuses stream
            if jdata.get('source') is not None:  # it is a status (all statuses have source key sometimes can be '')
                self.counters.t_data += 1
                self.on_twitter_data(jdata)
            else:
                self.counters.t_msgs += 1
                self.on_twitter_msg_base(jdata)  # then it is a message
        else:
            pass

    def request_ep(self, end_point, method='GET', test_server=False, **kwargs):
        """shortcut to request constructs url from end_point

        :param str end_point: twitter REST end point shortcut ie 'stream/statuses/filter'
        :param str method: request method one of GET or POST (defaults to GET)
        :param bool test_server: if True channels request to test server
        :param dict kwargs: parameters dictionary to pass to twitter
        :return: whatever :meth:`request` returns
        :Raises: see request method
        :Usage:
            >>> client.request_ep("stream/statuses/filter","POST", track="breaking, news")

        .. Warning:: doesn't check end_point's validity will raise a twitter API error if not valid
        """
        ep_lst = end_point.split("/")
        url = TWT_URL_API_STREAM.format(ep_lst[0], "/".join(ep_lst[1:]))
        if test_server:
            # modify url to send request to test server at port 8080
            url = url.replace("https", 'http').replace('.com', '.com:8080')
        self._state.retries_extra = 0  # see handle_on_headers (its reset to 0 by a successful connection)
        # NOTE(review): retries_extra is set to 99 at the end of the first pass,
        # so this loop issues one request only; on http 200 + 'connection: close'
        # it just sleeps before leaving. The log message suggests a retry was
        # intended - confirm against upstream before changing control flow.
        while self._state.retries_extra < 4:
            self._state.retries_extra += 1
            res = self.request(url, method, kwargs)
            # '!s' instead of ':s' - headers is not a str and ':s' raises
            # TypeError for non strings on Python 3
            LOG.debug('request_ep end headers {!s} buffer=[{!s}]'.format(self.response.headers, self.resp_buffer))
            if self.response.status_http == 200 and self.response.headers.get('connection') == 'close':
                # sometimes it returns with http 200 but connection:close in headers
                LOG.debug('retrying http 200 with connection closed {:d}'.format(self._state.retries_extra))
                sleep(10 * self._state.retries_extra)  # back off
            self._state.retries_extra = 99  # get out of here
        return res

    def help(self, *args, **kwargs):
        """delegate help to endpoints

        :Usage:
            >>> h = client.help("userstream/user")
            see at: ( https://dev.twitter.com/streaming/overview ) ...
        """
        return self._endpoints._help(*args, **kwargs)

    def _adHocCmd_(self, element, *args, **kwargs):
        """this makes the trick of issuing requests against endpoints using dot notation syntax
        it is provided only for ease of use when issuing requests from command line.
        Applications should not use dot notation but instead call :func:`request_ep` method

        a trailing 'test' element channels the request to the test server,
        and 'error' just before it bypasses end point validation

        :raises Exception: when the end point does not validate
        """
        dic_keys = str(element).split(self._endpoints.delimiter)[1:]  # get rid of root
        if dic_keys[-1] == 'test':
            test_server = True
            dic_keys = dic_keys[:-1]
            if dic_keys[-1] == 'error':
                # simulate error testing
                return self.request_ep('/'.join(dic_keys), 'GET', test_server, **kwargs)
            else:
                rt = self._endpoints.get_value_validate(dic_keys)
        else:
            test_server = False
            rt = self._endpoints.get_value_validate(dic_keys)
        if rt:
            return self.request_ep(rt.path, rt.method, test_server, **kwargs)
        else:
            raise Exception("no such end point")

    def _reset_retry(self):
        """(re)initialise ``_retry_counters`` with its starting values"""
        self._retry_counters = DotDot({'retries': 0, 'bo_err_420': 60, 'bo_err_http': 5})