Source code for cocomico.community

# -*- coding: utf-8 -*-

"""
CoCoMiCo community objects.
"""

import json
import logging
import sys
from functools import cached_property
from importlib.resources import as_file, files
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Any, Union

import clyngor
from clyngor.as_pyasp import Atom
from clyngor.decoder import objects_from_model

from cocomico.answer import Answer
from cocomico.base import Biomolecule, Exchange, MetaboliteSet, Reaction, Seeds, Taxon
from cocomico.facts import network_facts, seed_facts
from cocomico.model import Model
from cocomico.sbmlreader import SbmlReader

if sys.version_info >= (3, 11):
    from typing import Self
else:
    from typing_extensions import Self


# Community needs to be a class because it has methods.


[docs] class Community(dict[Taxon, Any]): """ Communities are collections of models, with additional operations to calculate metabolic scope and identify exchanged and limiting metabolites. :param sbml_dir: directory where model files are found :type sbml_dir: Path """ # The SBML reader is a class attribute so that we can guarantee that # models for the same taxon are shared by reference between communities. sbml_reader: SbmlReader = SbmlReader() def __init__(self, *args, **kwargs) -> None: """ :param sbml_dir: directory containing models (required) :type spec: Path :param models: filenames of models (optional), in sbml_dir or absolute :type spec: list[Path] """ # self.sbml_dir is None or a Path specified by keyword self.sbml_dir: None | Path = None if d := kwargs.pop("sbml_dir", None): self.sbml_dir = Path(d) assert self.sbml_dir.is_dir() models = kwargs.pop("models", None) if models: # Each model is either an absolute Path or a Path relative to sbml_dir. # If sbml_dir is not defined, relative Paths fall back on pwd. paths = [ path if path.is_absolute() else (self.sbml_dir or Path(".")) / path for path in models ] else: # No models specified, so crawl sbml_dir, if defined paths = list(self.sbml_dir.iterdir()) if self.sbml_dir else [] merge: dict[Taxon, Model] = kwargs.pop("merge", None) dict.__init__(self, *args, **kwargs) self.models: dict[Taxon, Model] = {} self.files: dict[Taxon, Path] = {} if merge: self._merge_relations(merge) elif paths: for path in paths: # May raise ValueError taxon, model = Community.sbml_reader.from_file(path) self.files[taxon] = Community.sbml_reader.normalize(path, self.sbml_dir) self[taxon] = self.models[taxon] = model self.clear_answers() logging.debug( "Init %s {%s}", self, " ".join(f"{k}: {str(self[k])}" for k in sorted(self.keys())), )
[docs] @classmethod def from_specification(cls, spec: Path, sbml_dir: Path) -> dict[str, Self]: """ Factory method to create a set of communities from a JSON specification. :param spec: filename of a JSON specification :type spec: Path :param sbml_dir: directory containing models :type spec: Path :rtype: set[Community] """ with open(spec, mode="r", encoding="UTF-8") as json_file: logging.info("Community from specification %s", json_file.name) specification = json.load(json_file) communities = { id: cls(models=[Path(f) for f in files], sbml_dir=sbml_dir) for id, files in specification.items() } for name, community in communities.items(): logging.info("Built %s as %s", name, str(community)) return communities
# Properties of a community: product, reactant, taxa @cached_property def products(self) -> MetaboliteSet: """ Products of all reactions in the models in the community. :rtype: MetaboliteSet """ return MetaboliteSet( { m for taxon in self.taxa for m, _, _ in self.models[taxon].relations["product"] } ) @cached_property def reactants(self) -> MetaboliteSet: """ Reactants of all reactions in the models in the community. :rtype: MetaboliteSet """ return MetaboliteSet( { m for taxon in self.taxa for m, _, _ in self.models[taxon].relations["reactant"] } ) @cached_property def biomolecule(self) -> MetaboliteSet: """ All biomolecules (molecular species) declared in the models in the community. :rtype: MetaboliteSet """ return MetaboliteSet( {s for taxon in self.taxa for s in self.models[taxon].biomolecule} ) @cached_property def reactions(self) -> set[Reaction]: """ All reactions declared in the models in the community, duplicated in reverse form when the reaction is reversible. :rtype: list[Reaction] """ return { r for taxon in self.taxa for _, r, _ in [ *self.models[taxon].relations["product"], *self.models[taxon].relations["reactant"], ] } @property def taxa(self) -> set[Taxon]: """ Taxa of all models in the community. :rtype: set[Taxon] """ if not self.keys(): # community built from a merge return set(self.models.keys()) # community defined from model files return set(self.keys()) def _merge_relations(self, models: dict[Taxon, Model]) -> None: """ Create community from merge of (taxon, Model) pairs """ for taxon, model in models.items(): self.models[taxon] = model # invalidate cached value if self.reactions: del self.reactions
[docs] def remove_seed_reactions(self) -> None: """ Remove reactions with empty products or reactants, which otherwise would implicitly create seeds. Some modeling tools generate these to represent exchanges with the environment. """ for model in self.models.values(): logging.debug("Removing seed reactions from %s model %s", self, model) model.remove_seed_reactions() # invalidate cached value if self.reactions: del self.reactions
@property def knowledge_base(self): """ Generate LP facts for this community """ return network_facts(self.models, biomolecules_only=True) # High-level access to results
[docs] def scope(self, seeds: Seeds, choice: Union[Taxon, None] = None) -> MetaboliteSet: """ Compute community scope :rtype: MetaboliteSet """ answer = self.answers(seeds) community_name = f'self("{ choice }")' if choice is not None else "all" return answer.scope.get(community_name, MetaboliteSet())
[docs] def exchange(self, seeds: Seeds) -> dict[Biomolecule, set[Exchange]]: """ Compute community exchanges :rtype: MetaboliteSet """ answer = self.answers(seeds) return answer.exchange
[docs] def polyopsonist(self, seeds: Seeds) -> dict[Biomolecule, int]: """ Compute community polyopsonies :rtype: set[Biomolecule] """ answer = self.answers(seeds) return answer.polyopsonist
[docs] def monopsonist(self, seeds: Seeds) -> dict[Biomolecule, int]: """ Compute community monopsonies :rtype: set[Biomolecule] """ answer = self.answers(seeds) return answer.monopsonist
[docs] def activated( self, seeds: Seeds, choice: Union[Taxon, None] = None ) -> set[Reaction]: """ Compute community activated :rtype: set[Reaction] """ answer = self.answers(seeds) community_name = f'self("{ choice }")' if choice is not None else "all" return answer.activated.get(community_name, set())
[docs] def produced_seeds( self, seeds: Seeds, choice: Union[Taxon, None] = None ) -> set[Biomolecule]: """ Identify community produced seeds :rtype: set[Biomolecule] """ answer = self.answers(seeds) community_name = f'self("{ choice }")' if choice is not None else "all" return answer.produced_seeds.get(community_name, set())
[docs] def consumed_seeds( self, seeds: Seeds, choice: Union[Taxon, None] = None ) -> set[Biomolecule]: """ identify community consumed seeds :rtype: set[Biomolecule] """ answer = self.answers(seeds) community_name = f'self("{ choice }")' if choice is not None else "all" return answer.consumed_seeds.get(community_name, set())
# Compute all answers
[docs] def answers(self, seeds: Seeds) -> Answer: """ Solve LP for this community """ if seeds in self._answers: logging.debug("Answers found for %s seeds %s", self, seeds) return self._answers[seeds] logging.info("Answers needed for %s seeds %s", self, seeds) all_atoms: set[Atom] = set() for atom in seed_facts(seeds): all_atoms.add(atom) for atom in self.knowledge_base: all_atoms.add(atom) # get lp program from package resources score_lp_resource = files("cocomico.encodings").joinpath("general_scope.lp") output_lp_resource = files("cocomico.encodings").joinpath("output.lp") # solve using lp program and instance file with NamedTemporaryFile(mode="w+") as lp_instance: # write facts to a named file for element in all_atoms: lp_instance.write(str(element) + ".\n") lp_instance.flush() logging.info("Answers wrote instance to %s", lp_instance.name) with ( as_file(score_lp_resource) as score_prg, as_file(output_lp_resource) as out_prg, ): prg = [score_prg, out_prg, lp_instance.name] # Imitate clyngor.decode(prg, decoders=[Answer]) logging.info( "Answers solving %s for %s seeds %s", lp_instance.name, self, seeds ) *_, best = clyngor.solve(prg, nb_model=1).by_predicate self._answers[seeds], *_ = objects_from_model(Answer, best) logging.info("Answers computed for %s seeds %s", self, seeds) return self._answers[seeds]
[docs] def seeds(self) -> list[Seeds]: """ Sets of seeds for which answers are known. """ return list(self._answers)
[docs] def clear_answers(self) -> None: """ Clear the cache of answers. """ self._answers: dict[Seeds, Answer] = {}
[docs] def __str__(self) -> str: """String representation.""" return f"Community<{hex(id(self))}>"
def __hash__(self) -> int: # type: ignore[override] return hash(self.__str__())