
"""This module implements parsing of a generic pysd model irrespective of source and source type
and extracting its contents to create an equivalent MIRA template model.
"""

__all__ = ["template_model_from_pysd_model"]

import copy
import re
import typing as t
import logging
from more_itertools import chunked

import networkx as nx
import pandas as pd
import sympy

from mira.metamodel import *
from mira.metamodel.utils import safe_parse_expr
from mira.metamodel import Concept, TemplateModel
from mira.sources.util import (
    parameter_to_mira,
    transition_to_templates,
    get_sympy,
)

logger = logging.getLogger(__name__)

UNITS_MAPPING = {
    sympy.Symbol("Person"): sympy.Symbol("person"),
    sympy.Symbol("Persons"): sympy.Symbol("person"),
    sympy.Symbol("Day"): sympy.Symbol("day"),
    sympy.Symbol("Days"): sympy.Symbol("day"),
}
SYMPY_FLOW_RATE_PLACEHOLDER = safe_parse_expr("xxplaceholderxx")
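

# A minimal usage sketch, not part of the public API. Assumptions: the file
# "sir.mdl" and the variable names below are hypothetical; in practice the
# readers in the vensim and stella modules of this package construct the
# expression_map from the raw model file and call
# template_model_from_pysd_model for you.
def _usage_sketch():
    import pysd

    # keys are the model's "Real Name" variable names, values are the raw
    # expression strings for stocks, flows, and parameters
    pysd_model = pysd.read_vensim("sir.mdl")
    expression_map = {
        "Susceptible": "-infection rate",
        "Infected": "infection rate - recovery rate",
        "Recovered": "recovery rate",
        "infection rate": "Susceptible * Infected * beta",
        "recovery rate": "Infected * gamma",
        "beta": "0.5",
        "gamma": "0.1",
    }
    return template_model_from_pysd_model(pysd_model, expression_map)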


def template_model_from_pysd_model(
    pysd_model,
    expression_map,
    *,
    grounding_map=None,
    initials_map: t.Optional[t.Dict[str, float]] = None,
) -> TemplateModel:
    """Given a model and its accompanying expression_map, extract information
    from the arguments to create an equivalent MIRA template model.

    Parameters
    ----------
    pysd_model : Model
        The pysd model object
    expression_map : dict[str,str]
        Map of variable name to expression
    grounding_map: dict[str, Concept]
        A grounding map, a map from label to Concept
    initials_map: dict[str, float]
        A pre-populated mapping of initial values

    Returns
    -------
    :
        MIRA template model
    """
    # dataframe that contains information about each variable in the model
    model_doc_df = pysd_model.doc

    # mapping of expressions after they have been processed to be sympy compatible
    processed_expression_map = {}

    # mappings between a variable's name in the Vensim model and its
    # python-equivalent name
    name_to_identifier = dict(model_doc_df[["Real Name", "Py Name"]].values)
    identifier_to_name = dict(model_doc_df[["Py Name", "Real Name"]].values)

    # preprocess expression text to make it sympy parseable
    for var_name, var_expression in expression_map.items():
        new_var_name = name_to_identifier[var_name]
        processed_expression_map[new_var_name] = preprocess_expression_text(
            var_expression
        )

    # mapping of each variable's python name to a symbol for expression
    # parsing in sympy
    symbols = dict(
        zip(
            model_doc_df["Py Name"],
            list(map(lambda x: sympy.Symbol(x), model_doc_df["Py Name"])),
        )
    )

    # retrieve the states
    model_states = model_doc_df.loc[
        (model_doc_df["Type"] == "Stateful")
        & (model_doc_df["Subtype"] == "Integ")
    ]

    concepts = {}
    all_states = set()

    # map between states and incoming/outgoing rate laws
    state_rate_map = {}

    # process states and build a mapping of each state to its input and
    # output rate laws
    for index, state in model_states.iterrows():
        concept_state = state_to_concept(state, grounding_map=grounding_map)
        concepts[concept_state.name] = concept_state
        all_states.add(concept_state.name)
        symbols[concept_state.name] = sympy.Symbol(concept_state.name)

        state_id = state["Py Name"]
        state_rate_map[state_id] = {"input_rates": [], "output_rates": []}
        state_expr_text = processed_expression_map[state_id]

        # retrieve the expression of inflows and outflows for the state
        state_arg_sympy = safe_parse_expr(state_expr_text, symbols)

        # record whether each flow involved with the state is going into
        # or out of the state
        if state_arg_sympy.args:
            # if it's just the negation of a single symbol
            if (
                sympy.core.numbers.NegativeOne() in state_arg_sympy.args
                and len(state_arg_sympy.args) == 2
            ):
                str_symbol = str(state_arg_sympy)
                state_rate_map[state_id]["output_rates"].append(str_symbol[1:])
            else:
                for rate_free_symbol in state_arg_sympy.args:
                    str_rate_free_symbol = str(rate_free_symbol)
                    if "-" in str_rate_free_symbol:
                        # if the symbol representing the flow has a negative
                        # sign, it is an outgoing flow; add the symbol to the
                        # output rates without the negative sign
                        state_rate_map[state_id]["output_rates"].append(
                            str_rate_free_symbol[1:]
                        )
                    else:
                        # else it is an incoming flow
                        state_rate_map[state_id]["input_rates"].append(
                            str_rate_free_symbol
                        )
        else:
            # if it's just a single symbol (i.e. no negation), the args
            # property will be empty
            state_rate_map[state_id]["input_rates"].append(str(state_arg_sympy))

    # get a mapping from flow/stock/etc. name to the related sympy expression.
    # slightly redundant with the previous block, but better encapsulated
    identifier_to_expr = get_identifier_to_expr(
        pysd_model, expression_map, concepts
    )

    # process initials; currently we use the value of the state at timestamp 0.
    # state initial values are listed in the same order as states are for the
    # pysd model.
    # TODO: The current assumption holds for other models but not the Vensim
    #  hackathon model. We have 19 states but 44 initial values, so we do not
    #  assign the right initial values. For the hackathon, the only initial
    #  that has a value other than 0 is "Susceptibles" with a value of
    #  1.3392e+09. One solution for this through Vensim is to text-parse the
    #  Integ function, which has the form INTEG(<expression>,<value>), extract
    #  the value as well, and keep it in a separate mapping for initials where
    #  we keep track of the value for each initial. Then pass the initials
    #  mapping to the "template_model_from_pysd_model" method. So this would
    #  be done in "vensim.py".
    mira_initials = {}
    for state_initial_value, (state_id, state_concept) in zip(
        pysd_model.state, concepts.items()
    ):
        if initials_map and (mapped_value := initials_map.get(state_id)):
            initial = Initial(
                concept=concepts[state_id].copy(deep=True),
                expression=SympyExprStr(sympy.Float(mapped_value)),
            )
        # if the state value is not a number, default the initial to 0
        elif not isinstance(state_initial_value, (int, float)):
            logger.warning(
                f"got non-numeric state value for {state_id}: {state_initial_value}"
            )
            initial = Initial(
                concept=concepts[state_id].copy(deep=True),
                expression=SympyExprStr(sympy.Float("0")),
            )
        else:
            initial = Initial(
                concept=concepts[state_id].copy(deep=True),
                expression=SympyExprStr(sympy.Float(state_initial_value)),
            )
        mira_initials[initial.concept.name] = initial
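
    # Heuristic for what follows: a variable is treated as a parameter when
    # its processed expression evaluates to a plain number (or to a stock's
    # initial value); flow and auxiliary expressions keep free symbols after
    # evalf() and are therefore skipped.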

    # Stella issue
    # Currently we cannot capture all parameters, as some of them cannot have
    # their expressions parsed. Additionally, some auxiliary variables are
    # added as parameters due to the inability to parse the auxiliary
    # expression and differentiate it from a parameter.
    mira_parameters = {}

    # process parameters
    # There is no discernible way to identify only parameters in model_doc_df,
    # so go through all variables in the processed expression map
    for name, expression in processed_expression_map.items():
        # evaluate the expression to a sympy object; the try/except exists for
        # Stella models, where safe_parse_expr errors on incorrectly
        # constructed parameters
        try:
            eval_expression = safe_parse_expr(expression).evalf()
        except TypeError:
            eval_expression = safe_parse_expr("0")

        # convert the sympy object to a string
        str_eval_expression = str(eval_expression)
        value = None
        is_initial = False
        # Stella issue
        # Sometimes parameter values reference a stock rather than a number
        if str_eval_expression in mira_initials:
            value = float(str(mira_initials[str_eval_expression].expression))
            is_initial = True

        # Stella issue
        # Remove negative signs when testing for the placeholder parameter
        # value "-1" used for Aux structures that cannot be parsed from Stella
        # models; isdecimal() interprets the "-" as a dash.
        # If the sympy expression isn't equal to the placeholder, and the
        # string evaluates to a number after removing decimal points, dashes,
        # and spaces, then create a parameter
        if str_eval_expression in mira_initials or (
            eval_expression != SYMPY_FLOW_RATE_PLACEHOLDER
            and str_eval_expression.replace(".", "")
            .replace(" ", "")
            .replace("-", "")
            .isdecimal()
        ):
            if not is_initial:
                value = float(str_eval_expression)
            model_parameter_info = model_doc_df[model_doc_df["Py Name"] == name]

            # if units exist, add them to the parameter
            if (
                model_parameter_info["Units"].values[0]
                and model_parameter_info["Units"].values[0] != "dimensionless"
                and model_parameter_info["Units"].values[0] != "fraction"
            ):
                unit_text = (
                    model_parameter_info["Units"].values[0].replace(" ", "")
                )
                parameter = {
                    "id": name,
                    "value": value,
                    "description": model_parameter_info["Comment"].values[0],
                    "units": {"expression": unit_text},
                }
            else:
                parameter = {
                    "id": name,
                    "value": value,
                    "description": model_parameter_info["Comment"].values[0],
                }

            mira_parameters[name] = parameter_to_mira(parameter)

            # standardize parameter units if they exist, e.g., a parameter
            # documented with units "Persons/Day" is normalized to "person/day"
            if mira_parameters[name].units:
                param_unit = mira_parameters[name].units
                for old_unit_symbol, new_unit_symbol in UNITS_MAPPING.items():
                    param_unit.expression = param_unit.expression.subs(
                        old_unit_symbol, new_unit_symbol
                    )

    # construct a transition mapping that determines the input and output
    # states of each rate law
    transition_map = {}

    # Collect the set of unique rates and iterate over them in sorted order
    # below so that the resulting templates appear in a deterministic order;
    # the tests in "tests/system_dynamics.py" check for hard-coded template
    # positions (e.g., that the first template for the SIR model is a
    # ControlledConversion).
    rates = set()
    for state_rates in state_rate_map.values():
        for input_rate in state_rates["input_rates"]:
            rates.add(input_rate)
        for output_rate in state_rates["output_rates"]:
            rates.add(output_rate)

    # create the map of transitions
    for rate_name in sorted(rates):
        rate_expr = identifier_to_expr[rate_name]
        inputs, outputs, controllers = [], [], []
        for state_id, in_out_rate_map in state_rate_map.items():
            # a state that appears in the rate expression without being
            # consumed by the rate is a controller
            if (
                sympy.Symbol(state_id) in rate_expr.free_symbols
                and rate_name not in in_out_rate_map["output_rates"]
            ):
                controllers.append(state_id)

            # if a rate is leaving a state, then that state is an input to the rate
            if rate_name in in_out_rate_map["output_rates"]:
                inputs.append(state_id)

            # if a rate is going into a state, then that state is an output of the rate
            if rate_name in in_out_rate_map["input_rates"]:
                outputs.append(state_id)

        transition_map[rate_name] = {
            "name": rate_name,
            "expression": rate_expr,
            "inputs": inputs,
            "outputs": outputs,
            "controllers": controllers,
        }
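
    # For example (names hypothetical): a rate "infection_rate" with
    # expression "susceptible*infected*beta" that is an output rate of
    # "susceptible" and an input rate of "infected" yields
    # inputs=["susceptible"], outputs=["infected"], controllers=["infected"],
    # which transition_to_templates below turns into a ControlledConversion.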

    # create templates from the transitions
    templates_ = []
    for template_id, (transition_name, transition) in enumerate(
        transition_map.items(), start=1
    ):
        templates_.extend(
            transition_to_templates(
                input_concepts=[
                    concepts[input_name].copy(deep=True)
                    for input_name in transition.get("inputs")
                ],
                output_concepts=[
                    concepts[output_name].copy(deep=True)
                    for output_name in transition.get("outputs")
                ],
                controller_concepts=[
                    concepts[controller_name].copy(deep=True)
                    for controller_name in transition.get("controllers")
                ],
                transition_rate=(
                    transition["expression"]
                    if transition["expression"] != SYMPY_FLOW_RATE_PLACEHOLDER
                    else None
                ),
                transition_id=str(template_id),
                transition_name=transition_name,
            )
        )

    time = Time(name="time")
    return TemplateModel(
        templates=templates_,
        parameters=mira_parameters,
        initials=mira_initials,
        time=time,
    )
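

# A minimal sketch of how a grounding map can be supplied (the CURIE below is
# illustrative, not verified): mapping a pysd identifier to a pre-grounded
# Concept makes state_to_concept below reuse that Concept, keeping the model's
# units and description, instead of creating an ungrounded one.
def _grounding_map_sketch(pysd_model, expression_map):
    grounding_map = {
        "infected": Concept(name="infected", identifiers={"ido": "0000511"}),
    }
    return template_model_from_pysd_model(
        pysd_model, expression_map, grounding_map=grounding_map
    )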


def state_to_concept(state, grounding_map=None) -> Concept:
    """Create a MIRA Concept from a state

    Parameters
    ----------
    state : pd.Series
        The series that contains state data
    grounding_map: dict[str, Concept]
        A grounding map, a map from label to Concept

    Returns
    -------
    :
        The MIRA concept created from the state
    """
    name = state["Py Name"]
    description = state["Comment"]
    unit_dict = {
        "expression": (
            state["Units"].replace(" ", "") if state["Units"] else None
        )
    }
    unit_expr = get_sympy(unit_dict, UNIT_SYMBOLS)
    units_obj = Unit(expression=unit_expr) if unit_expr else None

    # If a grounding was supplied for this state, use it directly,
    # keeping the units and description from the model
    if grounding_map is not None and name in grounding_map:
        concept = copy.deepcopy(grounding_map[name])
        concept.name = name
        concept.description = description
        concept.units = units_obj
        return concept
    else:
        logger.warning("could not ground %s", name)
        return Concept(name=name, units=units_obj, description=description)


def preprocess_expression_text(expr_text):
    """Preprocess a string expression to convert it into a sympy parseable
    string

    Parameters
    ----------
    expr_text : str
        The string expression

    Returns
    -------
    : str
        The processed string expression
    """
    if not expr_text:
        return expr_text

    # This regex removes spaces between operands and operators, as well as
    # spaces between parentheses and operators or operands. It works for
    # symbols as well.
    expr_text = re.sub(
        r"(?<=[^\w\s])\s+(?=[^\w\s])|(?<=[^\w\s])\s+(?=\w)|(?<=\w)\s+(?=[^\w\s])",
        "",
        expr_text,
    )

    # TODO: Use regular expressions for all text preprocessing rather than
    #  string replace
    # strip leading and trailing whitespace, convert "^" to Python's "**",
    # replace the remaining spaces between words that make up a variable name
    # with "_", remove single and double quotation marks, replace the
    # ampersand (&) with "_", and lowercase everything
    expr_text = (
        expr_text.strip()
        .replace("^", "**")
        .replace(" ", "_")
        .replace("'", "")
        .replace('"', "")
        .replace("&", "_")
        .lower()
    )
    return expr_text
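

# A minimal sketch of the normalization, using a hypothetical Vensim
# expression: spaces around operators are removed, the space inside the
# variable name becomes an underscore, quotes are dropped, "^" becomes "**",
# and the result is lowercased.
def _preprocess_expression_text_sketch():
    assert (
        preprocess_expression_text('Contact Rate * "Infected" ^ 2')
        == "contact_rate*infected**2"
    )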


def ifthenelse_to_piecewise(expr_text):
    """Convert a Vensim IF THEN ELSE expression to a sympy Piecewise string

    Parameters
    ----------
    expr_text : str
        The string expression

    Returns
    -------
    : str
        The sympy Piecewise expression as a string
    """
    tag = "IF THEN ELSE("
    pos = expr_text.index(tag)
    start_idx = len(tag) + pos
    # Scan through to find the positions of the commas, keeping track of
    # grouping symbols. This is akin to a state machine where we track how
    # many parentheses are open, to make sure that a comma inside a function
    # call is not considered part of the IF THEN ELSE.
    # Caveat: this does not enable nested IF THEN ELSE statements
    depth = 0
    comma_1_idx, comma_2_idx, end_idx = None, None, None
    for i in range(start_idx, len(expr_text)):
        v = expr_text[i]
        if v == "(":
            depth += 1
        if v == "," and depth == 0:
            if comma_1_idx is None:
                comma_1_idx = i
            elif comma_2_idx is None:
                comma_2_idx = i
            else:
                raise ValueError("too many arguments in IF THEN ELSE")
        if v == ")":
            if depth == 0:
                # we found the final close parenthesis
                end_idx = i
                break
            depth -= 1

    condition = expr_text[start_idx:comma_1_idx].strip()
    then = expr_text[comma_1_idx + 1 : comma_2_idx].strip()
    else_ = expr_text[comma_2_idx + 1 : end_idx].strip()
    piecewise = f"Piecewise(({then}, {condition}), ({else_}, True))"
    # Replace a single = with == for sympy, making sure not to touch an
    # existing ==, <=, or >=
    piecewise = re.sub(r"(?<![<>=])=(?!=)", "==", piecewise)
    return piecewise
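

# A minimal sketch of the conversion with a hypothetical conditional; a bare
# "=" inside the condition would additionally be rewritten to "==".
def _ifthenelse_to_piecewise_sketch():
    assert (
        ifthenelse_to_piecewise("IF THEN ELSE(time>=10, 1, 0)")
        == "Piecewise((1, time>=10), (0, True))"
    )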


def with_lookup_to_piecewise(expr_text: str) -> str:
    """Convert a Vensim WITH LOOKUP expression to a piecewise function.

    The semantics of the ``WITH LOOKUP`` element are documented at
    https://www.vensim.com/documentation/fn_with_lookup.html.

    For example, this could come from:

    .. code-block::

        Infection Rate new arrivals= WITH LOOKUP (
            Time,
            (
                [(0,0)-(500,100)],(0,0),(1,2),(2,1),(3,0),(4,2),(5,1),(6,2),
                (7,3),(8,6),(9,2),(10,7),(11,10),(12,4),(13,10),(14,5),(15,11),
                (16,14),(17,14),(18,26),(19,34),(20,35),(21,45),(22,55),(23,38),
                (24,34),(25,24),(26,40),(27,16),(28,20),(29,12),(30,23),(31,14),
                (32,8),(33,14),(34,12),(35,5),(36,9),(37,6),(38,0),(39,0),(40,0),
                (1000,0)
            ))
            ~ Persons/Day
            ~ |

    which ends up being the following text (after normalization from Vensim):

    .. code-block::

        with_lookup(time,([(0,0)-(500,100)],(0,0),(1,2),(2,1),(3,0),(4,2),
        (5,1),(6,2),(7,3),(8,6),(9,2),(10,7),(11,10),(12,4),(13,10),(1000,0)))
    """
    # there are a variety of ways this is written: WITH LOOKUP, with_lookup,
    # withlookup; normalize them all
    expr_text = expr_text.strip().replace(" ", "").replace("_", "").lower()
    if not expr_text.startswith("withlookup"):
        raise ValueError(expr_text)
    expr_text = expr_text[len("withlookup"):].lstrip("(").rstrip(")")

    # the first input is either a value or an input variable name,
    # the second input is a list of X,Y pairs
    variable, second = (x.strip() for x in expr_text.split(",", 1))
    second: str = second.lstrip("(").rstrip(")")

    # This is an undocumented part of this function, but it appears that the
    # list begins with some kind of definition of the form [(a,b)-(c,d)]
    if second.startswith("["):
        box, rest = second.lstrip("[").split("]", 1)
        x1, y1, x2, y2 = [
            float(x.strip())
            for x in box.replace("(", "")
            .replace(")", "")
            .replace("-", ",")
            .split(",")
        ]
    else:
        rest = second
        x1, y1, x2, y2 = [None] * 4

    # TODO it's not clear if/how x1, y1, x2, and y2 should be used
    #  to fill in gaps in the lookup table

    # get the list of x,y pairs, in order, from the lookup table
    pairs = list(
        chunked(
            (
                float(x.strip())
                for x in rest.strip()
                .replace("(", "")
                .replace(")", "")
                .split(",")
                if x.strip()
            ),
            2,
        )
    )

    # construct the sympy string as a simple lookup, where the y value gets
    # returned if the target variable is at least the given x value,
    # for each x,y pair
    # FIXME what's the right way to write the conditional here?
    conditions = ",".join(f"({y}, {variable} >= {x})" for x, y in pairs)
    sympy_str = f"Piecewise({conditions})"
    return sympy_str


def get_identifier_to_expr(pysd_model, name_to_expr_str, concepts):
    # maps from python identifier strings to sympy symbols
    identifier_to_symbol = {
        name: sympy.Symbol(name) for name in pysd_model.doc["Py Name"]
    }
    # maps from full-length string names to python-appropriate identifiers
    name_to_identifier = dict(pysd_model.doc[["Real Name", "Py Name"]].values)

    # get the subset of variables representing flows (i.e., excluding stocks)
    aux_state_identifiers = {
        name
        for name in pysd_model.doc.loc[pysd_model.doc["Type"] == "Auxiliary"][
            "Py Name"
        ]
    }

    # maps identifiers to parsed sympy expressions
    id_to_expr = {}
    for real_name, expr_str in name_to_expr_str.items():
        processed_expression_str = preprocess_expression_text(expr_str)
        try:
            expr = safe_parse_expr(
                processed_expression_str, identifier_to_symbol
            )
        except TypeError as e:
            # This is a Stella issue, as some expressions aren't created
            # properly for rates
            logger.warning(
                f"[{real_name}] failed to parse:\n{processed_expression_str}\n{e}\n"
            )
            expr = SYMPY_FLOW_RATE_PLACEHOLDER
        id_to_expr[name_to_identifier[real_name]] = expr

    # look at all expressions from the Vensim model and make a graph of
    # dependencies, where an edge (u, v) means that u depends on v.
    # the keys in id_to_expr can be both stocks and flows
    graph = nx.DiGraph()
    for identifier, expr in id_to_expr.items():
        for arg_symbol in expr.free_symbols:
            arg_id = str(arg_symbol)
            graph.add_edge(identifier, arg_id)

    # get the subgraph of flows, so we can calculate their dependencies
    # and do recursive substitution
    flow_dependencies = graph.subgraph(aux_state_identifiers)

    # Traverse in reverse topological sort order, meaning that at any
    # position, everything that position depends on has already been seen.
    # This means we only need one pass to substitute everything in the
    # current position with values that have already been substituted.
    identifier_ordering = list(
        reversed(list(nx.topological_sort(flow_dependencies)))
    )

    new_id_to_expr = id_to_expr.copy()
    for identifier in identifier_ordering:
        expr = id_to_expr[identifier]
        # get the set of symbols in this expression that have expressions
        # of their own
        symbols = set(expr.free_symbols).intersection(
            sympy.Symbol(s) for s in id_to_expr
        )
        # for each symbol representing a flow, substitute. because we're
        # traversing in reverse topological order, the substituted value will
        # always contain only stocks, since it was itself already substituted.
        # don't substitute parameter values and stocks
        for symbol in symbols:
            if (
                pysd_model.doc.loc[pysd_model.doc["Py Name"] == str(symbol)][
                    "Type"
                ].values[0]
                == "Constant"
                or str(symbol) in concepts
            ):
                continue
            expr = expr.subs(symbol, new_id_to_expr[str(symbol)])
        new_id_to_expr[identifier] = expr
    return new_id_to_expr
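

# A minimal sketch of the WITH LOOKUP conversion on a hypothetical, already
# normalized expression; the bounding box [(0,0)-(10,10)] is parsed but
# currently unused (see the TODO in with_lookup_to_piecewise).
def _with_lookup_to_piecewise_sketch():
    expr = "with_lookup(time,([(0,0)-(10,10)],(0,0),(5,2),(10,0)))"
    assert (
        with_lookup_to_piecewise(expr)
        == "Piecewise((0.0, time >= 0.0),(2.0, time >= 5.0),(0.0, time >= 10.0))"
    )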
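

# A minimal sketch of the reverse-topological substitution performed by
# get_identifier_to_expr, with hypothetical flow names: "net_flow" depends on
# "births", which depends only on a stock and a constant, so a single pass in
# reverse topological order fully expands "net_flow" in terms of stocks and
# constants.
def _substitution_sketch():
    stock, rate = sympy.symbols("population birth_rate")
    id_to_expr = {
        "births": stock * rate,
        "net_flow": 2 * sympy.Symbol("births"),
    }
    new_id_to_expr = id_to_expr.copy()
    # reverse topological order of the flow dependency graph
    for identifier in ["births", "net_flow"]:
        expr = id_to_expr[identifier]
        for symbol in set(expr.free_symbols).intersection(
            sympy.Symbol(s) for s in id_to_expr
        ):
            expr = expr.subs(symbol, new_id_to_expr[str(symbol)])
        new_id_to_expr[identifier] = expr
    assert new_id_to_expr["net_flow"] == 2 * stock * rate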