Source code for ocelot.curator.DataSchema

import datetime
import json
from collections import OrderedDict

import license
import pymongo
from monty.json import MSONable
from monty.json import MontyDecoder
from monty.json import MontyEncoder


[docs]class DataSchemaError(Exception): pass
[docs]def string_acronym(s): return "".join(e[0] for e in s.split())
[docs]class DataProvider(MSONable):
[docs] def __init__(self, name: str, institution: str, url: str): """ this can represent a research group, a journal, or a database """ self.name = name self.institution = institution self.url = url
def __repr__(self): name = '_'.join(self.name.split()) institution = string_acronym(self.institution) return '{}-{}'.format(name, institution)
[docs] @classmethod def from_publication_doi(cls, doi): pass
[docs]class DataAccess(MSONable): # provider + date + lic
[docs] def __init__(self, sharedby: DataProvider, accessdate: datetime.date, license: license.base.License or None): self.sharedby = sharedby self.accessdate = accessdate self.license = license
def __repr__(self): dp = self.sharedby.__repr__() date = self.accessdate.isoformat() return '-'.join([dp, date])
[docs] def as_dict(self) -> dict: d = OrderedDict() d['sharedby'] = self.sharedby.as_dict() d['accessdate'] = self.accessdate.isoformat() try: d['license'] = self.license.name except AttributeError: d['license'] = None return d
[docs] @classmethod def from_dict(cls, d): return cls( DataProvider.from_dict(d['sharedby']), datetime.date.fromisoformat(d['accessdate']), license.find(d['license']) )
[docs]class DataEntry(MSONable):
[docs] def __init__( self, data_content, data_access: DataAccess, _id: str or bytes, data_properties=None ): self.data_content = data_content self.data_access = data_access self._id = _id if data_properties is None: data_properties = dict() self.data_properties = data_properties
[docs] def mongo_insert(self, mongo_uri, database_name, collection_name=None, overwrite=False): """ insert RawData as an entry into a mongodb using pymongo :param mongo_uri: port level is enough :return: """ if collection_name is None: collection_name = self.__class__.__name__ database = pymongo.mongo_client.MongoClient(host=mongo_uri)[database_name] collection = database[collection_name] existed = collection.find_one({'_id': self._id}) inserted = False if existed is None: result = collection.insert_one(json.loads(self.to_json())) if result.acknowledged: inserted = True else: if overwrite: result = collection.replace_one({'_id': self._id}, self.as_dict()) if result.acknowledged: print('{}: previously existed {} was replaced'.format(self._id, result.matched_count)) inserted = True else: print('{}: previously existed and was NOT updated'.format(self._id)) return inserted
[docs] @classmethod def from_jsonfile(cls, jsonfile): with open(jsonfile, 'r') as prev_json: prev_jsons = prev_json.read() return json.loads(prev_jsons, cls=MontyDecoder)
[docs] def to_jsonfile(self, jsonfile): with open(jsonfile, 'w') as cdata_json: json.dump(self.as_dict(), cdata_json, cls=MontyEncoder) with open(jsonfile, 'r') as cdata_json: s = cdata_json.read() if not s == self.to_json(): raise DataSchemaError('rawjsonfile is different from to_json') return jsonfile
[docs] @classmethod def from_json(cls, s): return cls.from_dict(json.loads(s))
[docs] @classmethod def from_mongo(cls, mongo_query, mongo_uri, database_name, collection_name=None): if collection_name is None: collection_name = cls.__name__ if isinstance(mongo_query, str): mongo_query = {'_id': mongo_query} database = pymongo.mongo_client.MongoClient(host=mongo_uri)[database_name] collection = database[collection_name] cursor = collection.find(mongo_query) if cursor.count() > 1: raise DataSchemaError('return multiple results w. query: {}'.format(mongo_query)) elif cursor.count() == 0: raise DataSchemaError('return no results w. query: {}'.format(mongo_query)) else: existed = cursor[0] return cls.from_dict(existed)
[docs]class CuratedData(DataEntry):
[docs] def mongo_insert(self, mongo_uri, database_name, collection_name=None, overwrite=False, rawdatacheck=True, rawdatacollection='RawData'): """ check if raw data is there """ if collection_name is None: collection_name = self.__class__.__name__ database = pymongo.mongo_client.MongoClient(host=mongo_uri)[database_name] if rawdatacheck: rawcoll = database[rawdatacollection] rawid = self.data_properties['rawdataid'] if rawcoll.find_one({'_id': rawid}) is None: print('this curated data relies on non-existing raw data: {}'.format(rawid)) return False collection = database[collection_name] existed = collection.find_one({'_id': self._id}) inserted = False if existed is None: result = collection.insert_one(json.loads(self.to_json())) if result.acknowledged: inserted = True else: if overwrite: result = collection.replace_one({'_id': self._id}, self.as_dict()) if result.acknowledged: print('{}: previously existed {} was replaced'.format(self._id, result.matched_count)) inserted = True else: print('{}: previously existed and was NOT updated'.format(self._id)) return inserted
[docs]class RawData(DataEntry): pass
JohnAnthony_UKY = DataProvider('John Anthony', 'University of Kentucky', 'https://chem.as.uky.edu/users/anthony') MikeHaley_UO = DataProvider('Mike Haley', 'University of Oregon', 'https://haleylab.uoregon.edu/') CSD_CCDC = DataProvider('Cambridge Structural Database', 'Cambridge Crystallographic Data Centre', 'https://www.ccdc.cam.ac.uk/') SeanParkin_UKY = DataProvider('Sean Parkin', 'University of Kentucky', 'https://xray.uky.edu/') # # if __name__ == '__main__': # ACCESS_PROVIDER = JohnAnthony_UKY # ACCESS_DATE = datetime.date(2018, 10, 7) # ACCESS_LIC = None # data_access = DataAccess(ACCESS_PROVIDER, ACCESS_DATE, ACCESS_LIC) # rd = DataEntry('lalala', data_access, 'test') # MLAB_URI = "mongodb://qai222:caer200@ds047782.mlab.com:47782/ocelot_qai?retryWrites=false" # LCC_VM_URI = "mongodb://ocelot:caer200@10.33.28.79:27017/" # inserted = rd.mongo_insert(MLAB_URI, 'ocelot_qai') # print(inserted) #