"""Classes used to import/export data to mongoDB
"""
from Hellas.Thebes import format_header
xlrd = None # reserved to import xlrd on demand
def _xlrd_on_demand():
global xlrd
if xlrd is None:
try:
import xlrd
except ImportError:
print ("this module requires xlrd library please install (pip install xlrd")
raise
return xlrd
[docs]def import_workbook(workbook, db, fields=None, ws_options={'dt_python': True}, stats_every=1000):
"""save all workbook's sheets to a db
consider using :class:`~ImportXls` class instead which is more flexible but imports only a single sheet
:Parameters: see :class:`~ImportXls` class
:Example:
>>> from pymongo import MongoClient
>>> from mongoUtils import _PATH_TO_DATA
>>> db = MongoClient().test
>>> res = import_workbook(_PATH_TO_DATA + "example_workbook.xlsx", db)
>>> res
[{'rows': 368, 'db': 'test', 'collection': 'weather'}, {'rows': 1007, 'db': 'test', 'collection': 'locations'}]
"""
_xlrd_on_demand()
workbook = xlrd.open_workbook(workbook, on_demand=True)
return [ImportXls(workbook, i, db, fields=fields, ws_options=ws_options, stats_every=stats_every)()
for i in range(0, workbook.nsheets)]
[docs]class Import(object):
"""generic class for importing into a mongoDB collection, successors should use/extend this class
:Parameters:
- db: a pynongo database object that will be used for output
- collection: a pymongo collection object that will be used for output
- drop_collection: (defaults to True)
- True drops output collection on init before writing to it
- False appends to output collection
- stats_every: int print import stats every stats_every rows or 0 to cancel stats (defaults to 10000)
"""
format_stats = "|{db:16s}|{collection:16s}|{rows:15,d}|"
format_stats_header = format_header(format_stats)
[docs] def __init__(self, collection, drop_collection=True, stats_every=10000):
if drop_collection:
collection.database.drop_collection(collection.name)
self.info = {'db': collection.database.name, 'collection': collection.name, 'rows': 0}
self.stats_every = stats_every
self.collection = collection
[docs] def import_to_collection(self):
"""successors should implement this"""
raise NotImplementedError
[docs] def _import_to_collection_before(self):
"""successors can call this or implement their's"""
if self.stats_every > 0:
print(self.format_stats_header)
[docs] def _import_to_collection_after(self):
"""successors can call this or implement their's"""
if self.stats_every > 0:
self.print_stats()
[docs] def print_stats(self):
print(self.format_stats.format(**self.info))
def __call__(self):
return self.import_to_collection()
[docs]class ImportXls(Import):
"""save an an xls sheet to a collection
`see <https://github.com/python-excel/xlrd>`_
:Parameters:
- workbook: path to a workbook or an xlrd workbook object
- sheet: name of a work sheet in workbook or an int (sheet number in workbook)
- db: a pymongo database object
- coll_name: str output collection name or None to create name from sheet name (defaults to None)
- row_start: int or None starting raw or None to start from first row (defaults to None)
- row_end:int or None ending raw or None to end at lastrow (defaults to None)
- fields:
- a list with field names
- or True (to treat first row as field names)
- or None (for auto creating field names i.e: [fld_1, fld_2, etc]
- or a function that:
- takes one argument (a list of row values)
- returns a dict (if this dict contains a key '_id' this value will be used for _id)
- >>> lambda x: {'coordinates': [x[0] , x[1]]}
- ws_options: (optional) a dictionary specifying how to treat cell values
- dt_python : bool convert dates to python datetime
- integers_only : round float values to int helpful coz all int values are represented as floats in sheets
- negatives_to_0 : treat all negative numbers as 0's
- drop_collection: (defaults to True)
- True drops output collection on init before writing to it
- False appends to output collection
- stats_every: int print import stats every stats_every rows or 0 to cancel stats (defaults to 10000)
- drop_collection: if True drops collection on init otherwise appends to collection
:Example:
>>> from pymongo import MongoClient
>>> from mongoUtils import _PATH_TO_DATA
>>> db = MongoClient().test
>>> res = ImportXls(_PATH_TO_DATA + "example_workbook.xlsx", 0, db)()
>>> res
{'rows': 367, 'db': u'test', 'collection': u'weather'}
"""
[docs] def __init__(self,
workbook, sheet,
db, coll_name=None,
row_start=None, row_end=None,
fields=True,
ws_options={'dt_python': True, 'integers_only': False, 'negatives_to_0': False},
stats_every=10000,
drop_collection=True):
_xlrd_on_demand()
if not isinstance(workbook, xlrd.book.Book):
workbook = xlrd.open_workbook(workbook, on_demand=True)
self.workbook = workbook
self.sheet = workbook.sheet_by_index(sheet) if isinstance(sheet, int) else workbook.sheet_by_name(sheet)
self._ws_options = {}
self.ws_options_set(ws_options)
coll_name = self.fix_name(self.sheet.name) if coll_name is None else coll_name
if row_start is None:
row_start = 1 if fields is True else 0
self.row_start = row_start
self.row_end = row_end
collection = db[coll_name]
super(ImportXls, self).__init__(collection, drop_collection=drop_collection, stats_every=stats_every)
self.auto_field_names(fields)
@property
def ws_options(self):
return self._ws_options
[docs] def ws_options_set(self, options_dict):
self._ws_options.update(options_dict)
[docs] def fix_name(self, name, cnt=0):
if name == '':
return 'fld_{}'.format(cnt)
else:
return name.replace(' ', '_').replace('.', '_').replace('$', '_')
[docs] def auto_field_names(self, fields):
row0_values = self.sheet.row_values(0)
if fields is True:
self._fields_or_fun = [self.fix_name(fn, cnt) for cnt, fn in enumerate(row0_values)]
elif fields is None:
self._fields_or_fun = ['fld_{}'.format(i) for i in range(len(row0_values))]
elif isinstance(fields, list):
self._fields_or_fun = [self.fix_name(fn, cnt) for cnt, fn in enumerate(fields)]
else: # then it has to be function
self._fields_or_fun = fields
return self._fields_or_fun
[docs] def row_to_doc(self, valueslist, _id=None):
if isinstance(self._fields_or_fun, list):
doc = dict(list(zip(self._fields_or_fun, valueslist)))
else:
doc = self._fields_or_fun(valueslist)
if _id is not None and doc.get('_id') is None:
doc['_id'] = _id
return doc
[docs] def ws_convert_cell(self, cl):
"""
:Parameters:
- cl an xlrd cell object
"""
# XL_CELL_BLANK XL_CELL_BOOLEAN XL_CELL_NUMBER XL_CELL_TEXT
tp = cl.ctype
vl = cl.value
if tp == xlrd.XL_CELL_NUMBER: # number
if self._ws_options.get('integers_only') is True:
vl = int(vl + 0.49999) # kind of round
if vl < 0 and self._ws_options.get('negatives_to_0'):
vl = 0
elif tp == xlrd.XL_CELL_DATE and self._ws_options.get('dt_python') is True:
vl = xlrd.xldate.xldate_as_datetime(vl, self.sheet.book.datemode)
return vl
[docs] def import_to_collection(self):
super(ImportXls, self)._import_to_collection_before()
outlist = []
for i in range(self.row_start, self.row_end or self.sheet.nrows):
self.info['rows'] += 1
row_values = [self.ws_convert_cell(cl) for cl in self.sheet.row(i)]
outlist.append(self.row_to_doc(row_values, i))
if self.stats_every and i % self.stats_every == 0:
self.print_stats()
if len(outlist) == 200:
try:
self.collection.insert_many(outlist)
outlist = []
except Exception:
print (outlist)
raise
if len(outlist) > 0:
self.collection.insert_many(outlist)
super(ImportXls, self)._import_to_collection_after()
return self.info