Source code for ocelot.schema.graph

import itertools
import warnings
import hashlib
from collections import OrderedDict
from operator import eq
from ocelot.routines.fileop import stringkey, intkey
import matplotlib.pyplot as plt
import networkx as nx
import networkx.algorithms.isomorphism as iso
import numpy as np
from networkx import Graph
from pymatgen.core.periodic_table import Element
from pymatgen.vis.structure_vtk import EL_COLORS
from rdkit import Chem

from ocelot.routines.conformerparser import ACParser
from ocelot.schema.rdfunc import RdFunc

"""
a graph of integer nodes with a "symbol" attribute
all edges are defined by nodes, no attribute

molgraph is a molecule with single bonds only
only node-induced subgraph in substructure search

If G’=(N’,E’) is a node-induced subgraph, then:
N’ is a subset of N E’ is the subset of edges in E relating nodes in N’
"""


[docs]def to_fracrgb(t): return [x / 255 for x in t]
[docs]class GraphError(Exception): pass
[docs]def hashstr2int(s, hashmethod = hashlib.sha256): return int(hashmethod(s.encode('utf-8')).hexdigest(), 16) % 10**8
[docs]class BasicGraph:
[docs] def __init__(self, graph: nx.Graph): self.graph = graph
# self.rings = nx.minimum_cycle_basis(self.graph) # technically sssr # self.rings = sorted(self.rings, key=lambda x: len(x)) # self.nrings = len(self.rings) def __len__(self): return len(self.graph.nodes) # TODO find a better way to hash a graph schema def __hash__(self): return self.hash_nxgraph(self.graph) # return self.hash_via_smiles(self) @property def props_for_hash(self): """ see https://stackoverflow.com/questions/46999771/ use with caution... """ t = nx.triangles(self.graph) c = nx.number_of_cliques(self.graph) ele = nx.get_node_attributes(self.graph, 'symbol') dv = self.graph.degree props = [(dv[v], t[v], c[v], ele[v]) for v in self.graph] props.sort() return props
[docs] @staticmethod def hash_via_smiles(mg): smiles, rdmol, _, _ = mg.to_rdmol() # return hash("{}: {}".format(mg.__class__.__name__, smiles)) return hashstr2int(smiles)
[docs] @staticmethod def hash_nxgraph(g: nx.Graph): t = nx.triangles(g) c = nx.number_of_cliques(g) ele = nx.get_node_attributes(g, 'symbol') dv = g.degree props = [(dv[v], t[v], c[v], ele[v]) for v in g] props.sort() s = str(props) return hashstr2int(s)
# return hash(tuple(props)) # pnGraph=pn.Graph(g.number_of_nodes()); # edg=list(g.edges); # for E in edg: # pnGraph.connect_vertex(E[0],E[1]); # return hash(pn.certificate(pnGraph)); def __repr__(self): outs = ["{}:".format(self.__class__.__name__)] for n in self.graph.nodes: outs.append('{} {}'.format(n, self.symbols[n])) return "; ".join(outs) def __eq__(self, other): """ equality is measured based on graph isomorphism if smiles hash failed """ try: return hash(self) == hash(other) except: g1 = self.graph g2 = other.graph return nx.is_isomorphic(g1, g2, node_match=iso.generic_node_match('symbol', None, eq))
[docs] def graph_similarity(self, other): """ calculate graph similarity (GED) between two MolGraphs :param other: :return: """ return nx.algorithms.graph_edit_distance(self.graph, other.graph, node_match=iso.generic_node_match('symbol', None, eq))
[docs] def is_subgraph(self, other): """ True if other has a node-induced subgraph is isomorphic to self """ gm = iso.GraphMatcher(other.graph, self.graph, node_match=iso.generic_node_match('symbol', None, eq)) return gm.subgraph_is_isomorphic()
@property def symbols(self): """ a dict s.t. symbols[node] gives symbol """ return nx.get_node_attributes(self.graph, 'symbol')
[docs] @classmethod def from_smiles(cls, smiles:str): m = Chem.MolFromSmiles(smiles) if 'H' in smiles or 'h' in smiles: hm = m else: hm = Chem.AddHs(m) return cls.from_rdmol(hm)
[docs] @classmethod def from_rdmol(cls, rdmol: Chem.Mol, atomidx2nodename=None): """ :param rdmol: :param atomidx2nodename: the dict to convert atomidx in rdmol to graph node, atomidx2nodename[rdmolid] == nodename if not None then nodename will be set just based on atomidx :return: """ g = nx.Graph() if atomidx2nodename: index_dict_no = atomidx2nodename for atom in rdmol.GetAtoms(): g.add_node(index_dict_no[atom.GetIdx()], symbol=atom.GetSymbol(), ) for bond in rdmol.GetBonds(): g.add_edge( index_dict_no[bond.GetBeginAtomIdx()], index_dict_no[bond.GetEndAtomIdx()], ) else: for atom in rdmol.GetAtoms(): g.add_node(atom.GetIdx(), symbol=atom.GetSymbol(), ) for bond in rdmol.GetBonds(): g.add_edge( bond.GetBeginAtomIdx(), bond.GetEndAtomIdx(), ) if not nx.is_connected(g): raise GraphError('the graph is not connected!') return cls(g)
[docs] @classmethod def from_dict(cls, d: dict): g = nx.from_dict_of_lists(intkey(d['graph_dict_of_lists'])) nx.set_node_attributes(g, intkey(d['graph_node_symbols']), name='symbol') return cls(g)
[docs] def to_rdmol(self, sani=True, charge=0, charged_fragments=None, force_single=False, expliciths=True): """ convert to rdmol, it will first try to convert to a radical, if failed b/c of rdkit valence rules, it will try to convert to charged fragment :param sani: switch to call default sanitizer of rdkit :param int charge: molecule charge :param bool charged_fragments: switch to assign atomic charge :param bool force_single: switch to force all single bonds in the resulting molecule :param bool expliciths: add hydrogen explicitly :return: a rdmol object, canonical smiles, atomidx2nodename, nodename2atomidx """ # I cannot use smilestomol as I need to know how atom ids are mapped nodes = sorted(list(self.graph.nodes(data=True)), key=lambda x: x[0]) atomidx2nodename = {} # d[atom idx in the new rdmol] = original graph node nodename2atomidx = {} atom_number_list = [] for i in range(len(nodes)): graph_node = nodes[i][0] # this could be different form i! symbol = nodes[i][1]['symbol'] z = Element(symbol).Z atom_number_list.append(z) atomidx2nodename[i] = graph_node nodename2atomidx[graph_node] = i adj = nx.convert.to_dict_of_dicts(self.graph) new_ac = np.zeros((len(nodes), len(nodes))).astype(int) # atomic connectivity for i in range(len(nodes)): for j in range(i + 1, len(nodes)): if atomidx2nodename[j] in adj[atomidx2nodename[i]].keys(): new_ac[i, j] = 1 new_ac[j, i] = 1 apriori_radicals = {} if hasattr(self, 'joints'): # LBYL for k in self.joints.keys(): apriori_radicals[nodename2atomidx[k]] = len(self.joints[k]) else: apriori_radicals = None ap = ACParser(sani=sani, ac=new_ac, atomnumberlist=atom_number_list, charge=charge, apriori_radicals=apriori_radicals) if charged_fragments is None: try: mol, smiles = ap.parse(charged_fragments=False, force_single=force_single, expliciths=expliciths) except Chem.rdchem.AtomValenceException: warnings.warn('AP parser cannot use radical scheme, trying to use charged frag') mol, smiles = ap.parse(charged_fragments=True, force_single=force_single, expliciths=expliciths) else: mol, smiles = ap.parse(charged_fragments=charged_fragments, force_single=force_single, expliciths=expliciths) return mol, smiles, atomidx2nodename, nodename2atomidx
[docs] def as_dict(self): d = OrderedDict() d['@module'] = self.__class__.__module__ d['@class'] = self.__class__.__name__ d['graph_node_symbols'] = stringkey(self.symbols) d['graph_dict_of_lists'] = stringkey(nx.to_dict_of_lists(self.graph)) # d['rings'] = self.rings # d['nrings'] = self.nrings return d
[docs] def draw(self, output: str, dpi=600): """ draw graph, using jmol color scheme for elements :param output: :param dpi: :return: """ draw_chemgraph(self.graph, output, dpi)
[docs]def draw_chemgraph(graph: nx.Graph, output: str, dpi=600): f = plt.figure() symbols = nx.get_node_attributes(graph, 'symbol') colors = [to_fracrgb(EL_COLORS['Jmol'][symbols[n]]) for n in graph] nx.draw(graph, with_labels=True, labels=symbols, node_color=colors) f.savefig(output, dpi=dpi)
[docs]class FragmentGraph(BasicGraph):
[docs] def __init__(self, graph: nx.Graph, joints: dict, partition_scheme: str): """ fragment is a BasicGraph with joints I feel it's not necessary to add parent BasicGraph as an attribute, but I maybe wrong :param graph: :param joints: a dict, keys are nodes in the subgraph that have parent graph edges not in this subgraph """ super().__init__(graph) self.partition_scheme = partition_scheme self.graph = graph self.joints = joints self.rings = nx.minimum_cycle_basis(self.graph) # technically sssr self.rings = sorted(self.rings, key=lambda x: len(x))
# we don't need ring graph in frag, that should be used in partition process # self.rings = nx.minimum_cycle_basis(self.graph) # self.rings = sorted(self.rings, key=lambda x: len(x)) # self.nrings = len(self.rings)
[docs] def as_dict(self): d = super().as_dict() d['joints'] = stringkey(self.joints) d['partition_scheme'] = self.partition_scheme return d
[docs] @classmethod def from_dict(cls, d: dict): g = nx.from_dict_of_lists(intkey(d['graph_dict_of_lists'])) nx.set_node_attributes(g, intkey(d['graph_node_symbols']), name='symbol') return cls(g, intkey(d['joints']), d['partition_scheme'])
[docs]class BackboneGraph(FragmentGraph): # is this necessary?
[docs] def __init__(self, graph, joints, partition_scheme='lgcr'): super().__init__(graph, joints, partition_scheme)
# self.ringfp = set([len(r) for r in self.rings]) # def as_dict(self): # d = super().as_dict() # # d['ringfp'] = self.ringfp # return d
[docs]class SidechainGraph(FragmentGraph):
[docs] def __init__(self, graph, joints, partition_scheme): """ it is not possible for a side chain to have two frag-joints connected to one bone-joint, as this should be already in backbone side chain (frags) cannot have 2 side joints, each connected to one bone-joint as this would be a ring fused to backbone (side chain is a connected component) :var self.rankmap: this gives the rank for each node, rank is the shortest path length from the node to sc_joint """ super().__init__(graph, joints, partition_scheme) if len(self.joints.keys()) > 1: # in general, sc from chrom partition should not be used # warnings.warn('sidechain has more than one side joint... this could happen when using chrom partition') raise GraphError('sidechain has more than one side joint... impossible!') if len(self.joints.keys()) == 0: raise GraphError('sidechain has no side joint, the molceule may not be a connected graph') sc_joint = list(self.joints.keys())[0] self.rankmap = {} for node in self.graph: rank = nx.shortest_path_length(self.graph, source=node, target=sc_joint) self.rankmap[node] = rank
# self.graph.nodes[node]['rank'] = rank # if self.nrings > 0: # self.hasring = True # else: # self.hasring = False
[docs] def as_dict(self): d = super().as_dict() # d['hasring'] = self.hasring d['rankmap'] = stringkey(self.rankmap) return d
[docs]class MolGraph(BasicGraph):
[docs] def __init__(self, graph): """ this is the schema for molecule with fused rings :param graph: """ super().__init__(graph) self.rings = nx.minimum_cycle_basis(self.graph) # technically sssr self.rings = sorted(self.rings, key=lambda x: len(x)) self.nrings = len(self.rings) if self.nrings < 1: warnings.warn('you are init a MolGraph with no ring!') self.ring_graph = nx.Graph() for ij in itertools.combinations(range(self.nrings), 2): i, j = ij ri = self.rings[i] rj = self.rings[j] shared_nodes = set(ri) & set(rj) ni_connected_to_rj = [ni for ni in ri if len(set(self.graph.neighbors(ni)) & set(rj)) == 1 and ni not in rj] # we set len(shared_nodes) > 0 to avoid breaking spiro bicycle if len(shared_nodes) > 0 or len(ni_connected_to_rj) > 0: self.ring_graph.add_edge(i, j, nshare=len(shared_nodes), nconnect=len(ni_connected_to_rj)) self.connected_rings = self.get_rings('nconnect', 1) # for rubrene len(self.connected_rings[0]) == 8 self.fused_rings = self.get_rings('nshare', 2) # for rubrene len(self.fused_rings[0]) == 4 # notice if two rings share2 then they must connect1 try: self.lgcr = self.connected_rings[0] except IndexError: try: self.lgcr = [self.rings[0]] except IndexError: raise GraphError('no rings in the MolGraph!') try: self.lgfr = self.fused_rings[0] except IndexError: try: self.lgfr = [self.rings[0]] except IndexError: raise GraphError('no rings in the MolGraph!')
[docs] @classmethod def from_basicgraph(cls, basicgraph: BasicGraph): return cls(basicgraph.graph)
[docs] def get_rings(self, k='nconnect', threshold=1): """ get connected rings between which # of edges is larger than a threshold :param k: :param threshold: :return: """ edges = [(u, v) for (u, v, d) in self.ring_graph.edges(data=True) if d[k] >= threshold] subgraph = self.ring_graph.edge_subgraph(edges).copy() return [[self.rings[iring] for iring in c] for c in sorted(nx.connected_components(subgraph), key=len, reverse=True)]
[docs] @staticmethod def get_joints_and_subgraph(subg1_nodes: [int], g: Graph): """ we assume subg1 \cap subg2 == empty and subg1 + subg2 == g """ sg1 = nx.Graph() sg2 = nx.Graph() # this is the graph of anything other than bone nodes symbols = nx.get_node_attributes(g, 'symbol') for node in g: s = symbols[node] if node in subg1_nodes: sg1.add_node(node, symbol=s) else: sg2.add_node(node, symbol=s) joints_subg1_as_keys = {} joints_subg2_as_keys = {} for n, nbrs in g.adj.items(): for nbr, d in nbrs.items(): if n in subg1_nodes and nbr in subg1_nodes: sg1.add_edge(n, nbr, **d) elif n not in subg1_nodes and nbr not in subg1_nodes: sg2.add_edge(n, nbr, **d) elif n in subg1_nodes and nbr not in subg1_nodes: if n not in joints_subg1_as_keys.keys(): joints_subg1_as_keys[n] = [nbr] else: joints_subg1_as_keys[n].append(nbr) elif n not in subg1_nodes and nbr in subg1_nodes: if n not in joints_subg2_as_keys.keys(): joints_subg2_as_keys[n] = [nbr] else: joints_subg2_as_keys[n].append(nbr) sg1.graph['joints'] = joints_subg1_as_keys sg2_components = [] for c in sorted(nx.connected_components(sg2), key=len, reverse=True): nodes = list(c) joints_in_frag = {} for j in joints_subg2_as_keys.keys(): if j in nodes: joints_in_frag[j] = joints_subg2_as_keys[j] frag_graph = nx.Graph(joints=joints_in_frag) # joints_in_frag[frag_node] is a list of bone joints frag_graph.add_nodes_from((n, g.nodes[n]) for n in nodes) frag_graph.add_edges_from( (n, nbr, d) for n, nbrs in g.adj.items() if n in nodes for nbr, d in nbrs.items() if nbr in nodes) sg2_components.append(frag_graph) return joints_subg1_as_keys, joints_subg2_as_keys, sg1, sg2_components
[docs] @staticmethod def get_bone_and_frags_from_nxgraph(bone_graph: nx.Graph, fragments: [nx.Graph], scheme): gb = BackboneGraph(bone_graph, bone_graph.graph['joints'], partition_scheme=scheme) scs = [SidechainGraph(frag_graph, frag_graph.graph['joints'], partition_scheme=scheme) for frag_graph in fragments] return gb, scs
[docs] @staticmethod def get_chrom_and_frags_from_nxgraph(chrom_graph, fragments): cg = FragmentGraph(chrom_graph, chrom_graph.graph['joints'], 'chrom') fgs = [FragmentGraph(frag_graph, frag_graph.graph['joints'], 'chrom') for frag_graph in fragments] return cg, fgs
[docs] def partition(self, bone_selection='lgfr', additional_ring_criteria=None, with_halogen=True): """ parition the molecule into a backbone graph and a list of fragments (graphs) :param with_halogen: :param bone_selection: :param additional_ring_criteria: this should be a function to check whether a ring (a set of siteids) meets additional conditions # this does nothing for chrom scheme :return: """ if bone_selection == 'lgfr': rings = self.lgfr elif bone_selection == 'lgcr': rings = self.lgcr elif bone_selection == 'chrom': try: mol, smiles, atomidx2nodename, nodename2atomidx = self.to_rdmol() except: raise GraphError('to_rdmol failed in chrom partition!') if with_halogen: cgs = RdFunc.get_conjugate_group_with_halogen(mol) else: cgs = RdFunc.get_conjugate_group(mol) try: chromol, aid_to_newid_chromol = cgs[0] except IndexError: raise GraphError('rdkit cannot find a chrom here!') aids_in_chromol = list(aid_to_newid_chromol.keys()) nodes_in_chromol = [atomidx2nodename[aid] for aid in aids_in_chromol] nodes_not_in_chromol = [sid for sid in self.graph if sid not in nodes_in_chromol] chromol_joints, other_joints, chromolsg, sg_components = MolGraph.get_joints_and_subgraph( nodes_in_chromol, self.graph) return chromolsg, sg_components # else: raise GraphError('selection scheme not implemented in graph partition: {}'.format(bone_selection)) if additional_ring_criteria is None: bonerings = rings else: bonerings = [] for r in rings: if additional_ring_criteria(r): bonerings.append(r) backbone_nodes = set([item for sublist in bonerings for item in sublist]) joints_bone_as_keys, joints_frag_as_keys, bone_graph, fragments = self.get_joints_and_subgraph(backbone_nodes, self.graph) return bone_graph, fragments
[docs] def partition_to_bone_frags(self, bone_selection='lgfr', additional_criteria=None): """ partition the molecule into backbone and a list of frags the list of frags is reversely sorted by graph size :param additional_criteria: :param bone_selection: :return: """ bone_graph, fragments = self.partition(bone_selection, additional_criteria) gb, scs = self.get_bone_and_frags_from_nxgraph(bone_graph, fragments, scheme=bone_selection) # gb = BackboneGraph(bone_graph, bone_graph.graph['joints'], partition_scheme=bone_selection) # scs = [SidechainGraph(frag_graph, frag_graph.graph['joints'], partition_scheme=bone_selection) for frag_graph in # fragments] # scs = sorted(scs, key=lambda x: len(x.graph), reverse=True) return gb, scs