Source code for mira.modeling.amr.petrinet

"""This module implements generation into Petri net models which are defined
at https://github.com/DARPA-ASKEM/Model-Representations/tree/main/petrinet.
"""

__all__ = ["AMRPetriNetModel", "ModelSpecification",
           "template_model_to_petrinet_json"]

import json
import logging
from copy import deepcopy
from typing import Dict, List, Optional

from pydantic import BaseModel, Field

from mira.metamodel import expression_to_mathml, safe_parse_expr, \
    TemplateModel

from .. import Model
from .utils import add_metadata_annotations

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Version string of the petrinet schema; it is interpolated into SCHEMA_URL
# below to select the matching ``petrinet_v<version>`` path.
SCHEMA_VERSION = '0.6'
# URL of the petrinet JSON schema file for SCHEMA_VERSION. This value is
# written into the ``header.schema`` entry of the generated models.
SCHEMA_URL = ('https://raw.githubusercontent.com/DARPA-ASKEM/'
              'Model-Representations/petrinet_v%s/petrinet/'
              'petrinet_schema.json') % SCHEMA_VERSION


class AMRPetriNetModel:
    """A class representing a PetriNet model.

    The constructor walks a pre-compiled transition model and collects
    plain-dict representations of its states, transitions, rates, initials,
    parameters, observables and time, which the ``to_*`` methods then
    assemble into the Petri net JSON structure.
    """

    def __init__(self, model: Model):
        """Instantiate a petri net model from a generic transition model.

        Parameters
        ----------
        model:
            The pre-compiled transition model
        """
        self.properties = {}
        self.initials = []
        self.rates = []
        self.states = []
        self.transitions = []
        self.parameters = []
        self.metadata = {}
        self.time = None
        self.observables = []

        # Name and description come from the template model annotations
        # when available; the description falls back on the name.
        annotations = model.template_model.annotations
        self.model_name = 'Model'
        if annotations and annotations.name:
            self.model_name = annotations.name
        self.model_description = self.model_name
        if annotations and annotations.description:
            self.model_description = annotations.description

        # Maps variable keys to the state ids used in the output
        vmap = {}
        for key, var in model.variables.items():
            concept = var.concept
            # Use the variable's concept name if possible but fall back
            # on the key otherwise
            vmap[key] = name = concept.name or str(key)
            display_name = concept.display_name or name
            # State structure
            # {
            #   'id': str,
            #   'name': str,
            #   'grounding': {identifiers, modifiers},
            # }
            states_dict = {
                'id': name,
                'name': display_name,
                'grounding': {
                    'identifiers': {k: v for k, v in
                                    concept.identifiers.items()
                                    if k != 'biomodels.species'},
                    'modifiers': concept.context,
                },
            }
            if concept.units:
                states_dict['units'] = self._units_to_json(concept.units)
            self.states.append(states_dict)

            # 'initial' object structure
            # {
            #   'target': str,  # refers to a state id above
            #   'expression': str,
            #   'expression_mathml': str,
            # }
            initial = var.data.get('expression')
            if initial is not None:
                self.initials.append({
                    'target': name,
                    'expression': str(initial),
                    'expression_mathml': expression_to_mathml(initial)
                })

        for observable in model.observables.values():
            obs = observable.observable
            self.observables.append({
                'id': obs.name,
                # Fall back on the name if there is no display name
                'name': obs.display_name or obs.name,
                'expression': str(obs.expression),
                'expression_mathml': expression_to_mathml(
                    obs.expression.args[0]),
            })

        # self.time stays None if the template model defines no time
        time = model.template_model.time
        if time:
            self.time = {'id': time.name}
            if time.units:
                self.time['units'] = self._units_to_json(time.units)

        # Transition structure
        # {
        #   "id": "t1",
        #   "input": ["s1", "s2"],
        #   "output": ["s3", "s4"],
        #   "grounding": {identifiers, modifiers},
        #   "properties": {...}, keys: name, grounding > {identifiers,
        #                                                 modifiers}
        # }
        # Rate structure:
        # {
        #   target: string,  # refers to a transition id above
        #   expression: string,
        #   expression_mathml
        # }
        for idx, transition in enumerate(model.transitions.values(),
                                         start=1):
            # Unnamed templates get a positional id: t1, t2, ...
            tid = transition.template.name or f"t{idx}"
            # fixme: get grounding for transition
            # Controllers appear on both sides of the transition, ahead of
            # the consumed (input) and produced (output) states.
            controllers = [vmap[c.key] for c in transition.control]
            transition_dict = {
                'id': tid,
                'input': controllers + [vmap[c.key]
                                        for c in transition.consumed],
                'output': controllers + [vmap[p.key]
                                         for p in transition.produced],
            }

            # Include rate law
            if transition.template.rate_law:
                rate_law = transition.template.rate_law.args[0]
                self.rates.append({
                    'target': tid,
                    'expression': str(rate_law),
                    'expression_mathml': expression_to_mathml(rate_law)
                })

            transition_dict['properties'] = {
                'name': tid,
            }
            self.transitions.append(transition_dict)

        for key, param in model.parameters.items():
            # Placeholder parameters are not exported
            if param.placeholder:
                continue
            param_dict = {'id': str(key)}
            if param.display_name:
                param_dict['name'] = param.display_name
            if param.description:
                param_dict['description'] = param.description
            if param.value is not None:
                param_dict['value'] = param.value
            distribution = param.distribution
            if distribution:
                if distribution.type is None:
                    logger.warning("can not add distribution without type: %s",
                                   distribution)
                else:
                    param_dict['distribution'] = {
                        'type': distribution.type,
                        'parameters': distribution.parameters,
                    }
            if param.concept and param.concept.units:
                param_dict['units'] = self._units_to_json(param.concept.units)
            self.parameters.append(param_dict)

        add_metadata_annotations(self.metadata, model)

    @staticmethod
    def _units_to_json(units) -> dict:
        """Return the ``units`` dict (string and MathML) for a unit object."""
        # NOTE(review): ``expression.args[0]`` presumably unwraps the
        # underlying expression from its string-like wrapper -- confirm
        # against mira.metamodel.
        return {
            'expression': str(units.expression),
            'expression_mathml': expression_to_mathml(
                units.expression.args[0]),
        }

    def to_json(
        self,
        name: Optional[str] = None,
        description: Optional[str] = None,
        model_version: Optional[str] = None
    ):
        """Return a JSON dict structure of the Petri net model.

        The returned dict references the lists and dicts held on this
        instance; no copies are made.

        Parameters
        ----------
        name :
            The name of the model. Defaults to the name of the original
            template model that produced the input Model instance or, if not
            available, 'Model'.
        description :
            A description of the model. Defaults to the description of the
            original template model that produced the input Model instance
            or, if not available, the name of the model.
        model_version :
            The version of the model. Defaults to '0.1'.

        Returns
        -------
        : JSON
            A JSON dict representing the Petri net model.
        """
        return {
            'header': {
                'name': name or self.model_name,
                'schema': SCHEMA_URL,
                'schema_name': 'petrinet',
                'description': description or self.model_description,
                'model_version': model_version or '0.1',
            },
            'properties': self.properties,
            'model': {
                'states': self.states,
                'transitions': self.transitions,
            },
            'semantics': {'ode': {
                'rates': self.rates,
                'initials': self.initials,
                'parameters': self.parameters,
                'observables': self.observables,
                # Fall back on a time variable with id 't' if none is set
                'time': self.time if self.time else {'id': 't'}
            }},
            'metadata': self.metadata,
        }

    def to_pydantic(self, name=None, description=None,
                    model_version=None) -> "ModelSpecification":
        """Return a Pydantic model representation of the Petri net model.

        Parameters
        ----------
        name :
            The name of the model. Defaults to the name of the original
            template model that produced the input Model instance or, if not
            available, 'Model'.
        description :
            A description of the model. Defaults to the description of the
            original template model that produced the input Model instance
            or, if not available, the name of the model.
        model_version :
            The version of the model. Defaults to '0.1'.

        Returns
        -------
        :
            A Pydantic model representation of the Petri net model.
        """
        return ModelSpecification(
            header=Header(
                name=name or self.model_name,
                schema=SCHEMA_URL,
                schema_name='petrinet',
                description=description or self.model_description,
                model_version=model_version or '0.1',
            ),
            properties=self.properties,
            model=PetriModel(
                states=[State.parse_obj(s) for s in self.states],
                transitions=[Transition.parse_obj(t)
                             for t in self.transitions],
            ),
            semantics=Ode(ode=OdeSemantics(
                rates=[Rate.parse_obj(r) for r in self.rates],
                initials=[Initial.parse_obj(i) for i in self.initials],
                parameters=[Parameter.parse_obj(p)
                            for p in self.parameters],
                observables=[Observable.parse_obj(o)
                             for o in self.observables],
                # Fall back on a time variable with id 't' if none is set
                time=Time.parse_obj(self.time) if self.time
                else Time(id='t')
            )),
            metadata=self.metadata,
        )

    def to_json_str(self, name=None, description=None, model_version=None,
                    **kwargs) -> str:
        """Return a JSON string representation of the Petri net model.

        Parameters
        ----------
        name : str, optional
            The name of the model, passed on to :meth:`to_json`.
        description : str, optional
            A description of the model, passed on to :meth:`to_json`.
        model_version : str, optional
            The version of the model, passed on to :meth:`to_json`.
        kwargs :
            Additional keyword arguments to pass to :func:`json.dumps`.

        Returns
        -------
        :
            A JSON string representation of the Petri net model.
        """
        js = self.to_json(name=name, description=description,
                          model_version=model_version)
        return json.dumps(js, **kwargs)

    def to_json_file(self, fname, name=None, description=None,
                     model_version=None, **kwargs):
        """Write the Petri net model to a JSON file

        Parameters
        ----------
        fname : str
            The file name to write to.
        name : str, optional
            The name of the model.
        description : str, optional
            A description of the model.
        model_version : str, optional
            The version of the model.
        kwargs :
            Additional keyword arguments to pass to :func:`json.dump`.
            ``indent`` defaults to 1 if not given.
        """
        indent = kwargs.pop('indent', 1)
        js = self.to_json(name=name, description=description,
                          model_version=model_version)
        # Write UTF-8 explicitly so that the output does not depend on the
        # platform default encoding (relevant with ensure_ascii=False).
        with open(fname, 'w', encoding='utf-8') as fh:
            json.dump(js, fh, indent=indent, **kwargs)
def template_model_to_petrinet_json(tm: TemplateModel):
    """Convert a template model to a PetriNet JSON dict.

    The template model is first wrapped in a :class:`Model`, which is then
    turned into an :class:`AMRPetriNetModel` and exported with its default
    header values.

    Parameters
    ----------
    tm :
        The template model to convert.

    Returns
    -------
    A JSON dict representing the PetriNet model.
    """
    compiled_model = Model(tm)
    petrinet = AMRPetriNetModel(compiled_model)
    return petrinet.to_json()
# NOTE: every ``Optional`` field below carries an explicit ``= None``
# default. Pydantic v1 treats a bare ``Optional[X]`` annotation as
# not-required, but pydantic v2 makes it required, and the dicts produced by
# AMRPetriNetModel omit keys such as ``grounding``.


class Initial(BaseModel):
    """The initial-value expression of a state."""
    # Id of the state this initial applies to
    target: str
    expression: str
    expression_mathml: str


class TransitionProperties(BaseModel):
    """Optional properties attached to a transition."""
    name: Optional[str] = None
    grounding: Optional[Dict] = None


class Rate(BaseModel):
    """The rate-law expression of a transition."""
    # Id of the transition this rate applies to
    target: str
    expression: str
    expression_mathml: str


class Distribution(BaseModel):
    """A parameter distribution given by its type and parameters."""
    type: str
    parameters: Dict


class Units(BaseModel):
    """A unit expression as a string and as MathML."""
    expression: str
    expression_mathml: str


class State(BaseModel):
    """A state of the Petri net."""
    id: str
    name: Optional[str] = None
    grounding: Optional[Dict] = None
    units: Optional[Units] = None


class Transition(BaseModel):
    """A transition of the Petri net with its input and output state ids."""
    id: str
    input: List[str]
    output: List[str]
    grounding: Optional[Dict] = None
    properties: Optional[TransitionProperties] = None


class Parameter(BaseModel):
    """A model parameter."""
    id: str
    # Display name; AMRPetriNetModel emits it under the 'name' key, so it
    # has to be declared here to survive conversion to this model.
    name: Optional[str] = None
    description: Optional[str] = None
    value: Optional[float] = None
    grounding: Optional[Dict] = None
    distribution: Optional[Distribution] = None
    units: Optional[Units] = None

    @classmethod
    def from_dict(cls, d):
        """Build a Parameter from a dict, coercing its ``id`` to ``str``.

        The input dict is deep-copied first so the caller's dict is left
        untouched.
        """
        d = deepcopy(d)
        d['id'] = str(d['id'])
        return cls.parse_obj(d)


class Time(BaseModel):
    """The time variable of the ODE semantics."""
    id: str
    units: Optional[Units] = None


class Observable(BaseModel):
    """An observable defined by an expression."""
    id: str
    name: Optional[str] = None
    grounding: Optional[Dict] = None
    expression: str
    expression_mathml: str


class PetriModel(BaseModel):
    """The structural part of the model: states and transitions."""
    states: List[State]
    transitions: List[Transition]


class OdeSemantics(BaseModel):
    """The ODE semantics attached to the Petri net structure."""
    rates: List[Rate]
    initials: List[Initial]
    parameters: List[Parameter]
    time: Optional[Time] = None
    observables: List[Observable]


class Ode(BaseModel):
    """Container holding the ODE semantics under the ``ode`` key."""
    ode: Optional[OdeSemantics] = None


class Header(BaseModel):
    """The header section of a model specification."""
    name: str
    # Populated and serialized through the 'schema' alias
    schema_url: str = Field(..., alias='schema')
    schema_name: str
    description: str
    model_version: str
class ModelSpecification(BaseModel):
    """A Pydantic model corresponding to the PetriNet JSON schema."""
    header: Header
    # Optional sections default to None explicitly: a bare ``Optional[X]``
    # annotation is implicitly optional only in pydantic v1 and would be a
    # required field under pydantic v2.
    properties: Optional[Dict] = None
    model: PetriModel
    semantics: Optional[Ode] = None
    metadata: Optional[Dict] = None