Source code for ocelot.curator.Contribution

import glob
import hashlib
import inspect
import json
import os
import signal
import sys
from collections import OrderedDict
from contextlib import contextmanager

import chardet
import monty.json as mj
from ocelot.routines.disparser import DisParser
from ocelot.routines.disparser_functions import CifFileError
from ocelot.routines.disparser_functions import get_pmg_dict
from ocelot.routines.fileop import createdir
from ocelot.routines.fileop import movefile
from ocelot.schema.configuration import Config
from ocelot.schema.conformer import Chem
from ocelot.schema.conformer import MolConformer
from pymatgen.core.composition import CompositionError
from pymatgen.core.structure import Composition
from pymatgen.core.structure import Structure
from pymatgen.io.cif import CifBlock

from ocelot.curator.DataSchema import *


class CuratorError(Exception):
    pass


WMSG = 'WARNING: {}'
WEAK_WMSG = 'WEAKWARNING: {}'


class TimeoutException(Exception):
    pass


@contextmanager
def time_limit(seconds):
    # raise TimeoutException if the managed block runs longer than `seconds`
    # https://stackoverflow.com/questions/366682/
    def signal_handler(signum, frame):
        raise TimeoutException("Timed out!")

    signal.signal(signal.SIGALRM, signal_handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)


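# A minimal usage sketch for time_limit. Note that SIGALRM is POSIX-only, so
# neither the context manager nor this sketch works on Windows; the sleep call
# below is a stand-in for a slow parse.
def _example_time_limit():
    import time
    try:
        with time_limit(2):
            time.sleep(5)
    except TimeoutException:
        print('timed out as expected')

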
def clear_cif_from_crystalmaker(s):
    # for cifs generated by CrystalMaker, reset all site occupancies to 1.0
    # and rebuild the cif block with pymatgen's CifBlock
    identifier, d = get_pmg_dict(s)
    if any('generated by CrystalMaker' in v for v in d.values()):
        k = '_atom_site_occupancy'
        try:
            lenk = len(d[k])
        except KeyError:
            return s
        for i in range(lenk):
            d[k][i] = 1.0
        loops = [[], []]
        for key in d.keys():
            if '_symmetry_equiv_pos_as_xyz' in key:
                loops[0].append(key)
            elif '_atom_' in key:
                loops[1].append(key)
        s = str(CifBlock(d, loops, identifier))
    return s


def readfile(file: str):
    # detect the encoding with chardet before decoding
    with open(file, 'rb') as f:
        raw = f.read()
    raw_encoding = chardet.detect(raw)['encoding']
    string = raw.decode(raw_encoding)
    return string


def trim_diffraction(cifstring: str, start_trim="_atom_site_aniso_label"):
    # drop everything from the loop_ that opens the anisotropic displacement
    # table onward; the first 22 lines are never trimmed
    lines = cifstring.splitlines()
    if len(lines) < 22:
        with open('dumm', 'w') as f:
            f.write(cifstring)
        raise CifFileError('cif string too short!')
    itrim = len(lines)
    for i in range(22, len(lines)):
        if "loop_" in lines[i - 1] and start_trim in lines[i]:
            itrim = i - 1
            break
    return '\n'.join(lines[:itrim])


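# A sketch of the intended use of trim_diffraction; 'sample.cif' is a
# placeholder path, not a file shipped with the package.
def _example_trim_diffraction():
    s = readfile('sample.cif')
    trimmed = trim_diffraction(s)
    print('{} -> {} lines'.format(len(s.splitlines()), len(trimmed.splitlines())))

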
def read_and_trim(filename: str):
    try:
        s = readfile(filename)
        return trim_diffraction(s)
    except Exception:
        raise CifFileError('read and trim failed for {}'.format(filename))


def get_property_md5(filename):
    # md5 digest of the raw file bytes
    hashobj = hashlib.md5()
    with open(filename, 'rb') as file_object:
        hashobj.update(file_object.read())
    return hashobj.digest()


def get_property_lattice(filename):
    s = read_and_trim(filename)
    s = clear_cif_from_crystalmaker(s)
    structure = Structure.from_str(s, 'cif')
    return structure.lattice


def get_property_header(filename):
    s = read_and_trim(filename)
    s = clear_cif_from_crystalmaker(s)
    identifier, _ = get_pmg_dict(s)
    return identifier


def get_property_basename(filename):
    return os.path.basename(filename)


def get_unique_files(absolute_path_to_files, unique_property='md5'):
    # deduplicate files by one of: md5 digest, cif lattice, cif header, basename
    if unique_property == 'md5':
        get_property = get_property_md5
    elif unique_property == 'lattice':
        get_property = get_property_lattice
    elif unique_property == 'cifheader':
        get_property = get_property_header
    elif unique_property == 'basename':
        get_property = get_property_basename
    else:
        raise NotImplementedError('not implemented property: {}'.format(unique_property))

    absolute_path_to_files = sorted(absolute_path_to_files)
    path_to_property = OrderedDict()
    property_exceptions = []
    for full_path in absolute_path_to_files:
        try:
            path_to_property[full_path] = get_property(full_path)
        except Exception:
            property_exceptions.append(full_path)
            continue

    # invert the mapping: property value -> all paths sharing it
    property_to_path = OrderedDict()
    for k, v in path_to_property.items():
        try:
            property_to_path[v].append(k)
        except KeyError:
            property_to_path[v] = [k]

    unique_path_to_dup_paths = {}
    unique_path_to_property = OrderedDict()
    unique_paths = []
    for prop in property_to_path.keys():
        paths = property_to_path[prop]
        upath = paths[0]
        if len(paths) > 1:
            unique_path_to_dup_paths[upath] = paths
        unique_path_to_property[upath] = prop
        unique_paths.append(upath)
    return unique_paths, unique_path_to_property, unique_path_to_dup_paths, property_exceptions


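# A minimal sketch of deduplicating a folder of cifs by md5 checksum;
# 'some/folder' is a placeholder path.
def _example_get_unique_files():
    paths = sorted(glob.glob('some/folder/*.cif'))
    uniques, unique_to_prop, unique_to_dups, failed = get_unique_files(paths, unique_property='md5')
    for upath in uniques:
        print(upath, unique_to_prop[upath].hex())  # md5 digests are raw bytes
    print('{} files could not be hashed'.format(len(failed)))

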
def curate_cifstring(cifstring):
    """
    :return integrity_class:
        3 can run all calculations
        2 can only run geometric analyses
        1 do not run any calculations (exception)
    :return outputs: files written by this function
    :return structural_schemas:
        structural_schemas['configuration']: the major config
        structural_schemas['molgraphs']: a list of unique molgraphs from the major config, rsorted by # of atoms
        structural_schemas['molsmiles']: a list of unique molecular smiles from the major config, rsorted by # of atoms
    :return identifier: hashconfig(major_config)
    """
    integrity_class = 3
    outputs = []
    structural_schemas = OrderedDict()
    dp = DisParser(cifstring)
    try:
        print('--- begin DisParser.to_configs ---')
        # write_files=True writes conf_x.cif; config_infos is a list of (pmg Structure, occupancy)
        dis_pstructure, dis_unwrap_str, dis_mols, config_infos = dp.to_configs(write_files=True, vanilla=True)
    except Exception:
        emsg = 'ERROR: dp.to_configs failed!'
        print(emsg)
        raise CuratorError(emsg)
    if len(config_infos) not in [1, 2]:
        emsg = 'ERROR: vanilla to_configs found too many configs'
        print(emsg)
        raise CuratorError(emsg)
    disorder_class = dp.classification
    print('disorder class: {}'.format(disorder_class))
    print('--- end DisParser.to_configs ---\n')

    print('--- begin composition check ---')
    try:
        cif_comp = Composition(dp.cifdata['_chemical_formula_sum'])
        print('_chemical_formula_sum: {}'.format(cif_comp))
    except (KeyError, CompositionError):
        cif_comp = None
        print(WEAK_WMSG.format('_chemical_formula_sum does not exist or cannot be parsed'))
    try:
        comp_str = dp.cifdata['_chemical_formula_moiety']
        moiety_comps = [Composition(s) for s in comp_str.split(',')]
        moiety_comps = sorted(moiety_comps, key=lambda x: len(x), reverse=True)
        print('_chemical_formula_moiety:')
        for moiety_comp in moiety_comps:
            print('-- {}'.format(moiety_comp))
    except (KeyError, CompositionError):
        print(WEAK_WMSG.format('_chemical_formula_moiety does not exist or cannot be parsed'))
    major_config_structure, major_occu = config_infos[0]
    print('major config comps: {}'.format(major_config_structure.composition))
    print('major config occu: {}'.format(major_occu))
    if isinstance(cif_comp, Composition):
        if not major_config_structure.composition == cif_comp:
            print(WEAK_WMSG.format('major comps does not match _chemical_formula_sum'))
    try:
        minor_config_structure, minor_occu = config_infos[1]
        print('minor config comps: {}'.format(minor_config_structure.composition))
        print('minor config occu: {}'.format(minor_occu))
        major_minor_comps_match = minor_config_structure.composition == major_config_structure.composition
        if not major_minor_comps_match:
            print(WEAK_WMSG.format('minor and major comps do not match'))
    except IndexError:
        pass
    print('--- end composition check ---\n')

    print('--- begin major config check ---')
    try:
        major_config = Config.from_labeled_clean_pstructure(major_config_structure, major_occu)
    except Exception:
        emsg = 'ERROR: cannot parse major config structure into a config!'
        print(emsg)
        raise CuratorError(emsg)
    structural_schemas['configuration'] = major_config.as_dict()
    major_config_cif = 'curated_major_config.cif'
    major_config.pstructure.to('cif', major_config_cif)
    outputs.append('{}/{}'.format(os.getcwd(), major_config_cif))
    if not major_config.molconformers_all_legit():
        emsg = 'ERROR: cannot convert all molconformers to rdkit mol'
        print(emsg)
        raise CuratorError(emsg)
    print('major config moiety comps:')
    mc: MolConformer
    max_nradicals = 0
    imc = 0
    for mc in major_config.molconformers:
        print(' -- imc {}: {}'.format(imc, mc.composition))
        try:
            rdmol, smiles, _, _ = mc.to_rdmol(charged_fragments=False)
            Chem.SanitizeMol(rdmol)
        except Exception:
            print(WMSG.format('rdmol sanitize failed, integrity_class is set to 2'))
            if integrity_class > 2:
                integrity_class = 2
        nradicals = mc.is_missing_hydrogen()
        print(' missing hydrogen: {}'.format(nradicals))
        mc_xyzfils = 'curated_molconformer_{}_{}.xyz'.format(imc, nradicals)
        mc.to('xyz', mc_xyzfils)
        outputs.append(mc_xyzfils)
        if nradicals > max_nradicals:
            max_nradicals = nradicals
        imc += 1
    molsmiles = []
    # imol == imc; imol is not assigned based on len(mc), so sort in a second loop
    for mc in sorted(major_config.molconformers, key=lambda x: len(x), reverse=True):
        molsmiles.append(mc.smiles)
    structural_schemas['molsmiles'] = list(set(molsmiles))
    if max_nradicals:
        print(WMSG.format('major config missing hydrogen, integrity_class is set to 2'))
        if integrity_class > 2:
            integrity_class = 2
    unique_molgraphs = major_config.molgraph_set()
    structural_schemas['molgraphs'] = [umg.as_dict() for umg in sorted(list(unique_molgraphs), key=lambda x: len(x))]
    if len(unique_molgraphs) > 1:
        emsg = 'ERROR: more than one unique molecule in the major config!'
        print(emsg)
        raise CuratorError(emsg)
    print('--- end major config check ---\n')
    print('integrity_class: {}'.format(integrity_class))
    return integrity_class, outputs, structural_schemas, major_config.hashconfig()


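# Sketch: curate a single cif string and inspect the result. 'sample.cif' is
# a placeholder; note that curate_cifstring writes its output files to the cwd.
def _example_curate_cifstring():
    cifstring = readfile('sample.cif')
    try:
        integrity_class, outputs, schemas, identifier = curate_cifstring(cifstring)
        print('{} curated with integrity_class {}'.format(identifier, integrity_class))
        print('unique smiles: {}'.format(schemas['molsmiles']))
    except CuratorError as e:
        print('curation failed: {}'.format(e))

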
class Contribution:

    def __init__(self, data_access, folder_path):
        self.data_access = data_access
        self.folder_path = folder_path

    def collect_rawdata(self, collect_folder, get_extra_property_from_path=None):
        """
        RawData = content (cif_string no diffraction) + data_access + _id (data_access & cif_header)

        :param collect_folder: a folder to store cif files without diffraction info
        :param get_extra_property_from_path: the path here means the absolute path of a file in self.folder_path
        :return: collected_filepaths (absolute paths), raw_data_list (a list of RawData), jsonfiles
        """
        filepaths = sorted(glob.glob('{}/**/*.cif'.format(self.folder_path), recursive=True))
        print('the path to this contribution is: {}'.format(self.folder_path))
        print('# of cif files: {}'.format(len(filepaths)))
        for cif_property in ['basename', 'md5', 'lattice', 'cifheader']:
            print('---- checking property: {}'.format(cif_property))
            unique_paths, unique_path_to_property, unique_path_to_dup_paths, property_exceptions = get_unique_files(
                filepaths, cif_property)
            print('# of cif files with duplicates: {}'.format(len(unique_path_to_dup_paths.keys())))
            for unique_path in unique_path_to_dup_paths.keys():
                print(unique_path)
                for dup in unique_path_to_dup_paths[unique_path]:
                    print('-- {}'.format(dup))
            print('# of cif files with exceptions: {}'.format(len(property_exceptions)))
            for exception_path in property_exceptions:
                print(exception_path)
            print('# of unique cif files: {}'.format(len(unique_paths)))
            filepaths = unique_paths
        print('cif without diffraction info will be written to: {}'.format(collect_folder))
        print('please note exceptions will not be written')
        createdir(collect_folder)
        collected_filepaths = []

        def get_property(abspath):
            d = OrderedDict({'basename': os.path.basename(abspath)})
            if get_extra_property_from_path is None:
                return d
            exprops = get_extra_property_from_path(abspath)
            for k in exprops.keys():
                d[k] = exprops[k]
            return d

        raw_data_list = []
        jsonfiles = []
        for abspath in unique_paths:
            s = read_and_trim(abspath)
            collected_filepath = '{}/{}'.format(collect_folder, os.path.basename(abspath))
            with open(collected_filepath, 'w') as f:
                f.write(s)
            collected_filepaths.append(collected_filepath)
            raw_data_id = '{}--{}'.format(self.data_access, get_property_header(abspath))
            raw_data = RawData(s, self.data_access, raw_data_id, get_property(abspath))
            # splitext, not str.strip('.cif'): strip removes characters, not a suffix
            basename_noext = os.path.splitext(os.path.basename(abspath))[0]
            rawdatajson_filepath = '{}/{}.json'.format(collect_folder, basename_noext)
            raw_data.to_jsonfile(rawdatajson_filepath)
            raw_data_list.append(raw_data)
            jsonfiles.append(rawdatajson_filepath)
        return collected_filepaths, raw_data_list, jsonfiles

    def collect_with_log(self, log_folder, collect_folder, get_property_from_path=None):
        # redirect stdout to a per-contribution log file while collecting
        log_file = '{}/collect_{}.log'.format(log_folder, self.data_access)
        with open(log_file, 'w') as f:
            sys.stdout = f
            try:
                collected_filepaths, raw_data_list, jsonfilepaths = self.collect_rawdata(
                    collect_folder, get_property_from_path)
            finally:
                sys.stdout = sys.__stdout__
        return collected_filepaths, raw_data_list, jsonfilepaths

    def curate_one(self, rawdata: RawData, wdir):
        """
        CuratedData = content (structural_schemas)
                    + data_access (RawData.data_access)
                    + _id (hashconfig, or 'FAILED:<RawData._id>' on failure)
                    + properties: {integrity_class: x, rawdataid: RawData._id, method: ...}
        """
        stdout_original = sys.stdout
        logfile = wdir + '/curate.log'
        basename_noext = os.path.splitext(rawdata.data_properties['basename'])[0]
        jsonfile = wdir + '/{}.json'.format(basename_noext)
        prev_run = False
        if os.path.isfile(logfile) and os.path.isfile(jsonfile):
            mtime = os.path.getmtime(logfile)
            movefile(logfile, wdir + '/curate-{}.log'.format(mtime))
            prev_run = True
        whereami = os.getcwd()
        os.chdir(wdir)
        f = open(logfile, 'w')
        sys.stdout = f
        if prev_run:
            print('found previous log file and json file, try to read json')
            try:
                curated_data = CuratedData.from_jsonfile(jsonfile)
                print('done!')
                # restore stdout and cwd before the early return
                f.close()
                sys.stdout = stdout_original
                os.chdir(whereami)
                return curated_data
            except Exception:
                print('failed, rerun curate_one')
        cifstring = rawdata.data_content
        with open('raw.cif', 'w') as rawcif:
            rawcif.write(cifstring)
        props = OrderedDict()
        try:
            integrity_class, outputs, structural_schemas, identifier = curate_cifstring(cifstring)
        except CuratorError:
            integrity_class, outputs, structural_schemas = 1, None, None
            identifier = 'FAILED:{}'.format(rawdata._id)
        props['integrity_class'] = integrity_class
        props['rawdataid'] = rawdata._id
        props['method'] = '@method: {}'.format(inspect.stack()[0][3])
        props['wdir'] = wdir
        curated_data = CuratedData(
            data_content=structural_schemas,
            data_access=rawdata.data_access,
            _id=identifier,
            data_properties=props,
        )
        with open(jsonfile, 'w') as cdata_json:
            json.dump(curated_data.as_dict(), cdata_json, cls=mj.MontyEncoder)
        f.close()
        sys.stdout = stdout_original
        os.chdir(whereami)
        return curated_data

    def curate_all(self, logfolder, rawdata_list: [RawData], wdir_for_curate, unittimelimit=60):
        # curate every RawData entry, each under a per-structure time limit (seconds)
        log_file = '{}/curate_{}.log'.format(logfolder, self.data_access)
        f = open(log_file, 'w')
        sys.stdout = f
        print('curate all at: {}'.format(wdir_for_curate))
        createdir(wdir_for_curate)
        rawdata: RawData
        curated_data_list = []
        classes = OrderedDict({1: [], 2: [], 3: []})
        timeout_entries = []
        for rawdata in rawdata_list:
            print('WORKING ON: {}'.format(rawdata._id))
            basename_noext = os.path.splitext(rawdata.data_properties['basename'])[0]
            this_wdir = '{}/{}'.format(wdir_for_curate, basename_noext)
            createdir(this_wdir)
            print('- in folder: {}'.format(this_wdir))
            try:
                with time_limit(unittimelimit):
                    curated_data = self.curate_one(rawdata, this_wdir)
            except TimeoutException:
                timeout_entries.append(this_wdir)
                continue
            curated_data_list.append(curated_data)
            integrity_class = curated_data.data_properties['integrity_class']
            print('- integrity_class: {}'.format(integrity_class))
            classes[integrity_class].append(curated_data)
        for integrity_class in classes.keys():
            headermsg = '### integrity class: {} ###'.format(integrity_class)
            print('#' * len(headermsg))
            print(headermsg)
            print('#' * len(headermsg))
            for cdata in classes[integrity_class]:
                print(cdata.data_properties['wdir'] + '/curated_major_config.cif')
            print('\n')
        print('### timeout wdirs ###')
        for timeout_entry in timeout_entries:
            print(timeout_entry)
        f.close()
        sys.stdout = sys.__stdout__
        return curated_data_list, classes
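

# An end-to-end sketch of the intended workflow with placeholder paths:
# collect deduplicated, diffraction-free cifs from a contribution folder,
# then curate each structure under a per-entry time limit.
def _example_contribution_workflow():
    contribution = Contribution(data_access='csd', folder_path='/data/csd_cifs')
    cifs, rawdata_list, jsons = contribution.collect_with_log('/data/logs', '/data/collected')
    curated_list, classes = contribution.curate_all('/data/logs', rawdata_list, '/data/curated', unittimelimit=60)
    print('{} curated, {} with full integrity'.format(len(curated_list), len(classes[3])))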