import glob
import hashlib
import inspect
import json
import os
import signal
import sys
from collections import OrderedDict
from contextlib import contextmanager
from typing import List

import chardet
import monty.json as mj
from pymatgen.core.composition import Composition
from pymatgen.core.composition import CompositionError
from pymatgen.core.structure import Structure
from pymatgen.io.cif import CifBlock

from ocelot.curator.DataSchema import *  # provides RawData, CuratedData, ...
from ocelot.routines.disparser import DisParser
from ocelot.routines.disparser_functions import CifFileError
from ocelot.routines.disparser_functions import get_pmg_dict
from ocelot.routines.fileop import createdir
from ocelot.routines.fileop import movefile
from ocelot.schema.configuration import Config
from ocelot.schema.conformer import Chem
from ocelot.schema.conformer import MolConformer

class CuratorError(Exception):
    pass


class TimeoutException(Exception):
    pass


WMSG = 'WARNING: {}'
WEAK_WMSG = 'WEAKWARNING: {}'


@contextmanager
def time_limit(seconds):
    # SIGALRM-based timeout, from https://stackoverflow.com/questions/366682/
    def signal_handler(signum, frame):
        raise TimeoutException("Timed out!")

    signal.signal(signal.SIGALRM, signal_handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)
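
# Usage sketch: SIGALRM only exists on Unix and the handler must be installed
# from the main thread.
#
#     with time_limit(5):
#         possibly_slow_parse()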

def clear_cif_from_crystalmaker(s):
    """Reset all '_atom_site_occupancy' values to 1.0 in cifs generated by CrystalMaker."""
    identifier, d = get_pmg_dict(s)
    # values in the cif dict may be strings or lists of strings, so cast before searching
    if any('generated by CrystalMaker' in str(v) for v in d.values()):
        k = '_atom_site_occupancy'
        try:
            lenk = len(d[k])
        except KeyError:
            return s
        for i in range(lenk):
            d[k][i] = 1.0
        # rebuild the loop_ groups so pymatgen can serialize the block
        loops = [[], []]
        for key in d.keys():
            if '_symmetry_equiv_pos_as_xyz' in key:
                loops[0].append(key)
            elif '_atom_' in key:
                loops[1].append(key)
        s = str(CifBlock(d, loops, identifier))
    return s

def readfile(file: str):
    with open(file, 'rb') as f:
        raw = f.read()
    raw_encoding = chardet.detect(raw)['encoding']
    string = raw.decode(raw_encoding)
    return string

def trim_diffraction(cifstring: str, start_trim="_atom_site_aniso_label"):
    """Drop the anisotropic displacement loop_ and everything after it."""
    lines = cifstring.splitlines()
    if len(lines) < 22:
        # too short to be a meaningful cif; dump it for inspection
        with open('dumm', 'w') as f:
            f.write(cifstring)
        raise CifFileError('cif string too short!')
    itrim = len(lines)
    for i in range(22, len(lines)):
        if "loop_" in lines[i - 1] and start_trim in lines[i]:
            itrim = i - 1
            break
    return '\n'.join(lines[:itrim])

def read_and_trim(filename: str):
    try:
        s = readfile(filename)
        return trim_diffraction(s)
    except Exception:
        raise CifFileError('read and trim failed for {}'.format(filename))

def get_property_md5(filename):
    hashobj = hashlib.md5()
    with open(filename, 'rb') as file_object:
        hashobj.update(file_object.read())
    return hashobj.digest()


def get_property_lattice(filename):
    s = read_and_trim(filename)
    s = clear_cif_from_crystalmaker(s)
    structure = Structure.from_str(s, 'cif')
    return structure.lattice


def get_property_basename(filename):
    return os.path.basename(filename)
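
def get_property_header(filename):
    # referenced by get_unique_files('cifheader') and collect_rawdata below,
    # but its definition did not survive in this listing; a minimal sketch,
    # assuming the cif data block header is the identifier returned by get_pmg_dict
    s = read_and_trim(filename)
    identifier, d = get_pmg_dict(s)
    return identifier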

def get_unique_files(absolute_path_to_files, unique_property='md5'):
    """Deduplicate files by a property: md5 digest, lattice, cif header, or basename."""
    if unique_property == 'md5':
        get_property = get_property_md5
    elif unique_property == 'lattice':
        get_property = get_property_lattice
    elif unique_property == 'cifheader':
        get_property = get_property_header
    elif unique_property == 'basename':
        get_property = get_property_basename
    else:
        raise NotImplementedError('not implemented property: {}'.format(unique_property))

    absolute_path_to_files = sorted(absolute_path_to_files)
    path_to_property = OrderedDict()
    property_exceptions = []
    for full_path in absolute_path_to_files:
        try:
            path_to_property[full_path] = get_property(full_path)
        except Exception:
            property_exceptions.append(full_path)
            continue

    # invert the mapping: property value -> all paths sharing it
    property_to_path = OrderedDict()
    for k, v in path_to_property.items():
        try:
            property_to_path[v].append(k)
        except KeyError:
            property_to_path[v] = [k]

    unique_path_to_dup_paths = {}
    unique_path_to_property = OrderedDict()
    unique_paths = []
    for prop, paths in property_to_path.items():
        upath = paths[0]
        if len(paths) > 1:
            unique_path_to_dup_paths[upath] = paths
        unique_path_to_property[upath] = prop
        unique_paths.append(upath)
    return unique_paths, unique_path_to_property, unique_path_to_dup_paths, property_exceptions
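
# Usage sketch (hypothetical folder), deduplicating by md5 digest:
#
#     uniques, unique_props, dups, failed = get_unique_files(
#         glob.glob('/path/to/cifs/*.cif'), unique_property='md5')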

def curate_cifstring(cifstring):
    """
    Curate a single cif string.

    :return:
        integrity_class:
            3 -- can run all calculations
            2 -- can only run geometric analyses
            1 -- do not run any calculations (exception)
        outputs: files written by this function
        structural_schemas:
            structural_schemas['configuration']: the major config
            structural_schemas['molgraphs']: a list of unique molgraphs from the major config, rsorted by # of atoms
            structural_schemas['molsmiles']: a list of unique molecular smiles from the major config, rsorted by # of atoms
        identifier: hashconfig(major_config)
    """
    integrity_class = 3
    outputs = []
    structural_schemas = OrderedDict()
    dp = DisParser(cifstring)
    try:
        print('--- begin DisParser.to_configs ---')
        # write_files=True writes conf_x.cif; config_infos is a list of
        # (pmg Structure, occupancy) pairs
        dis_pstructure, dis_unwrap_str, dis_mols, config_infos = dp.to_configs(write_files=True, vanilla=True)
    except Exception:
        emsg = 'ERROR: dp.to_configs failed!'
        print(emsg)
        raise CuratorError(emsg)
    if len(config_infos) not in [1, 2]:
        emsg = 'ERROR: vanilla to_configs found too many configs'
        print(emsg)
        raise CuratorError(emsg)
    disorder_class = dp.classification
    print('disorder class: {}'.format(disorder_class))
    print('--- end DisParser.to_configs ---\n')

    print('--- begin composition check ---')
    try:
        cif_comp = Composition(dp.cifdata['_chemical_formula_sum'])
        print('_chemical_formula_sum: {}'.format(cif_comp))
    except (KeyError, CompositionError):
        cif_comp = None
        print(WEAK_WMSG.format('_chemical_formula_sum does not exist or cannot be parsed'))
    try:
        comp_str = dp.cifdata['_chemical_formula_moiety']
        moiety_comps = [Composition(s) for s in comp_str.split(',')]
        moiety_comps = sorted(moiety_comps, key=lambda x: len(x), reverse=True)
        print('_chemical_formula_moiety:')
        for moiety_comp in moiety_comps:
            print('-- {}'.format(moiety_comp))
    except (KeyError, CompositionError):
        print(WEAK_WMSG.format('_chemical_formula_moiety does not exist or cannot be parsed'))
    major_config_structure, major_occu = config_infos[0]
    print('major config comps: {}'.format(major_config_structure.composition))
    print('major config occu: {}'.format(major_occu))
    if isinstance(cif_comp, Composition):
        if major_config_structure.composition != cif_comp:
            print(WEAK_WMSG.format('major comps does not match _chemical_formula_sum'))
    try:
        minor_config_structure, minor_occu = config_infos[1]
        print('minor config comps: {}'.format(minor_config_structure.composition))
        print('minor config occu: {}'.format(minor_occu))
        major_minor_comps_match = minor_config_structure.composition == major_config_structure.composition
        if not major_minor_comps_match:
            print(WEAK_WMSG.format('minor and major comps do not match'))
    except IndexError:
        pass
    print('--- end composition check ---\n')

    print('--- begin major config check ---')
    try:
        major_config = Config.from_labeled_clean_pstructure(major_config_structure, major_occu)
    except Exception:
        emsg = 'ERROR: cannot parse major config structure into a config!'
        print(emsg)
        raise CuratorError(emsg)
    structural_schemas['configuration'] = major_config.as_dict()
    major_config_cif = 'curated_major_config.cif'
    major_config.pstructure.to(fmt='cif', filename=major_config_cif)
    outputs.append('{}/{}'.format(os.getcwd(), major_config_cif))
    if not major_config.molconformers_all_legit():
        emsg = 'ERROR: cannot convert all molconformers to rdkit mol'
        print(emsg)
        raise CuratorError(emsg)
    print('major config moiety comps:')
    mc: MolConformer
    max_nradicals = 0
    imc = 0
    for mc in major_config.molconformers:
        print(' -- imc {}: {}'.format(imc, mc.composition))
        try:
            rdmol, smiles, _, _ = mc.to_rdmol(charged_fragments=False)
            Chem.SanitizeMol(rdmol)
        except Exception:
            print(WMSG.format('rdmol sanitize failed, integrity_class is set to 2'))
            if integrity_class > 2:
                integrity_class = 2
        nradicals = mc.is_missing_hydrogen()
        print(' missing hydrogen: {}'.format(nradicals))
        mc_xyzfile = 'curated_molconformer_{}_{}.xyz'.format(imc, nradicals)
        mc.to('xyz', mc_xyzfile)
        outputs.append(mc_xyzfile)
        if nradicals > max_nradicals:
            max_nradicals = nradicals
        imc += 1
    molsmiles = []
    for mc in sorted(major_config.molconformers, key=lambda x: len(x), reverse=True):
        # a second, size-sorted pass: imc above follows conformer order, not len(mc)
        molsmiles.append(mc.smiles)
    # deduplicate while keeping the size-sorted order promised in the docstring
    structural_schemas['molsmiles'] = list(OrderedDict.fromkeys(molsmiles))
    if max_nradicals:
        print(WMSG.format('major config missing hydrogen, integrity_class is set to 2'))
        if integrity_class > 2:
            integrity_class = 2
    unique_molgraphs = major_config.molgraph_set()
    structural_schemas['molgraphs'] = [umg.as_dict() for umg in sorted(unique_molgraphs, key=lambda x: len(x))]
    if len(unique_molgraphs) > 1:
        emsg = 'ERROR: more than one unique molecule in the major config!'
        print(emsg)
        raise CuratorError(emsg)
    print('--- end major config check ---\n')
    print('integrity_class: {}'.format(integrity_class))
    return integrity_class, outputs, structural_schemas, major_config.hashconfig()
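
# Driver sketch for curate_cifstring (hypothetical file name); note it writes
# conf_x.cif, curated_major_config.cif and the xyz files into the cwd:
#
#     ic, outs, schemas, ident = curate_cifstring(read_and_trim('entry.cif'))
#     print(ic, ident, schemas['molsmiles'])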

class Contribution:

    def __init__(self, data_access, folder_path):
        self.data_access = data_access
        self.folder_path = folder_path

    def collect_rawdata(self, collect_folder, get_extra_property_from_path=None):
        """
        RawData
            = content (cif string without diffraction info)
            + data_access
            + _id (data_access & cif_header)

        :param collect_folder: a folder to store cif files without diffraction info
        :param get_extra_property_from_path: the path here means the absolute path of a file in self.folder_path
        :return:
            collected_filepaths: absolute paths of the collected cif files
            raw_data_list: a list of RawData
            jsonfiles: one json file per RawData
        """
        filepaths = sorted(glob.glob('{}/**/*.cif'.format(self.folder_path), recursive=True))
        print('the path to this contribution is: {}'.format(self.folder_path))
        print('# of cif files: {}'.format(len(filepaths)))
        # successively deduplicate by each property
        for cif_property in ['basename', 'md5', 'lattice', 'cifheader']:
            print('---- checking property: {}'.format(cif_property))
            unique_paths, unique_path_to_property, unique_path_to_dup_paths, property_exceptions = get_unique_files(
                filepaths, cif_property)
            print('# of cif files with duplicate: {}'.format(len(unique_path_to_dup_paths.keys())))
            for unique_path in unique_path_to_dup_paths.keys():
                print(unique_path)
                for dup in unique_path_to_dup_paths[unique_path]:
                    print('-- {}'.format(dup))
            print('# of cif files with exceptions: {}'.format(len(property_exceptions)))
            for exception_path in property_exceptions:
                print(exception_path)
            print('# of cif files unique: {}'.format(len(unique_paths)))
            filepaths = unique_paths
        print('cif without diffraction info will be written to: {}'.format(collect_folder))
        print('please note exceptions will not be written')
        createdir(collect_folder)
        collected_filepaths = []

        def get_property(abspath):
            d = OrderedDict({'basename': os.path.basename(abspath)})
            if get_extra_property_from_path is None:
                return d
            exprops = get_extra_property_from_path(abspath)
            for k in exprops.keys():
                d[k] = exprops[k]
            return d

        raw_data_list = []
        jsonfiles = []
        for abspath in unique_paths:
            s = read_and_trim(abspath)
            collected_filepath = '{}/{}'.format(collect_folder, os.path.basename(abspath))
            with open(collected_filepath, 'w') as f:
                f.write(s)
            collected_filepaths.append(collected_filepath)
            raw_data_id = '{}--{}'.format(self.data_access, get_property_header(abspath))
            raw_data = RawData(s, self.data_access, raw_data_id, get_property(abspath))
            # os.path.splitext, not str.strip('.cif'), so the filename stem stays intact
            rawdatajson_filepath = '{}/{}.json'.format(collect_folder, os.path.splitext(os.path.basename(abspath))[0])
            raw_data.to_jsonfile(rawdatajson_filepath)
            raw_data_list.append(raw_data)
            jsonfiles.append(rawdatajson_filepath)
        return collected_filepaths, raw_data_list, jsonfiles

    def collect_with_log(self, log_folder, collect_folder, get_property_from_path=None):
        log_file = '{}/collect_{}.log'.format(log_folder, self.data_access)
        f = open(log_file, 'w')
        sys.stdout = f
        try:
            collected_filepaths, raw_data_list, jsonfilepaths = self.collect_rawdata(collect_folder,
                                                                                     get_property_from_path)
        finally:
            # restore stdout and close the log even if collect_rawdata raises
            sys.stdout = sys.__stdout__
            f.close()
        return collected_filepaths, raw_data_list, jsonfilepaths

    def curate_one(self, rawdata: RawData, wdir):
        """
        CuratedData
            = content (structural_schemas)
            + data_access (RawData.data_access)
            + _id (hashconfig or 'failed RawData._id')
            + properties: {integrity_class: x, rawdataid: RawData._id, method: ...}
        """
        stdout_original = sys.stdout
        logfile = wdir + '/curate.log'
        jsonfile = wdir + '/{}.json'.format(os.path.splitext(rawdata.data_properties['basename'])[0])
        prev_run = False
        if os.path.isfile(logfile) and os.path.isfile(jsonfile):
            mtime = os.path.getmtime(logfile)
            movefile(logfile, wdir + '/curate-{}.log'.format(mtime))
            prev_run = True
        whereami = os.getcwd()
        os.chdir(wdir)
        f = open(logfile, 'w')
        sys.stdout = f
        if prev_run:
            print('found previous log file and json file, try to read json')
            try:
                curated_data = CuratedData.from_jsonfile(jsonfile)
                print('done!')
                # restore state before the early return
                f.close()
                sys.stdout = stdout_original
                os.chdir(whereami)
                return curated_data
            except Exception:
                print('failed, rerun curate_one')
        cifstring = rawdata.data_content
        with open('raw.cif', 'w') as rawcif:
            rawcif.write(cifstring)
        props = OrderedDict()
        try:
            integrity_class, outputs, structural_schemas, identifier = curate_cifstring(cifstring)
        except CuratorError:
            integrity_class, outputs, structural_schemas = 1, None, None
            identifier = 'FAILED:{}'.format(rawdata._id)
        props['integrity_class'] = integrity_class
        props['rawdataid'] = rawdata._id
        props['method'] = '@method: {}'.format(inspect.stack()[0][3])
        props['wdir'] = wdir
        curated_data = CuratedData(
            data_content=structural_schemas,
            data_access=rawdata.data_access,
            _id=identifier,
            data_properties=props
        )
        with open(jsonfile, 'w') as cdata_json:
            json.dump(curated_data.as_dict(), cdata_json, cls=mj.MontyEncoder)
        f.close()
        sys.stdout = stdout_original
        os.chdir(whereami)
        return curated_data

    def curate_all(self, logfolder, rawdata_list: List[RawData], wdir_for_curate, unittimelimit=60):
        log_file = '{}/curate_{}.log'.format(logfolder, self.data_access)
        f = open(log_file, 'w')
        sys.stdout = f
        print('curate all at: {}'.format(wdir_for_curate))
        createdir(wdir_for_curate)
        curated_data_list = []
        classes = OrderedDict({1: [], 2: [], 3: []})
        timeout_entries = []
        whereami = os.getcwd()
        for rawdata in rawdata_list:
            print('WORKING ON: {}'.format(rawdata._id))
            this_wdir = '{}/{}'.format(wdir_for_curate, os.path.splitext(rawdata.data_properties['basename'])[0])
            createdir(this_wdir)
            print('- in folder: {}'.format(this_wdir))
            try:
                with time_limit(unittimelimit):
                    curated_data = self.curate_one(rawdata, this_wdir)
            except TimeoutException:
                # curate_one may have redirected stdout and chdir'ed before timing out
                sys.stdout = f
                os.chdir(whereami)
                timeout_entries.append(this_wdir)
                continue
            curated_data_list.append(curated_data)
            integrity_class = curated_data.data_properties['integrity_class']
            print('- integrity_class: {}'.format(integrity_class))
            classes[integrity_class].append(curated_data)
        for integrity_class in classes.keys():
            headermsg = '### integrity class: {} ###'.format(integrity_class)
            print('#' * len(headermsg))
            print(headermsg)
            print('#' * len(headermsg))
            for cdata in classes[integrity_class]:
                print(cdata.data_properties['wdir'] + '/curated_major_config.cif')
            print('\n')
        print('### timeout wdirs ###')
        for timeout_entry in timeout_entries:
            print(timeout_entry)
        f.close()
        sys.stdout = sys.__stdout__
        return curated_data_list, classes
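
# End-to-end driver sketch (all paths hypothetical):
#
#     contribution = Contribution('csd_2020', '/data/contributions/csd_2020')
#     cifs, rawdata_list, jsons = contribution.collect_with_log(
#         '/data/logs', '/data/collected/csd_2020')
#     curated, classes = contribution.curate_all(
#         '/data/logs', rawdata_list, '/data/curated/csd_2020', unittimelimit=60)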