Source code for mira.dkg.construct

"""
Generate the nodes and edges file for the MIRA domain knowledge graph.

After these are generated, see the /docker folder in the repository for loading
a neo4j instance.

Example command for local bulk import on macOS with Neo4j 4.x:

.. code::

    neo4j-admin import --database=mira \
        --delimiter='TAB' \
        --force \
        --skip-duplicate-nodes=true \
        --skip-bad-relationships=true \
        --nodes ~/.data/mira/demo/import/nodes.tsv.gz \
        --relationships ~/.data/mira/demo/import/edges.tsv.gz

Then, restart the Neo4j service with Homebrew: ``brew services restart neo4j``.
"""

import csv
import gzip
import json
import pickle
import typing
from collections import Counter, defaultdict
from datetime import datetime
from operator import methodcaller
from pathlib import Path
from typing import Dict, NamedTuple, Sequence, Union, Optional

import biomappings
import bioontologies
import click
import pyobo
import pystow
from bioontologies import obograph
from bioontologies.obograph import Xref
from bioregistry import manager
from pydantic import BaseModel, Field
from pyobo.struct import part_of
from pyobo.sources import ontology_resolver
from tabulate import tabulate
from tqdm.auto import tqdm
from typing_extensions import Literal

from mira.dkg.askemo import get_askemo_terms, get_askemosw_terms, get_askem_climate_ontology_terms
from mira.dkg.models import EntityType
from mira.dkg.resources import SLIMS, get_ncbitaxon
from mira.dkg.resources.extract_ncit import get_ncit_subset
from mira.dkg.resources.probonto import get_probonto_terms
from mira.dkg.units import get_unit_terms
from mira.dkg.physical_constants import get_physical_constant_terms
from mira.dkg.constants import EDGE_HEADER, NODE_HEADER
from mira.dkg.utils import PREFIXES

MODULE = pystow.module("mira")
DEMO_MODULE = MODULE.module("demo", "import")
EDGE_NAMES_PATH = DEMO_MODULE.join(name="relation_info.json")
METAREGISTRY_PATH = DEMO_MODULE.join(name="metaregistry.json")

OBSOLETE = {"oboinowl:ObsoleteClass", "oboinowl:ObsoleteProperty"}


class DKGConfig(BaseModel):
    use_case: str
    prefix: Optional[str] = None
    func: Optional[typing.Callable] = None
    iri: Optional[str] = None
    prefixes: typing.List[str] = Field(default_factory=list)


cases: Dict[str, DKGConfig] = {
    "epi": DKGConfig(
        use_case="epi",
        prefix="askemo",
        func=get_askemo_terms,
        iri="https://github.com/indralab/mira/blob/main/mira/dkg/askemo/askemo.json",
        prefixes=PREFIXES,
    ),
    "space": DKGConfig(
        use_case="space",
        prefix="askemosw",
        func=get_askemosw_terms,
        iri="https://github.com/indralab/mira/blob/main/mira/dkg/askemo/askemosw.json",
    ),
    "eco": DKGConfig(
        use_case="eco",
        prefixes=["hgnc", "ncbitaxon", "ecocore", "probonto", "reactome"],
    ),
    "genereg": DKGConfig(
        use_case="genereg",
        prefixes=["hgnc", "go", "wikipathways", "probonto"],
    ),
    "climate": DKGConfig(
        use_case="climate",
        prefix="askem.climate",
        func=get_askem_climate_ontology_terms,
        prefixes=["probonto"],
        iri="https://github.com/indralab/mira/blob/main/mira/dkg/askemo/askem.climate.json",
    ),
}
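
# ``main`` below also accepts a path to a JSON configuration file in place of
# a use case name, loading it via ``DKGConfig.parse_file``. A minimal sketch
# of such a file (the filename and values are illustrative only):
#
#     {
#         "use_case": "genereg",
#         "prefixes": ["hgnc", "go", "wikipathways", "probonto"]
#     }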


class UseCasePaths:
    """A configuration containing the file paths for use case-specific files."""

    def __init__(self, use_case: str, config: Optional[DKGConfig] = None):
        self.use_case = use_case
        self.config = config or cases[self.use_case]
        self.askemo_prefix = self.config.prefix
        self.askemo_getter = self.config.func
        self.askemo_url = self.config.iri
        self.prefixes = self.config.prefixes
        self.module = MODULE.module(self.use_case)
        self.UNSTANDARDIZED_NODES_PATH = self.module.join(
            name="unstandardized_nodes.tsv"
        )
        self.UNSTANDARDIZED_EDGES_PATH = self.module.join(
            name="unstandardized_edges.tsv"
        )
        self.SUB_EDGE_COUNTER_PATH = self.module.join(
            name="count_subject_prefix_predicate.tsv"
        )
        self.SUB_EDGE_TARGET_COUNTER_PATH = self.module.join(
            name="count_subject_prefix_predicate_target_prefix.tsv"
        )
        self.EDGE_OBJ_COUNTER_PATH = self.module.join(
            name="count_predicate_object_prefix.tsv"
        )
        self.EDGE_COUNTER_PATH = self.module.join(name="count_predicate.tsv")
        self.NODES_PATH = self.module.join(name="nodes.tsv.gz")
        self.EDGES_PATH = self.module.join(name="edges.tsv.gz")
        self.EMBEDDINGS_PATH = self.module.join(name="embeddings.tsv.gz")
        prefixes = list(self.prefixes)
        if self.askemo_prefix:
            prefixes.append(self.askemo_prefix)
        if self.use_case == "space":
            prefixes.append("uat")
        if self.use_case == "climate":
            prefixes.append("eiffel")
        self.EDGES_PATHS: Dict[str, Path] = {
            prefix: self.module.join("sources", name=f"edges_{prefix}.tsv")
            for prefix in prefixes
        }
        self.RDF_TTL_PATH = self.module.join(name="dkg.ttl.gz")

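# A quick sketch of how the paths resolve, assuming pystow's default root of
# ``~/.data`` (configurable via the PYSTOW_HOME environment variable):
#
#     paths = UseCasePaths("epi")
#     paths.NODES_PATH   # ~/.data/mira/epi/nodes.tsv.gz
#     paths.EDGES_PATH   # ~/.data/mira/epi/edges.tsv.gz
#     paths.EDGES_PATHS  # per-prefix files under ~/.data/mira/epi/sources/
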
LABELS = {
    "http://www.w3.org/2000/01/rdf-schema#isDefinedBy": "is defined by",
    "rdf:type": "type",
    "http://www.w3.org/1999/02/22-rdf-syntax-ns#type": "type",
    # FIXME deal with these relations
    "http://purl.obolibrary.org/obo/uberon/core#proximally_connected_to": "proximally_connected_to",
    "http://purl.obolibrary.org/obo/uberon/core#extends_fibers_into": "extends_fibers_into",
    "http://purl.obolibrary.org/obo/uberon/core#channel_for": "channel_for",
    "http://purl.obolibrary.org/obo/uberon/core#distally_connected_to": "distally_connected_to",
    "http://purl.obolibrary.org/obo/uberon/core#channels_into": "channels_into",
    "http://purl.obolibrary.org/obo/uberon/core#channels_from": "channels_from",
    "http://purl.obolibrary.org/obo/uberon/core#subdivision_of": "subdivision_of",
    "http://purl.obolibrary.org/obo/uberon/core#protects": "protects",
    "http://purl.obolibrary.org/obo/uberon/core#posteriorly_connected_to": "posteriorly_connected_to",
    "http://purl.obolibrary.org/obo/uberon/core#evolved_from": "evolved_from",
    "http://purl.obolibrary.org/obo/uberon/core#anteriorly_connected_to": "anteriorly_connected_to",
}

DEFAULT_VOCABS = [
    "oboinowl",
    "ro",
    "bfo",
    "owl",
    "rdfs",
    "bspo",
    # "gorel",
    "iao",
    # "sio",
    "omo",
    "debio",
]

class NodeInfo(NamedTuple):
    curie: str  # the id used in neo4j
    prefix: str  # the field used for neo4j labels. can contain semicolon-delimited values
    label: str  # the human-readable label
    synonyms: str
    deprecated: Literal["true", "false"]  # need this for neo4j
    type: EntityType
    definition: str
    xrefs: str
    alts: str
    version: str
    property_predicates: str
    property_values: str
    xref_types: str
    synonym_types: str

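# Every multi-valued field is collapsed into a semicolon-delimited string so a
# node fits on one TSV row. An illustrative instance (the values are made up):
#
#     NodeInfo(
#         curie="ido:0000514", prefix="ido", label="susceptible population",
#         synonyms="S", deprecated="false", type="class",
#         definition="A population of organisms susceptible to infection",
#         xrefs="", alts="", version="2022-06-01",
#         property_predicates="", property_values="", xref_types="",
#         synonym_types="oboInOwl:hasExactSynonym",
#     )
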
@click.command()
@click.option(
    "--add-xref-edges",
    is_flag=True,
    help="Add edges for xrefs to external ontology terms",
)
@click.option(
    "--summaries",
    is_flag=True,
    help="Print summaries of nodes and edges while building",
)
@click.option("--do-upload", is_flag=True, help="Upload to S3 on completion")
@click.option("--refresh", is_flag=True, help="Refresh caches")
@click.option("--use-case", default="epi", type=click.Choice(list(cases)))
def main(
    add_xref_edges: bool,
    summaries: bool,
    do_upload: bool,
    refresh: bool,
    use_case: str,
):
    """Generate the node and edge files."""
    if Path(use_case).is_file():
        config = DKGConfig.parse_file(use_case)
        use_case = config.use_case
    else:
        config = None
    construct(
        use_case=use_case,
        config=config,
        refresh=refresh,
        do_upload=do_upload,
        add_xref_edges=add_xref_edges,
        summaries=summaries,
    )


def construct(
    use_case: Optional[str] = None,
    config: Optional[DKGConfig] = None,
    *,
    refresh: bool = False,
    do_upload: bool = False,
    add_xref_edges: bool = False,
    summaries: bool = False,
):
    """Construct the nodes and edges files for the given use case."""
    if use_case is None:
        use_case = config.use_case
    use_case_paths = UseCasePaths(use_case, config=config)

    if EDGE_NAMES_PATH.is_file():
        edge_names = json.loads(EDGE_NAMES_PATH.read_text())
    else:
        edge_names = {}
        for edge_prefix in DEFAULT_VOCABS:
            click.secho(f"Caching {manager.get_name(edge_prefix)}", fg="green", bold=True)
            parse_results = bioontologies.get_obograph_by_prefix(edge_prefix)
            for edge_graph in parse_results.graph_document.graphs:
                edge_graph = edge_graph.standardize()
                for edge_node in edge_graph.nodes:
                    if edge_node.deprecated or edge_node.id.startswith("_:genid"):
                        continue
                    if not edge_node.name:
                        if edge_node.id in LABELS:
                            edge_node.name = LABELS[edge_node.id]
                        elif edge_node.prefix:
                            edge_node.name = edge_node.identifier
                        else:
                            click.secho(f"missing label for {edge_node.curie}")
                            continue
                    if not edge_node.prefix:
                        tqdm.write(f"unparsable IRI: {edge_node.id} - {edge_node.name}")
                        continue
                    edge_names[edge_node.curie] = edge_node.name.strip()
        EDGE_NAMES_PATH.write_text(json.dumps(edge_names, sort_keys=True, indent=2))

    # A mapping from CURIEs to node information tuples
    nodes: Dict[str, NodeInfo] = {}
    # A mapping from CURIEs to a set of source strings
    node_sources = defaultdict(set)
    unstandardized_nodes = []
    unstandardized_edges = []
    edge_usage_counter = Counter()
    subject_edge_usage_counter = Counter()
    subject_edge_target_usage_counter = Counter()
    edge_target_usage_counter = Counter()

    if use_case_paths.askemo_getter is not None:
        if use_case_paths.askemo_prefix is None:
            raise ValueError("a prefix is required when a custom term getter is configured")
        askemo_edges = []
        click.secho(f"ASKEM custom: {use_case_paths.askemo_prefix}", fg="green", bold=True)
        for term in tqdm(use_case_paths.askemo_getter().values(), unit="term"):
            property_predicates = []
            property_values = []
            if term.suggested_unit:
                property_predicates.append("suggested_unit")
                property_values.append(term.suggested_unit)
            if term.suggested_data_type:
                property_predicates.append("suggested_data_type")
                property_values.append(term.suggested_data_type)
            if term.physical_min is not None:
                property_predicates.append("physical_min")
                property_values.append(str(term.physical_min))
            if term.physical_max is not None:
                property_predicates.append("physical_max")
                property_values.append(str(term.physical_max))
            if term.typical_min is not None:
                property_predicates.append("typical_min")
                property_values.append(str(term.typical_min))
            if term.typical_max is not None:
                property_predicates.append("typical_max")
                property_values.append(str(term.typical_max))

            node_sources[term.id].add(use_case_paths.askemo_prefix)
            nodes[term.id] = NodeInfo(
                curie=term.id,
                prefix=term.prefix,
                label=term.name,
                synonyms=";".join(synonym.value for synonym in term.synonyms or []),
                deprecated="false",
                type=term.type,
                definition=term.description,
                xrefs=";".join(xref.id for xref in term.xrefs or []),
                alts="",
                version="1.0",
                property_predicates=";".join(property_predicates),
                property_values=";".join(property_values),
                xref_types=";".join(
                    xref.type or "oboinowl:hasDbXref" for xref in term.xrefs or []
                ),
                synonym_types=";".join(
                    synonym.type or "oboInOwl:hasExactSynonym"
                    for synonym in term.synonyms or []
                ),
            )
            for parent_curie in term.parents:
                askemo_edges.append(
                    (
                        term.id,
                        parent_curie,
                        "subclassof",
                        "rdfs:subClassOf",
                        use_case_paths.askemo_prefix,
                        use_case_paths.askemo_url,
                        "",
                    )
                )
        with use_case_paths.EDGES_PATHS[use_case_paths.askemo_prefix].open("w") as file:
            writer = csv.writer(file, delimiter="\t", quoting=csv.QUOTE_MINIMAL)
            writer.writerow(EDGE_HEADER)
            writer.writerows(askemo_edges)

    # Probability distributions
    probonto_edges = []
    for term in tqdm(get_probonto_terms(), unit="term", desc="Loading probonto"):
        curie, name, parameters = term["curie"], term["name"], term["parameters"]
        node_sources[curie].add("probonto")
        property_predicates = ["has_parameter" for _ in range(len(parameters))]
        property_values = [parameter["name"].replace("\n", " ") for parameter in parameters]
        nodes[curie] = NodeInfo(
            curie=curie,
            prefix="probonto",
            label=name,
            synonyms="",
            deprecated="false",
            type="class",
            definition="",
            xrefs=";".join(eq["curie"] for eq in term.get("equivalent", [])),
            alts="",
            version="2.5",
            property_predicates=";".join(property_predicates),
            property_values=";".join(property_values),
            xref_types=";".join("askemo:0000016" for _eq in term.get("equivalent", [])),
            synonym_types="",
        )
        # Add equivalents?
        for parameter in term.get("parameters", []):
            parameter_curie, parameter_name = parameter["curie"], parameter["name"]
            synonyms = []
            synonym_types = []
            parameter_symbol = parameter.get("symbol")
            if parameter_symbol:
                synonyms.append(parameter_symbol)
                synonym_types.append("referenced_by_latex")
            parameter_short = parameter.get("short_name")
            if parameter_short:
                synonyms.append(parameter_short)
                synonym_types.append("oboInOwl:hasExactSynonym")
            nodes[parameter_curie] = NodeInfo(
                curie=parameter_curie,
                prefix="probonto",
                label=parameter_name,
                synonyms=";".join(synonyms),
                deprecated="false",
                type="class",
                definition="",
                xrefs="",
                alts="",
                version="2.5",
                property_predicates="",
                property_values="",
                xref_types="",
                synonym_types=";".join(synonym_types),
            )
            probonto_edges.append(
                (
                    curie,
                    parameter_curie,
                    "has_parameter",
                    "probonto:c0000062",
                    "probonto",
                    "https://raw.githubusercontent.com/probonto/ontology/master/probonto4ols.owl",
                    "2.5",
                )
            )
    with use_case_paths.EDGES_PATHS["probonto"].open("w") as file:
        writer = csv.writer(file, delimiter="\t", quoting=csv.QUOTE_MINIMAL)
        writer.writerow(EDGE_HEADER)
        writer.writerows(probonto_edges)

    if use_case == "climate":
        from .resources.cso import get_cso_obo

        for term in get_cso_obo().iter_terms():
            node_sources[term.curie].add("cso")
            nodes[term.curie] = get_node_info(term)

        from .resources.extract_eiffel_ontology import get_eiffel_ontology_terms

        eiffel_edges = []
        for term in tqdm(get_eiffel_ontology_terms(), unit="term", desc="Eiffel"):
            node_sources[term.curie].add("eiffel")
            nodes[term.curie] = get_node_info(term)
            for typedef, object_references in term.relationships.items():
                for object_reference in object_references:
                    eiffel_edges.append(
                        (
                            term.curie,
                            object_reference.curie,
                            typedef.name.replace(" ", "").lower(),
                            typedef.curie,
                            "eiffel",
                            "eiffel",
                            "",
                        )
                    )
        with use_case_paths.EDGES_PATHS["eiffel"].open("w") as file:
            writer = csv.writer(file, delimiter="\t", quoting=csv.QUOTE_MINIMAL)
            writer.writerow(EDGE_HEADER)
            writer.writerows(eiffel_edges)

    if use_case == "epi":
        from .resources.geonames import get_geonames_terms

        geonames_edges = []
        for term in tqdm(get_geonames_terms(), unit="term", desc="Geonames"):
            node_sources[term.curie].add("geonames")
            nodes[term.curie] = get_node_info(term, type="individual")
            for parent in term.get_relationships(part_of):
                geonames_edges.append(
                    (
                        term.curie,
                        parent.curie,
                        "part_of",
                        part_of.curie.lower(),
                        "geonames",
                        "geonames",
                        "",
                    )
                )
        with use_case_paths.EDGES_PATHS["geonames"].open("w") as file:
            writer = csv.writer(file, delimiter="\t", quoting=csv.QUOTE_MINIMAL)
            writer.writerow(EDGE_HEADER)
            writer.writerows(geonames_edges)

        # extras from NCIT
        for term in tqdm(get_ncit_subset(), unit="term", desc="NCIT"):
            node_sources[term.curie].add("ncit")
            nodes[term.curie] = get_node_info(term, type="class")
            # TODO add edges later, if needed

        for term in tqdm(get_ncbitaxon(), unit="term", desc="NCBITaxon"):
            node_sources[term.curie].add("ncbitaxon")
            nodes[term.curie] = get_node_info(term, type="class")
            # TODO add edges to source file later, if important

    if use_case == "space":
        from .resources.uat import get_uat

        uat_ontology = get_uat()
        uat_edges = []
        for term in tqdm(uat_ontology, unit="term", desc="UAT"):
            node_sources[term.curie].add(uat_ontology.ontology)
            nodes[term.curie] = NodeInfo(
                curie=term.curie,
                prefix=term.prefix,
                label=term.name,
                synonyms=";".join(synonym.name for synonym in term.synonyms or []),
                deprecated="false",
                type="class",
                definition=term.definition,
                xrefs=";".join(xref.curie for xref in term.xrefs or []),
                alts="",
                version="5.0",
                property_predicates="",
                property_values="",
                xref_types="",  # TODO
                synonym_types=";".join(
                    synonym.type.curie if synonym.type is not None else "skos:exactMatch"
                    for synonym in term.synonyms or []
                ),
            )
            for parent in term.parents:
                uat_edges.append(
                    (
                        term.curie,
                        parent.curie,
                        "subclassof",
                        "rdfs:subClassOf",
                        uat_ontology.ontology,
                        uat_ontology.ontology,
                        "5.0",
                    )
                )
        with use_case_paths.EDGES_PATHS[uat_ontology.ontology].open("w") as file:
            writer = csv.writer(file, delimiter="\t", quoting=csv.QUOTE_MINIMAL)
            writer.writerow(EDGE_HEADER)
            writer.writerows(uat_edges)

    click.secho("Units", fg="green", bold=True)
    for wikidata_id, label, description, synonyms, xrefs in tqdm(
        get_unit_terms(), unit="unit", desc="Units"
    ):
        curie = f"wikidata:{wikidata_id}"
        node_sources[curie].add("wikidata")
        nodes[curie] = NodeInfo(
            curie=curie,
            prefix="wikidata;unit",
            label=label,
            synonyms=";".join(synonyms),
            deprecated="false",
            type="class",
            definition=description,
            xrefs=";".join(xrefs),
            alts="",
            version="",
            property_predicates="",
            property_values="",
            xref_types=";".join("oboinowl:hasDbXref" for _ in xrefs),
            synonym_types="",
        )

    click.secho("Physical Constants", fg="green", bold=True)
    for wikidata_id, label, description, synonyms, xrefs, value, formula, symbols in tqdm(
        get_physical_constant_terms(), desc="Physical Constants"
    ):
        curie = f"wikidata:{wikidata_id}"
        node_sources[curie].add("wikidata")
        prop_predicates, prop_values = [], []
        if value:
            prop_predicates.append("debio:0000042")
            prop_values.append(str(value))
        # TODO process mathml and make readable
        # if formula:
        #     prop_predicates.append("debio:0000043")
        #     prop_values.append(str(formula))
        synonym_types, synonym_values = [], []
        for syn in synonyms:
            synonym_values.append(syn)
            synonym_types.append("oboInOwl:hasExactSynonym")
        for symbol in symbols:
            synonym_values.append(symbol)
            synonym_types.append("debio:0000031")
        nodes[curie] = NodeInfo(
            curie=curie,
            prefix="wikidata;constant",
            label=label,
            synonyms=";".join(synonym_values),
            synonym_types=";".join(synonym_types),
            deprecated="false",
            type="class",
            definition=description,
            xrefs=";".join(xrefs),
            xref_types=";".join("oboinowl:hasDbXref" for _ in xrefs),
            alts="",
            version="",
            property_predicates=";".join(prop_predicates),
            property_values=";".join(prop_values),
        )

    def _get_edge_name(curie_: str, strict: bool = False) -> str:
        if curie_ in LABELS:
            return LABELS[curie_]
        elif curie_ in edge_names:
            return edge_names[curie_]
        elif curie_ in nodes:
            return nodes[curie_][2]
        elif strict:
            raise ValueError(
                f"Can not infer name for edge curie: {curie_}. Add an entry to the LABELS dictionary"
            )
        else:
            return curie_

    biomappings_xref_graph = biomappings.get_true_graph()
    added_biomappings = 0

    for prefix in use_case_paths.prefixes:
        if prefix in {"geonames", "uat", "probonto"}:  # added with custom code
            continue
        edges = []

        _results_pickle_path = DEMO_MODULE.join("parsed", name=f"{prefix}.pkl")
        if _results_pickle_path.is_file() and not refresh:
            parse_results = pickle.loads(_results_pickle_path.read_bytes())
        else:
            if prefix in SLIMS:
                parse_results = bioontologies.get_obograph_by_path(SLIMS[prefix])
            elif _pyobo_has(prefix):
                obo = pyobo.get_ontology(prefix)
                parse_results = pyobo.parse_results_from_obo(obo)
            else:
                parse_results = bioontologies.get_obograph_by_prefix(prefix)
            if parse_results.graph_document is None:
                click.secho(
                    f"{manager.get_name(prefix)} has no graph document",
                    fg="red",
                    bold=True,
                )
                _results_pickle_path.write_bytes(pickle.dumps(parse_results))
                continue

            # Standardize graphs before caching
            parse_results.graph_document.graphs = [
                graph.standardize(tqdm_kwargs=dict(leave=False))
                for graph in tqdm(
                    parse_results.graph_document.graphs,
                    unit="graph",
                    desc=f"Standardizing graphs from {prefix}",
                    leave=False,
                )
            ]
            _results_pickle_path.write_bytes(pickle.dumps(parse_results))

        if parse_results.graph_document is None:
            click.secho(f"No graphs in {prefix}, skipping", fg="red")
            continue

        _graphs = parse_results.graph_document.graphs
        click.secho(
            f"{manager.get_name(prefix)} ({len(_graphs)} graphs)", fg="green", bold=True
        )
        for graph in tqdm(_graphs, unit="graph", desc=prefix, leave=False):
            graph_id = graph.id or prefix
            version = graph.version
            if version == "imports":
                version = None
            for node in graph.nodes:
                if node.deprecated or not node.reference:
                    continue
                if node.id.startswith("_:gen"):  # skip blank nodes
                    continue
                try:
                    curie = node.curie
                except ValueError:
                    tqdm.write(f"error parsing {node.id}")
                    continue
                if node.curie.startswith("_:gen"):
                    continue
                node_sources[curie].add(prefix)
                if curie not in nodes or (curie in nodes and prefix == node.prefix):
                    # TODO filter out properties that are covered elsewhere
                    properties = sorted(
                        (prop.predicate.curie, prop.value.curie)
                        for prop in node.properties
                        if prop.predicate and prop.value
                    )
                    property_predicates, property_values = [], []
                    for pred_curie, val_curie in properties:
                        property_predicates.append(pred_curie)
                        property_values.append(val_curie)

                    xref_predicates, xref_references = [], []
                    for xref in node.xrefs or []:
                        if xref.predicate and xref.value:
                            xref_predicates.append(xref.predicate.curie)
                            xref_references.append(xref.value.curie)

                    if node.curie in biomappings_xref_graph:
                        for xref_curie in biomappings_xref_graph.neighbors(node.curie):
                            if ":" not in xref_curie:
                                continue
                            added_biomappings += 1
                            xref_predicate = biomappings_xref_graph.edges[node.curie, xref_curie][
                                "relation"
                            ]
                            if xref_predicate == "speciesSpecific":
                                xref_predicate = "debio:0000003"
                            xref_predicates.append(xref_predicate)
                            xref_references.append(xref_curie)

                    nodes[curie] = NodeInfo(
                        curie=node.curie,
                        prefix=node.prefix,
                        label=node.name.strip('"')
                        .strip()
                        .strip('"')
                        .replace("\n", " ")
                        .replace("  ", " ")
                        if node.name
                        else "",
                        synonyms=";".join(synonym.value for synonym in node.synonyms),
                        deprecated="true" if node.deprecated else "false",  # type:ignore
                        # TODO better way to infer type based on hierarchy
                        #  (e.g., if rdfs:type available, consider as instance)
                        type=node.type.lower() if node.type else "unknown",  # type:ignore
                        definition=(node.definition or "")
                        .replace('"', "")
                        .replace("\n", " ")
                        .replace("  ", " "),
                        xrefs=";".join(xref_references),
                        alts=";".join(node.alternative_ids),
                        version=version or "",
                        property_predicates=";".join(property_predicates),
                        property_values=";".join(property_values),
                        xref_types=";".join(xref_predicates),
                        synonym_types=";".join(
                            synonym.predicate.curie if synonym.predicate else synonym.predicate_raw
                            for synonym in node.synonyms
                        ),
                    )

                if node.replaced_by:
                    edges.append(
                        (
                            node.replaced_by,
                            node.curie,
                            "replaced_by",
                            "iao:0100001",
                            prefix,
                            graph_id,
                            version or "",
                        )
                    )
                    if node.replaced_by not in nodes:
                        node_sources[node.replaced_by].add(prefix)
                        nodes[node.replaced_by] = NodeInfo(
                            node.replaced_by,
                            node.replaced_by.split(":", 1)[0],
                            label="",
                            synonyms="",
                            deprecated="true",
                            type="class",
                            definition="",
                            xrefs="",
                            alts="",
                            version="",
                            property_predicates="",
                            property_values="",
                            xref_types="",
                            synonym_types="",
                        )

                if add_xref_edges:
                    for xref in node.xrefs:
                        if not isinstance(xref, Xref):
                            raise TypeError(f"Invalid type: {type(xref)}: {xref}")
                        if not xref.value:
                            continue
                        if xref.value.prefix in obograph.PROVENANCE_PREFIXES:
                            # Don't add provenance information as xrefs
                            continue
                        edges.append(
                            (
                                node.curie,
                                xref.value.curie,
                                "xref",
                                "oboinowl:hasDbXref",
                                prefix,
                                graph_id,
                                version or "",
                            )
                        )
                        if xref.value.curie not in nodes:
                            node_sources[xref.value.curie].add(prefix)
                            nodes[xref.value.curie] = NodeInfo(
                                curie=xref.value.curie,
                                prefix=xref.value.prefix,
                                label="",
                                synonyms="",
                                deprecated="false",
                                type="class",
                                definition="",
                                xrefs="",
                                alts="",
                                version="",
                                property_predicates="",
                                property_values="",
                                xref_types="",
                                synonym_types="",
                            )

                for provenance in node.get_provenance():
                    if ":" in provenance.identifier:
                        tqdm.write(f"Malformed provenance for {node.curie}: {provenance}")
                    provenance_curie = provenance.curie
                    node_sources[provenance_curie].add(prefix)
                    if provenance_curie not in nodes:
                        nodes[provenance_curie] = NodeInfo(
                            curie=provenance_curie,
                            prefix=provenance.prefix,
                            label="",
                            synonyms="",
                            deprecated="false",
                            type="class",
                            definition="",
                            xrefs="",
                            alts="",
                            version="",
                            property_predicates="",
                            property_values="",
                            xref_types="",
                            synonym_types="",
                        )
                    edges.append(
                        (
                            node.curie,
                            provenance_curie,
                            "has_citation",
                            "debio:0000029",
                            prefix,
                            graph_id,
                            version or "",
                        )
                    )

            if summaries:
                counter = Counter(node.prefix for node in graph.nodes)
                tqdm.write(
                    "\n"
                    + tabulate(
                        [
                            (k, count, manager.get_name(k) if k is not None else "")
                            for k, count in counter.most_common()
                        ],
                        headers=["prefix", "count", "name"],
                        tablefmt="github",
                        # intfmt=",",
                    )
                )
                edge_counter = Counter(
                    edge.predicate.curie for edge in graph.edges if edge.predicate is not None
                )
                tqdm.write(
                    "\n"
                    + tabulate(
                        [
                            (pred_curie, count, _get_edge_name(pred_curie, strict=True))
                            for pred_curie, count in edge_counter.most_common()
                        ],
                        headers=["predicate", "count", "name"],
                        tablefmt="github",
                        # intfmt=",",
                    )
                    + "\n"
                )

            unstandardized_nodes.extend(node.id for node in graph.nodes if not node.reference)
            unstandardized_edges.extend(
                edge.pred for edge in graph.edges if edge.predicate is None
            )

            clean_edges = (
                edge
                for edge in graph.edges
                if (
                    edge.subject is not None
                    and edge.predicate is not None
                    and edge.object is not None
                    and edge.object.curie not in OBSOLETE
                )
            )
            edges.extend(
                (
                    edge.subject.curie,
                    edge.object.curie,
                    _get_edge_name(edge.predicate.curie).lower().replace(" ", "_").replace("-", "_"),
                    edge.predicate.curie,
                    prefix,
                    graph_id,
                    version or "",
                )
                for edge in tqdm(
                    sorted(clean_edges, key=methodcaller("as_tuple")), unit="edge", unit_scale=True
                )
            )

        for sub, obj, pred_label, pred, *_ in edges:
            edge_target_usage_counter[pred, pred_label, obj.split(":")[0]] += 1
            subject_edge_usage_counter[sub.split(":")[0], pred, pred_label] += 1
            subject_edge_target_usage_counter[
                sub.split(":")[0], pred, pred_label, obj.split(":")[0]
            ] += 1
            edge_usage_counter[pred, pred_label] += 1

        edges_path = use_case_paths.EDGES_PATHS[prefix]
        with edges_path.open("w") as file:
            writer = csv.writer(file, delimiter="\t", quoting=csv.QUOTE_MINIMAL)
            writer.writerow(EDGE_HEADER)
            writer.writerows(edges)
        tqdm.write(f"output edges to {edges_path}")

    tqdm.write(f"incorporated {added_biomappings:,} xrefs from biomappings")

    with gzip.open(use_case_paths.NODES_PATH, "wt") as file:
        writer = csv.writer(file, delimiter="\t", quoting=csv.QUOTE_MINIMAL)
        writer.writerow(NODE_HEADER)
        writer.writerows(
            (*node, ";".join(sorted(node_sources[curie])))
            for curie, node in tqdm(sorted(nodes.items()), unit="node", unit_scale=True)
        )
    tqdm.write(f"output nodes to {use_case_paths.NODES_PATH}")

    # CAT edge files together
    with gzip.open(use_case_paths.EDGES_PATH, "wt") as file:
        writer = csv.writer(file, delimiter="\t", quoting=csv.QUOTE_MINIMAL)
        writer.writerow(EDGE_HEADER)
        for prefix, edge_path in tqdm(sorted(use_case_paths.EDGES_PATHS.items()), desc="cat edges"):
            with edge_path.open() as edge_file:
                reader = csv.reader(edge_file, delimiter="\t", quoting=csv.QUOTE_MINIMAL)
                _header = next(reader)
                writer.writerows(reader)

    unstandardized_nodes_counter = Counter(unstandardized_nodes)
    _write_counter(
        use_case_paths.UNSTANDARDIZED_NODES_PATH, unstandardized_nodes_counter, title="url"
    )

    unstandardized_edges_counter = Counter(unstandardized_edges)
    _write_counter(
        use_case_paths.UNSTANDARDIZED_EDGES_PATH, unstandardized_edges_counter, title="url"
    )

    _write_counter(
        use_case_paths.EDGE_OBJ_COUNTER_PATH,
        edge_target_usage_counter,
        unpack=True,
        title=("predicate", "predicate_label", "object_prefix"),
    )
    _write_counter(
        use_case_paths.SUB_EDGE_COUNTER_PATH,
        subject_edge_usage_counter,
        unpack=True,
        title=("subject_prefix", "predicate", "predicate_label"),
    )
    _write_counter(
        use_case_paths.SUB_EDGE_TARGET_COUNTER_PATH,
        subject_edge_target_usage_counter,
        unpack=True,
        title=("subject_prefix", "predicate", "predicate_label", "object_prefix"),
    )
    _write_counter(
        use_case_paths.EDGE_COUNTER_PATH,
        edge_usage_counter,
        unpack=True,
        title=("predicate", "predicate_label"),
    )

    if do_upload:
        upload_neo4j_s3(use_case_paths=use_case_paths)

    from .construct_rdf import _construct_rdf

    _construct_rdf(upload=do_upload, use_case_paths=use_case_paths)

    from .construct_registry import EPI_CONF_PATH, _construct_registry

    _construct_registry(
        config_path=EPI_CONF_PATH,
        output_path=METAREGISTRY_PATH,
        upload=do_upload,
    )

    from .construct_embeddings import _construct_embeddings

    _construct_embeddings(upload=do_upload, use_case_paths=use_case_paths)

    return use_case_paths


def _write_counter(
    path: Path,
    counter: Counter,
    title: Union[None, str, Sequence[str]] = None,
    unpack: bool = False,
) -> None:
    """Write a counter to a TSV file, optionally with a header row."""
    with path.open("w") as file:
        if title:
            if unpack:
                print(*title, "count", sep="\t", file=file)
            else:
                print(title, "count", sep="\t", file=file)
        for key, count in counter.most_common():
            if unpack:
                print(*key, count, sep="\t", file=file)
            else:
                print(key, count, sep="\t", file=file)

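# ``construct`` can also be invoked programmatically; a minimal sketch that
# mirrors what ``main`` does for the default CLI flags:
#
#     use_case_paths = construct("epi", add_xref_edges=True)
#     print(use_case_paths.NODES_PATH, use_case_paths.EDGES_PATH)
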
def upload_s3(
    path: Path, *, use_case: str, bucket: str = "askem-mira", s3_client=None
) -> None:
    """Upload the nodes and edges to S3."""
    if s3_client is None:
        import boto3

        s3_client = boto3.client("s3")
    today = datetime.today().strftime("%Y-%m-%d")
    # don't include a preceding or trailing slash
    key = f"dkg/{use_case}/build/{today}/"
    config = {
        # https://stackoverflow.com/questions/41904806/how-to-upload-a-file-to-s3-and-make-it-public-using-boto3
        "ACL": "public-read",
        "StorageClass": "INTELLIGENT_TIERING",
    }
    s3_client.upload_file(
        Filename=path.as_posix(),
        Bucket=bucket,
        Key=key + path.name,
        ExtraArgs=config,
    )

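# Uploaded files land under a dated key; for example, a build of the "epi" use
# case on 2024-01-01 would put nodes.tsv.gz at:
#
#     s3://askem-mira/dkg/epi/build/2024-01-01/nodes.tsv.gz
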
def upload_neo4j_s3(use_case_paths: UseCasePaths) -> None:
    """Upload the nodes and edges to S3."""
    import boto3

    s3_client = boto3.client("s3")
    paths = [
        use_case_paths.UNSTANDARDIZED_EDGES_PATH,
        use_case_paths.UNSTANDARDIZED_NODES_PATH,
        use_case_paths.NODES_PATH,
        use_case_paths.EDGES_PATH,
        use_case_paths.SUB_EDGE_COUNTER_PATH,
        use_case_paths.SUB_EDGE_TARGET_COUNTER_PATH,
        use_case_paths.EDGE_OBJ_COUNTER_PATH,
        use_case_paths.EDGE_COUNTER_PATH,
    ]
    for path in tqdm(paths):
        tqdm.write(f"uploading {path}")
        upload_s3(path=path, s3_client=s3_client, use_case=use_case_paths.use_case)

def get_node_info(term: pyobo.Term, type: EntityType = "class"):
    """Build a NodeInfo tuple from a PyOBO term."""
    return NodeInfo(
        curie=term.curie,
        prefix=term.prefix,
        label=term.name,
        synonyms=";".join(synonym.name for synonym in term.synonyms or []),
        deprecated="false",
        type=type,
        definition=term.definition or "",
        xrefs="",
        alts="",
        version="",
        property_predicates="",
        property_values="",
        xref_types="",
        synonym_types=";".join(
            synonym.type.curie if synonym.type is not None else "oboInOwl:hasExactSynonym"
            for synonym in term.synonyms or []
        ),
    )


def _pyobo_has(prefix: str) -> bool:
    """Return whether PyOBO can resolve the given prefix to an ontology."""
    try:
        ontology_resolver.lookup(prefix)
    except KeyError:
        return False
    return True


if __name__ == "__main__":
    main()