Source code for cocomico.community
# -*- coding: utf-8 -*-
"""
CoCoMiCo community objects.
"""
import json
import logging
import sys
from functools import cached_property
from importlib.resources import as_file, files
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Any, Union
import clyngor
from clyngor.as_pyasp import Atom
from clyngor.decoder import objects_from_model
from cocomico.answer import Answer
from cocomico.base import Biomolecule, Exchange, MetaboliteSet, Reaction, Seeds, Taxon
from cocomico.facts import network_facts, seed_facts
from cocomico.model import Model
from cocomico.sbmlreader import SbmlReader
if sys.version_info >= (3, 11):
from typing import Self
else:
from typing_extensions import Self
# Community needs to be a class because it has methods.
[docs]
class Community(dict[Taxon, Any]):
"""
Communities are collections of models, with additional operations to
calculate metabolic scope and identify exchanged and limiting metabolites.
:param sbml_dir: directory where model files are found
:type sbml_dir: Path
"""
# The SBML reader is a class attribute so that we can guarantee that
# models for the same taxon are shared by reference between communities.
sbml_reader: SbmlReader = SbmlReader()
def __init__(self, *args, **kwargs) -> None:
"""
:param sbml_dir: directory containing models (required)
:type spec: Path
:param models: filenames of models (optional), in sbml_dir or absolute
:type spec: list[Path]
"""
# self.sbml_dir is None or a Path specified by keyword
self.sbml_dir: None | Path = None
if d := kwargs.pop("sbml_dir", None):
self.sbml_dir = Path(d)
assert self.sbml_dir.is_dir()
models = kwargs.pop("models", None)
if models:
# Each model is either an absolute Path or a Path relative to sbml_dir.
# If sbml_dir is not defined, relative Paths fall back on pwd.
paths = [
path if path.is_absolute() else (self.sbml_dir or Path(".")) / path
for path in models
]
else:
# No models specified, so crawl sbml_dir, if defined
paths = list(self.sbml_dir.iterdir()) if self.sbml_dir else []
merge: dict[Taxon, Model] = kwargs.pop("merge", None)
dict.__init__(self, *args, **kwargs)
self.models: dict[Taxon, Model] = {}
self.files: dict[Taxon, Path] = {}
if merge:
self._merge_relations(merge)
elif paths:
for path in paths:
# May raise ValueError
taxon, model = Community.sbml_reader.from_file(path)
self.files[taxon] = Community.sbml_reader.normalize(path, self.sbml_dir)
self[taxon] = self.models[taxon] = model
self.clear_answers()
logging.debug(
"Init %s {%s}",
self,
" ".join(f"{k}: {str(self[k])}" for k in sorted(self.keys())),
)
[docs]
@classmethod
def from_specification(cls, spec: Path, sbml_dir: Path) -> dict[str, Self]:
"""
Factory method to create a set of communities from a JSON specification.
:param spec: filename of a JSON specification
:type spec: Path
:param sbml_dir: directory containing models
:type spec: Path
:rtype: set[Community]
"""
with open(spec, mode="r", encoding="UTF-8") as json_file:
logging.info("Community from specification %s", json_file.name)
specification = json.load(json_file)
communities = {
id: cls(models=[Path(f) for f in files], sbml_dir=sbml_dir)
for id, files in specification.items()
}
for name, community in communities.items():
logging.info("Built %s as %s", name, str(community))
return communities
# Properties of a community: product, reactant, taxa
@cached_property
def products(self) -> MetaboliteSet:
"""
Products of all reactions in the models in the community.
:rtype: MetaboliteSet
"""
return MetaboliteSet(
{
m
for taxon in self.taxa
for m, _, _ in self.models[taxon].relations["product"]
}
)
@cached_property
def reactants(self) -> MetaboliteSet:
"""
Reactants of all reactions in the models in the community.
:rtype: MetaboliteSet
"""
return MetaboliteSet(
{
m
for taxon in self.taxa
for m, _, _ in self.models[taxon].relations["reactant"]
}
)
@cached_property
def biomolecule(self) -> MetaboliteSet:
"""
All biomolecules (molecular species) declared in the models in the community.
:rtype: MetaboliteSet
"""
return MetaboliteSet(
{s for taxon in self.taxa for s in self.models[taxon].biomolecule}
)
@cached_property
def reactions(self) -> set[Reaction]:
"""
All reactions declared in the models in the community, duplicated in reverse
form when the reaction is reversible.
:rtype: list[Reaction]
"""
return {
r
for taxon in self.taxa
for _, r, _ in [
*self.models[taxon].relations["product"],
*self.models[taxon].relations["reactant"],
]
}
@property
def taxa(self) -> set[Taxon]:
"""
Taxa of all models in the community.
:rtype: set[Taxon]
"""
if not self.keys():
# community built from a merge
return set(self.models.keys())
# community defined from model files
return set(self.keys())
def _merge_relations(self, models: dict[Taxon, Model]) -> None:
"""
Create community from merge of (taxon, Model) pairs
"""
for taxon, model in models.items():
self.models[taxon] = model
# invalidate cached value
if self.reactions:
del self.reactions
[docs]
def remove_seed_reactions(self) -> None:
"""
Remove reactions with empty products or reactants, which otherwise would
implicitly create seeds. Some modeling tools generate these to represent
exchanges with the environment.
"""
for model in self.models.values():
logging.debug("Removing seed reactions from %s model %s", self, model)
model.remove_seed_reactions()
# invalidate cached value
if self.reactions:
del self.reactions
@property
def knowledge_base(self):
"""
Generate LP facts for this community
"""
return network_facts(self.models, biomolecules_only=True)
# High-level access to results
[docs]
def scope(self, seeds: Seeds, choice: Union[Taxon, None] = None) -> MetaboliteSet:
"""
Compute community scope
:rtype: MetaboliteSet
"""
answer = self.answers(seeds)
community_name = f'self("{ choice }")' if choice is not None else "all"
return answer.scope.get(community_name, MetaboliteSet())
[docs]
def exchange(self, seeds: Seeds) -> dict[Biomolecule, set[Exchange]]:
"""
Compute community exchanges
:rtype: MetaboliteSet
"""
answer = self.answers(seeds)
return answer.exchange
[docs]
def polyopsonist(self, seeds: Seeds) -> dict[Biomolecule, int]:
"""
Compute community polyopsonies
:rtype: set[Biomolecule]
"""
answer = self.answers(seeds)
return answer.polyopsonist
[docs]
def monopsonist(self, seeds: Seeds) -> dict[Biomolecule, int]:
"""
Compute community monopsonies
:rtype: set[Biomolecule]
"""
answer = self.answers(seeds)
return answer.monopsonist
[docs]
def activated(
self, seeds: Seeds, choice: Union[Taxon, None] = None
) -> set[Reaction]:
"""
Compute community activated
:rtype: set[Reaction]
"""
answer = self.answers(seeds)
community_name = f'self("{ choice }")' if choice is not None else "all"
return answer.activated.get(community_name, set())
[docs]
def produced_seeds(
self, seeds: Seeds, choice: Union[Taxon, None] = None
) -> set[Biomolecule]:
"""
Identify community produced seeds
:rtype: set[Biomolecule]
"""
answer = self.answers(seeds)
community_name = f'self("{ choice }")' if choice is not None else "all"
return answer.produced_seeds.get(community_name, set())
[docs]
def consumed_seeds(
self, seeds: Seeds, choice: Union[Taxon, None] = None
) -> set[Biomolecule]:
"""
identify community consumed seeds
:rtype: set[Biomolecule]
"""
answer = self.answers(seeds)
community_name = f'self("{ choice }")' if choice is not None else "all"
return answer.consumed_seeds.get(community_name, set())
# Compute all answers
[docs]
def answers(self, seeds: Seeds) -> Answer:
"""
Solve LP for this community
"""
if seeds in self._answers:
logging.debug("Answers found for %s seeds %s", self, seeds)
return self._answers[seeds]
logging.info("Answers needed for %s seeds %s", self, seeds)
all_atoms: set[Atom] = set()
for atom in seed_facts(seeds):
all_atoms.add(atom)
for atom in self.knowledge_base:
all_atoms.add(atom)
# get lp program from package resources
score_lp_resource = files("cocomico.encodings").joinpath("general_scope.lp")
output_lp_resource = files("cocomico.encodings").joinpath("output.lp")
# solve using lp program and instance file
with NamedTemporaryFile(mode="w+") as lp_instance:
# write facts to a named file
for element in all_atoms:
lp_instance.write(str(element) + ".\n")
lp_instance.flush()
logging.info("Answers wrote instance to %s", lp_instance.name)
with (
as_file(score_lp_resource) as score_prg,
as_file(output_lp_resource) as out_prg,
):
prg = [score_prg, out_prg, lp_instance.name]
# Imitate clyngor.decode(prg, decoders=[Answer])
logging.info(
"Answers solving %s for %s seeds %s", lp_instance.name, self, seeds
)
*_, best = clyngor.solve(prg, nb_model=1).by_predicate
self._answers[seeds], *_ = objects_from_model(Answer, best)
logging.info("Answers computed for %s seeds %s", self, seeds)
return self._answers[seeds]
[docs]
def seeds(self) -> list[Seeds]:
"""
Sets of seeds for which answers are known.
"""
return list(self._answers)
[docs]
def clear_answers(self) -> None:
"""
Clear the cache of answers.
"""
self._answers: dict[Seeds, Answer] = {}
[docs]
def __str__(self) -> str:
"""String representation."""
return f"Community<{hex(id(self))}>"
def __hash__(self) -> int: # type: ignore[override]
return hash(self.__str__())