# Names exported by ``from <this module> import *``
__all__ = ["ModelComparisonGraphdata", "TemplateModelComparison",
"TemplateModelDelta", "RefinementClosure",
"get_dkg_refinement_closure"]
from collections import defaultdict
from itertools import combinations, count, product
from typing import Literal, Optional, Mapping, List, Tuple, Dict, Callable, \
Union, Set
import networkx as nx
import sympy
from pydantic import BaseModel, conint, Field
from tqdm import tqdm
from .templates import Provenance, Concept, Template, SympyExprStr, IS_EQUAL, \
REFINEMENT_OF, CONTROLLER, CONTROLLERS, SUBJECT, OUTCOME, SpecifiedTemplate
from .template_model import Initial, TemplateModel, get_concept_graph_key, \
get_template_graph_key
from .utils import safe_parse_expr
# Default colors used by TemplateModelDelta: nodes/edges of the first model,
# of the second model, and concept nodes found in both models
TAG1_COLOR = "blue"
TAG2_COLOR = "green"
MERGE_COLOR = "orange"
class DataNode(BaseModel):
    """Common base for the nodes of a ModelComparisonGraphdata"""

    # Discriminates template nodes from concept nodes
    node_type: Literal["template", "concept"]
    # Strict (non-coerced) integer that must be >= 0
    model_id: conint(ge=0, strict=True)
class TemplateNode(DataNode):
    """A ModelComparisonGraphdata node that represents a Template"""

    type: str
    rate_law: Optional[SympyExprStr] = Field(
        default=None,
        description="The rate law of this template",
    )
    initials: Optional[Mapping[str, Initial]] = Field(
        default=None,
        description="The initial conditions associated "
                    "with the rate law for this template",
    )
    # A fresh list per instance, never a shared default
    provenance: List[Provenance] = Field(default_factory=list)
class ConceptNode(Concept, DataNode):
    """A ModelComparisonGraphdata node that represents a Concept"""

    # Required on top of the fields inherited from Concept and DataNode
    curie: str
# Type alias for the node keys referenced by DataEdge.source_id/target_id
DataNodeKey = Tuple[str, ...]
class DataEdge(BaseModel):
    """Common base for the edges of a ModelComparisonGraphdata"""

    # Key of the node the edge starts from
    source_id: DataNodeKey
    # Key of the node the edge points to
    target_id: DataNodeKey
class InterModelEdge(DataEdge):
    """An edge whose role is a refinement or an equality"""

    role: Literal["refinement_of", "is_equal"]
class IntraModelEdge(DataEdge):
    """An edge whose role is subject, outcome or controller"""

    role: Literal["subject", "outcome", "controller"]
class ModelComparisonGraphdata(BaseModel):
    """A data structure holding a graph representation of TemplateModel delta

    Nodes are stored per model in ``concept_nodes`` and ``template_nodes``.
    Edges refer to nodes through ``(model id, node id)`` tuples.
    """

    class Config:
        arbitrary_types_allowed = True
        json_encoders = {
            SympyExprStr: lambda e: str(e),
        }
        # NOTE(review): `json_decoders` does not appear to be a documented
        # pydantic Config option - confirm that something consumes it.
        json_decoders = {
            SympyExprStr: lambda e: safe_parse_expr(e),
            Template: lambda t: Template.from_json(data=t),
        }

    template_models: Dict[int, TemplateModel] = Field(
        ..., description="A mapping of template model keys to template models"
    )
    # The default must be a dict (not a list) to match the declared type
    concept_nodes: Dict[int, Dict[int, Concept]] = Field(
        default_factory=dict,
        description="A mapping of model identifiers to a mapping of node "
        "identifiers to nodes. Node identifiers have the structure of 'mXnY' "
        "where X is the model id and Y is the node id within the model.",
    )
    template_nodes: Dict[int, Dict[int, SpecifiedTemplate]] = Field(
        default_factory=dict,
        description="A mapping of model identifiers to a mapping of node "
        "identifiers to nodes. Node identifiers have the structure of 'mXnY' "
        "where X is the model id and Y is the node id within the model.",
    )
    # nodes are tuples of (model id, node id) for look
    inter_model_edges: List[Tuple[Tuple[int, int], Tuple[int, int], str]] = \
        Field(
            default_factory=list,
            description="List of edges. Each edge is a tuple of"
            "(source node lookup, target node lookup, role) where role describes "
            "if the edge is a refinement of or equal to another node in another "
            "model (inter model edge). The edges are considered directed for "
            "refinements and undirected for equalities. The node lookup is a "
            "tuple of (model id, node id) that defines the lookup of the node "
            "in the nodes mapping.",
        )
    intra_model_edges: List[Tuple[Tuple[int, int], Tuple[int, int], str]] = Field(
        default_factory=list,
        description="List of edges. Each edge is a tuple of"
        "(source node lookup, target node lookup, role) where role describes "
        "if the edge incoming to, outgoing from or controls a "
        "template/process in the same model (intra model edge). The edges "
        "are considered directed. The node lookup is a tuple of "
        "(model id, node id) that defines the lookup of the node in the "
        "nodes mapping.",
    )

    def get_similarity_score(self, model1_id: int, model2_id: int) -> float:
        """Get the similarity score of the model comparison

        Every concept node of the larger model scores 1 if it has an
        equality edge to the other model, 0.5 if it only has a refinement
        edge, and 0 otherwise. The sum is divided by the number of concept
        nodes in the larger model.

        Parameters
        ----------
        model1_id :
            The id of the first model
        model2_id :
            The id of the second model

        Returns
        -------
        :
            The similarity score; 0.0 if neither model has concept nodes
        """
        # Address the concept nodes the same way the edges do. A model
        # without any concept has no entry and counts as empty.
        model1_concept_nodes = {
            (model1_id, node_id)
            for node_id in self.concept_nodes.get(model1_id, {})
        }
        model2_concept_nodes = {
            (model2_id, node_id)
            for node_id in self.concept_nodes.get(model2_id, {})
        }

        # Set model 1 to be the model with the most nodes
        if len(model2_concept_nodes) > len(model1_concept_nodes):
            model1_concept_nodes, model2_concept_nodes = \
                model2_concept_nodes, model1_concept_nodes
            model1_id, model2_id = model2_id, model1_id

        n_nodes1 = len(model1_concept_nodes)
        if n_nodes1 == 0:
            # Nothing to compare; avoid dividing by zero
            return 0.0

        # Index the edges between the two models by edge type, keyed on the
        # node belonging to model 1 regardless of the edge direction
        index = defaultdict(lambda: defaultdict(set))
        for (msource_id, source_id), (mtarget_id, target_id), e_type in \
                self.inter_model_edges:
            if e_type not in (IS_EQUAL, REFINEMENT_OF):
                continue
            source_tuple = (msource_id, source_id)
            target_tuple = (mtarget_id, target_id)
            # Add model1 -> model2 edge
            if msource_id == model1_id and mtarget_id == model2_id:
                index[e_type][source_tuple].add(target_tuple)
            # Add model2 -> model1 edge
            if msource_id == model2_id and mtarget_id == model1_id:
                index[e_type][target_tuple].add(source_tuple)

        score = 0
        for model1_node in model1_concept_nodes:
            if model1_node in index[IS_EQUAL]:
                # todo: fix this check
                score += 1
            elif model1_node in index[REFINEMENT_OF]:
                score += 0.5

        # Todo: Come up with a better metric?
        return score / n_nodes1

    def get_similarity_scores(self):
        """Get the similarity scores for all model comparisons

        Returns
        -------
        :
            A list of dictionaries with the model ids and the similarity score
        """
        # Pair up the actual model keys instead of assuming they are 0..n-1
        return [
            {
                'models': (i, j),
                'score': self.get_similarity_score(i, j)
            }
            for i, j in combinations(sorted(self.template_models), 2)
        ]

    @classmethod
    def from_template_models(
        cls,
        template_models: List[TemplateModel],
        refinement_func: Callable[[str, str], bool]
    ) -> "ModelComparisonGraphdata":
        """Create a ModelComparisonGraphdata from a list of TemplateModels

        Parameters
        ----------
        template_models :
            The list of TemplateModels to compare
        refinement_func :
            The refinement function to use when comparing concepts

        Returns
        -------
        :
            The ModelComparisonGraphdata
        """
        return TemplateModelComparison(
            template_models, refinement_func
        ).model_comparison
class TemplateModelComparison:
    """Compares TemplateModels in a graph friendly structure"""
    model_comparison: ModelComparisonGraphdata

    def __init__(
        self,
        template_models: List[TemplateModel],
        refinement_func: Callable[[str, str], bool]
    ):
        """Create a ModelComparisonGraphdata from a list of TemplateModels

        The comparison is run immediately and the result is stored in
        ``self.model_comparison``.

        Parameters
        ----------
        template_models :
            The list of TemplateModels to compare
        refinement_func :
            The refinement function to use when comparing concepts

        Raises
        ------
        ValueError
            If fewer than two template models are provided
        """
        # Todo: Add more identifiable ID to template model than index?
        if len(template_models) < 2:
            raise ValueError("Need at least two models to make comparison")
        # Lookups are keyed on (model id, *graph key)
        self.template_node_lookup: Dict[Tuple, Template] = {}
        self.concept_node_lookup: Dict[Tuple, Concept] = {}
        self.intra_model_edges: List[Tuple[Tuple, Tuple, str]] = []
        self.inter_model_edges: List[Tuple[Tuple, Tuple, str]] = []
        self.refinement_func = refinement_func
        # The position in the input list serves as the model id
        self.template_models: Dict[int, TemplateModel] = dict(
            enumerate(template_models)
        )
        self.compare_models()

    def _add_concept_nodes_edges(
        self,
        template_node_id: Tuple,
        role: str,
        concept: Union[Concept, List[Concept]]
    ):
        """Register concept node(s) and their edge(s) to a template node

        Raises ValueError for an unknown role and TypeError if ``concept``
        is neither a Concept nor a list.
        """
        model_id = template_node_id[0]
        # Add one or several concept nodes with their template-concept edges
        if isinstance(concept, Concept):
            # Just need some hashable id for the concept and then translate
            # it to an integer
            concept_node_id = (model_id,) + get_concept_graph_key(concept)
            if concept_node_id not in self.concept_node_lookup:
                self.concept_node_lookup[concept_node_id] = concept

            # Subjects and controllers point at the template, the template
            # points at its outcome
            if role in (CONTROLLER, CONTROLLERS, SUBJECT):
                self.intra_model_edges.append(
                    (concept_node_id, template_node_id, role)
                )
            elif role == OUTCOME:
                self.intra_model_edges.append(
                    (template_node_id, concept_node_id, role)
                )
            else:
                raise ValueError(f"Invalid role {role}")
        elif isinstance(concept, list):
            for conc in concept:
                self._add_concept_nodes_edges(
                    template_node_id, role, conc
                )
        else:
            raise TypeError(f"Invalid concept type {type(concept)}")

    def _add_template_model(
        self, model_id: int, template_model: TemplateModel
    ):
        """Add the template and concept nodes of one model"""
        # Create the graph data for this template model
        for template in template_model.templates:
            template_node_id = (model_id, ) + get_template_graph_key(template)
            if template_node_id not in self.template_node_lookup:
                self.template_node_lookup[template_node_id] = template

            # Add concept nodes and intra model edges
            for role, concept in template.get_concepts_by_role().items():
                self._add_concept_nodes_edges(template_node_id, role, concept)

    def _add_inter_model_edges(
        self,
        node_id1: Tuple[str, ...],
        data_node1: Union[Concept, Template],
        node_id2: Tuple[str, ...],
        data_node2: Union[Concept, Template],
    ):
        """Add at most one equality or refinement edge between two nodes"""
        if data_node1.is_equal_to(data_node2, with_context=True):
            # Add equality edge
            self.inter_model_edges.append(
                (node_id1, node_id2, "is_equal")
            )
        elif data_node1.refinement_of(data_node2, self.refinement_func,
                                      with_context=True):
            self.inter_model_edges.append(
                (node_id1, node_id2, "refinement_of")
            )
        elif data_node2.refinement_of(data_node1, self.refinement_func,
                                      with_context=True):
            self.inter_model_edges.append(
                (node_id2, node_id1, "refinement_of")
            )

    def _compare_nodes(self, node_lookup: Dict[Tuple, object], desc: str):
        """Compare every pair of nodes that belong to different models"""
        for (node_id1, data_node1), (node_id2, data_node2) in \
                tqdm(combinations(node_lookup.items(), r=2), desc=desc):
            # Element 0 of a node id is the model id. Nodes of the same
            # model must not yield inter model edges, so only the model id
            # is compared here (not the first element of the graph key).
            if node_id1[0] == node_id2[0]:
                continue
            self._add_inter_model_edges(node_id1, data_node1,
                                        node_id2, data_node2)

    @staticmethod
    def _renumber_nodes(node_lookup, model_node_counters, old_new_map):
        """Group nodes per model under consecutive integer node ids

        ``model_node_counters`` and ``old_new_map`` are updated in place so
        that several lookups can share one numbering per model.
        """
        nodes = defaultdict(dict)
        for old_node_id, node in node_lookup.items():
            m_id = old_node_id[0]
            # The counter of a model starts at 0 on first use
            node_id = next(model_node_counters[m_id])
            old_new_map[old_node_id] = (m_id, node_id)
            nodes[m_id][node_id] = node
        return nodes

    def compare_models(self):
        """Run model comparison

        Builds the nodes and intra model edges of every model, compares
        templates and concepts across models and stores the result in
        ``self.model_comparison``.
        """
        for model_id, template_model in self.template_models.items():
            self._add_template_model(model_id, template_model)

        # Create inter model edges, i.e refinements and equalities
        self._compare_nodes(self.template_node_lookup,
                            "Comparing model templates")
        self._compare_nodes(self.concept_node_lookup,
                            "Comparing model concepts")

        # Templates and concepts share one counter per model, so a node id
        # is unique within its model across both node types
        model_node_counters = defaultdict(count)
        old_new_map = {}
        template_nodes = self._renumber_nodes(
            self.template_node_lookup, model_node_counters, old_new_map
        )
        concept_nodes = self._renumber_nodes(
            self.concept_node_lookup, model_node_counters, old_new_map
        )

        # todo: consider doing nested arrays instead of nested mappings
        #  for both nodes and models
        #  nodes: [
        #    [{node}, ...],
        #    [{node}, ...],
        #  ]
        # translate old node ids to new node ids in the edges
        inter_model_edges = [
            (old_new_map[old_node_id1], old_new_map[old_node_id2], edge_type)
            for old_node_id1, old_node_id2, edge_type in self.inter_model_edges
        ]
        intra_model_edges = [
            (old_new_map[old_node_id1], old_new_map[old_node_id2], edge_type)
            for old_node_id1, old_node_id2, edge_type in self.intra_model_edges
        ]
        self.model_comparison = ModelComparisonGraphdata(
            template_models=self.template_models,
            template_nodes=template_nodes,
            concept_nodes=concept_nodes,
            inter_model_edges=inter_model_edges,
            intra_model_edges=intra_model_edges
        )
class TemplateModelDelta:
    """Defines the differences between TemplateModels as a networkx graph"""

    def __init__(
        self,
        template_model1: TemplateModel,
        template_model2: TemplateModel,
        refinement_function: Callable[[str, str], bool],
        tag1: str = "1",
        tag2: str = "2",
        tag1_color: str = TAG1_COLOR,
        tag2_color: str = TAG2_COLOR,
        merge_color: str = MERGE_COLOR,
    ):
        """Create a TemplateModelDelta

        Parameters
        ----------
        template_model1 :
            The first template model
        template_model2 :
            The second template model
        refinement_function :
            The refinement function to use when comparing concepts
        tag1 :
            The tag for the first template model. Default: "1"
        tag2 :
            The tag for the second template model. Default: "2"
        tag1_color :
            The color for the first template model. Default: "blue"
        tag2_color :
            The color for the second template model. Default: "green"
        merge_color :
            The color for the merged template model. Default: "orange"
        """
        self.refinement_func = refinement_function
        self.template_model1 = template_model1
        self.templ1_graph = template_model1.generate_model_graph()
        self.tag1 = tag1
        self.tag1_color = tag1_color
        self.template_model2 = template_model2
        self.templ2_graph = template_model2.generate_model_graph()
        self.tag2 = tag2
        self.tag2_color = tag2_color
        self.merge_color = merge_color
        self.comparison_graph = nx.DiGraph()
        self.comparison_graph.graph["rankdir"] = "LR"  # transposed node tables
        self._assemble_comparison()

    def _add_node(self, template: Template, tag: str):
        """Add (or update) the node of a template and return its node id"""
        # Get a unique identifier for node
        node_id = (*get_template_graph_key(template), tag)
        self.comparison_graph.add_node(
            node_id,
            type=template.type,
            template_key=template.get_key(),
            label=template.type,
            color=self.tag1_color if tag == self.tag1 else self.tag2_color,
            shape="record",
        )
        return node_id

    def _add_edge(
        self,
        source: Template,
        source_tag: str,
        target: Template,
        target_tag: str,
        edge_type: Literal["refinement_of", "is_equal"],
    ):
        """Add a refinement edge or a pair of equality edges"""
        n1_id = self._add_node(source, tag=source_tag)
        n2_id = self._add_node(target, tag=target_tag)
        edge_attrs = dict(label=edge_type, color="red", weight=2)
        # refinement_of: source is a refinement of target
        self.comparison_graph.add_edge(n1_id, n2_id, **edge_attrs)
        if edge_type != "refinement_of":
            # is_equal: add edges both ways
            self.comparison_graph.add_edge(n2_id, n1_id, **edge_attrs)

    def _add_graphs(self):
        """Merge the two model graphs into the comparison graph"""
        # Add the graphs together
        nodes_to_add = []
        template_node_ids = set()
        for node, node_data in self.templ1_graph.nodes(data=True):
            # If Template node, append tag to node id
            if "template_key" in node_data:
                # NOTE: if we want to merge Template nodes skip appending
                # the tag to the tuple
                node_id = (*node, self.tag1)
                template_node_ids.add(node)
            else:
                # Assumed to be a Concept node
                node_id = node
            node_data["color"] = self.tag1_color
            nodes_to_add.append((node_id, {"tags": {self.tag1}, **node_data}))
        self.comparison_graph.add_nodes_from(nodes_to_add)

        model1_identity_keys = {
            data['concept_identity_key']: node for node, data
            in self.templ1_graph.nodes(data=True)
            if 'concept_identity_key' in data
        }
        to_contract = set()
        # For the other template, add nodes that are missing, update data
        # for the ones that are already in
        for node, node_data in self.templ2_graph.nodes(data=True):
            # NOTE: if we want to merge Template nodes skip appending
            # the tag to the tuple
            if "template_key" in node_data:
                node_id = (*node, self.tag2)
                template_node_ids.add(node)
                node_data["tags"] = {self.tag2}
                node_data["color"] = self.tag2_color
                self.comparison_graph.add_node(node_id, **node_data)
            # There is an exact match for this node so we don't need
            # to add it
            elif node in self.comparison_graph.nodes:
                # If node already exists, add to tags and update color
                self.comparison_graph.nodes[node]["tags"].add(self.tag2)
                self.comparison_graph.nodes[node]["color"] = self.merge_color
            # There is an identity match but the names (unstandardized)
            # don't match. So we merge these nodes later. The key is looked
            # up defensively, mirroring how model1_identity_keys is built.
            elif ('concept_identity_key' in node_data and
                    node_data['concept_identity_key'] in model1_identity_keys):
                # Make sure the color will be the merge color
                matching_node = \
                    model1_identity_keys[node_data['concept_identity_key']]
                self.comparison_graph.nodes[matching_node]["color"] = \
                    self.merge_color
                # We still add the node, it will be contracted later
                node_data["tags"] = {self.tag2}
                node_data["color"] = self.merge_color
                self.comparison_graph.add_node(node, **node_data)
                # Add to the list of contracted nodes
                to_contract.add((node, matching_node))
            # There is no match so we add a new node
            else:
                node_data["tags"] = {self.tag2}
                node_data["color"] = self.tag2_color
                self.comparison_graph.add_node(node, **node_data)

        def extend_data(d, color):
            d["color"] = color
            return d

        # Template nodes were added with their tag appended, so the edge
        # endpoints have to be translated the same way
        for graph, tag, color in (
            (self.templ1_graph, self.tag1, self.tag1_color),
            (self.templ2_graph, self.tag2, self.tag2_color),
        ):
            self.comparison_graph.add_edges_from(
                ((*u, tag) if u in template_node_ids else u,
                 (*v, tag) if v in template_node_ids else v,
                 extend_data(d, color))
                for u, v, d in graph.edges(data=True)
            )

        self._add_concept_refinement_edges()

        # Contract only after the refinement edges are in place
        for u, v in to_contract:
            self.comparison_graph = \
                nx.contracted_nodes(self.comparison_graph, u, v)

    def _add_concept_refinement_edges(self):
        """Add refinement edges between concepts of the two models"""
        # Add lookup of concepts so we can add refinement edges
        templ1_concepts = {
            get_concept_graph_key(concept): concept
            for templ1 in self.template_model1.templates
            for concept in templ1.get_concepts()
        }
        templ2_concepts = {
            get_concept_graph_key(concept): concept
            for templ2 in self.template_model2.templates
            for concept in templ2.get_concepts()
        }
        joint_concept_keys = templ1_concepts.keys() | templ2_concepts.keys()
        ref_dict = dict(label="refinement_of", color="red", weight=2)

        concept_refinement_edges = []
        for (n_a, data_a), (n_b, data_b) in \
                combinations(self.comparison_graph.nodes(data=True), 2):
            if n_a not in joint_concept_keys or \
                    n_b not in joint_concept_keys:
                continue
            tags_a, tags_b = data_a["tags"], data_b["tags"]
            # Try both assignments of the pair to (model 1, model 2). Each
            # edge is attached to the nodes the compared concepts were
            # actually taken from, and a node present in both models is
            # still compared against the other node of the pair.
            for n1, tags1, n2, tags2 in ((n_a, tags_a, n_b, tags_b),
                                         (n_b, tags_b, n_a, tags_a)):
                if self.tag1 not in tags1 or self.tag2 not in tags2:
                    continue
                c1 = templ1_concepts.get(n1)
                c2 = templ2_concepts.get(n2)
                if c1 is None or c2 is None:
                    continue
                if c1.is_equal_to(c2, with_context=True):
                    continue
                if c1.refinement_of(c2,
                                    refinement_func=self.refinement_func,
                                    with_context=True):
                    concept_refinement_edges.append((n1, n2, ref_dict))
                if c2.refinement_of(c1,
                                    refinement_func=self.refinement_func,
                                    with_context=True):
                    concept_refinement_edges.append((n2, n1, ref_dict))

        if concept_refinement_edges:
            self.comparison_graph.add_edges_from(concept_refinement_edges)

    def _assemble_comparison(self):
        """Build the comparison graph and relate templates across models"""
        self._add_graphs()

        for templ1, templ2 in product(self.template_model1.templates,
                                      self.template_model2.templates):
            # Check for refinement and equality
            if templ1.is_equal_to(templ2, with_context=True):
                self._add_edge(
                    source=templ1,
                    source_tag=self.tag1,
                    target=templ2,
                    target_tag=self.tag2,
                    edge_type="is_equal",
                )
            elif templ1.refinement_of(templ2,
                                      refinement_func=self.refinement_func,
                                      with_context=True):
                self._add_edge(
                    source=templ1,
                    source_tag=self.tag1,
                    target=templ2,
                    target_tag=self.tag2,
                    edge_type="refinement_of",
                )
            elif templ2.refinement_of(templ1,
                                      refinement_func=self.refinement_func,
                                      with_context=True):
                self._add_edge(
                    source=templ2,
                    source_tag=self.tag2,
                    target=templ1,
                    target_tag=self.tag1,
                    edge_type="refinement_of",
                )

    def draw_graph(
        self, path: str, prog: str = "dot", args: str = "",
        format: Optional[str] = None
    ):
        """Draw the comparison graph to a file using pygraphviz

        Parameters
        ----------
        path :
            The path to the output file
        prog :
            The graphviz layout program to use, such as "dot", "neato", etc.
        format :
            Set the file format explicitly
        args :
            Additional arguments to pass to the graphviz bash program as a
            string. Example: args="-Nshape=box -Edir=forward -Ecolor=red"
        """
        # draw graph
        agraph = nx.nx_agraph.to_agraph(self.comparison_graph)
        agraph.draw(path, format=format, prog=prog, args=args)

    def graph_as_json(self) -> Dict:
        """Return the comparison graph json serializable node-link data"""
        return nx.node_link_data(self.comparison_graph)

    @classmethod
    def for_jupyter(
        cls,
        template_model1,
        template_model2,
        refinement_function,
        name="model.png",
        tag1="1",
        tag2="2",
        tag1_color=TAG1_COLOR,
        tag2_color=TAG2_COLOR,
        merge_color=MERGE_COLOR,
        prog: str = "dot",
        args: str = "",
        format: Optional[str] = None,
        **kwargs
    ):
        """Display in jupyter

        Parameters
        ----------
        template_model1 :
            The first template model
        template_model2 :
            The second template model
        refinement_function :
            The refinement function to use
        name :
            The name of the output file
        tag1 :
            The tag for the first template model
        tag2 :
            The tag for the second template model
        tag1_color :
            The color for the first template model
        tag2_color :
            The color for the second template model
        merge_color :
            The color for the merged template model
        prog :
            The graphviz layout program to use, such as "dot", "neato", etc.
        format :
            Set the file format explicitly
        args :
            Additional arguments to pass to the graphviz bash program as a
            string. Example: args="-Nshape=box -Edir=forward -Ecolor=red"
        kwargs :
            Keyword arguments to pass to IPython.display.Image

        Returns
        -------
        :
            The IPython Image object
        """
        from IPython.display import Image

        if not name.endswith(".png"):
            name += ".png"
            print(f"Appending .png to name. New name: {name}")

        # Instantiate through cls so that subclasses are respected
        delta = cls(
            template_model1=template_model1,
            template_model2=template_model2,
            refinement_function=refinement_function,
            tag1=tag1,
            tag2=tag2,
            tag1_color=tag1_color,
            tag2_color=tag2_color,
            merge_color=merge_color,
        )
        delta.draw_graph(name, prog=prog, args=args, format=format)
        return Image(name, **kwargs)
class RefinementClosure:
    """A wrapper class for storing a transitive closure and exposing a
    function to check for refinement relationship.

    Typical usage would involve:

    >>> from mira.dkg.web_client import get_transitive_closure_web
    >>> rc = RefinementClosure(get_transitive_closure_web())
    >>> rc.is_ontological_child('doid:0080314', 'bfo:0000016')
    """

    def __init__(self, transitive_closure: Set[Tuple[str, str]]):
        """Initialize the RefinementClosure

        Parameters
        ----------
        transitive_closure :
            The transitive closure of the refinement relationship, as a set
            of (child curie, parent curie) pairs
        """
        # Stored as given (not copied); membership tests run against it
        self.transitive_closure = transitive_closure

    def is_ontological_child(self, child_curie: str, parent_curie: str) -> bool:
        """Check if the child is a refinement of the parent

        Parameters
        ----------
        child_curie :
            The child curie
        parent_curie :
            The parent curie

        Returns
        -------
        :
            True if the child is a refinement of the parent, False otherwise
        """
        # The pair is ordered: (child, parent) is not the same as
        # (parent, child)
        return (child_curie, parent_curie) in self.transitive_closure
def get_dkg_refinement_closure() -> RefinementClosure:
    """Return a refinement closure from the DKG

    Returns
    -------
    :
        The refinement closure
    """
    # Import here to avoid dependency upon module import
    from mira.dkg.web_client import get_transitive_closure_web

    return RefinementClosure(get_transitive_closure_web())