# -*- coding: utf-8 -*-
"""
Generate communities by stratified random sampling.
"""
import json
import logging
import random
import sys
from itertools import chain
from pathlib import Path
from typing import Iterable
import click
from . import ess, load_state, save_state
from .generate import gen_added_value, gen_sample
from .name import Name
from .types import Community, Ecosystem, NamedCommunity
from .write import write_specs
# Ecosystem name used for output files when no --name is given.
DEFAULT_NAME = "eco"

# File suffixes used by this module.
JSON = ".json"
PICKLE = ".pickle"

# File names, inside the destination directory, under which the random
# generator state is saved before and after a sampling task.
RANDINITIAL = "random-initial" + PICKLE
RANDFINAL = "random-final" + PICKLE
@click.group(chain=True)
@click.option("--models-per-spec", type=click.INT, help="Split output specs ≤ N models")
@click.option("--random-seed", type=click.INT, help="Random generator seed")
@click.option(
    "--random-state",
    type=click.Path(exists=True, path_type=Path),
    help="Pickled random generator state",
)
@click.option(
    "--loglevel",
    type=click.STRING,
    help="Logging level INFO, DEBUG, WARN",
    default="INFO",
)
@click.pass_context
def main(
    ctx: click.Context,
    models_per_spec: int | None = None,
    random_seed: int | None = None,
    random_state: Path | None = None,
    loglevel: str = "INFO",
):
    """
    Generate communities by stratified random sampling from multiple sources and
    a general pool, possibly with minus one / added one post-processing.
    Use command help for specific options.
    Sampling tasks can be chained in one command line
    ::
    \b
    sample-communities \\
    sample-chain --dest mix --size 10 --source leaf --source root \\
    sample-chain --dest eco --size 10 --added-value --source mix
    """
    # The docstring above is the text click shows for --help, so it is kept
    # as user-facing help; developer notes live in these comments instead.
    #
    # ctx             -- click context; ``ctx.obj`` carries settings to the
    #                    chained sub-commands.
    # models_per_spec -- stored in ``ctx.obj["models_per_spec"]`` for the
    #                    sub-commands; ``None`` when the option is absent.
    # random_seed     -- seed for the ``random`` module, used only when no
    #                    --random-state is given.
    # random_state    -- file handed to ``load_state``; takes precedence
    #                    over --random-seed.
    # loglevel        -- name of a ``logging`` level, case-insensitive.

    # Logging setup.  Unknown names, and names that resolve to something
    # other than an integer level (e.g. "basic_format"), fall back to INFO.
    loglevel_int = getattr(logging, loglevel.upper(), logging.INFO)
    if not isinstance(loglevel_int, int):
        loglevel_int = logging.INFO
    logging.basicConfig(format="%(asctime)s %(message)s", level=loglevel_int)
    logging.debug("Setting loglevel to %s = %s", loglevel, loglevel_int)

    # Initialize random generator.
    if random_state is not None:
        load_state(random_state)
    elif random_seed is not None:
        # Compare against None: 0 is a legitimate seed but is falsy.
        logging.debug("Random seed %d", random_seed)
        random.seed(random_seed)

    # Context for commands
    ctx.ensure_object(dict)
    ctx.obj["models_per_spec"] = models_per_spec
# Option decorators shared by the ``sample`` and ``sample-chain`` commands,
# defined once so both commands expose the same flags.

# Name of the generated ecosystem.
op_name = click.option("--name", help="Output ecosystem name")

# Number of replicate samples.
op_reps = click.option("--reps", default=50, type=click.INT, help="Number of samples")

# Boolean switch for the minus/added post-processing.
op_added = click.option(
    "--added-value/--no-added-value",
    default=False,
    type=click.BOOL,
    help="Also make minus/added communities",
)

# Repeatable: one --size per requested sample size.
op_size = click.option("--size", multiple=True, type=click.INT, help="Sample sizes")

# Repeatable: one --pool per pool path.
op_pool = click.option(
    "--pool",
    multiple=True,
    type=click.Path(dir_okay=True, path_type=Path),
    help="Pool to sample from",
)

# Mandatory output directory.
op_dest = click.option(
    "--dest",
    type=click.Path(dir_okay=True, path_type=Path),
    required=True,
    help="Destination directory",
)

# Repeatable: one --source per source directory or file.
op_source = click.option(
    "--source",
    multiple=True,
    type=click.Path(dir_okay=True, file_okay=True, path_type=Path),
    help="Directory or community file defining a source ecosystem",
)
@main.command()
@op_name
@op_reps
@op_added
@op_size
@op_pool
@op_dest
@op_source
@click.argument(
    "source_args",
    nargs=-1,
    type=click.Path(exists=True, dir_okay=True, file_okay=True, path_type=Path),
)
@click.pass_context
def sample(
    # pylint: disable=R0913
    ctx: click.Context,
    name: str | None,
    reps: int,
    added_value: bool,
    size: list[int],
    pool: list[Path],
    dest: Path,
    source: list[Path],
    source_args: list[Path],
):
    """
    Generate communities by stratified random sampling from multiple sources and
    a general pool, possibly with minus one / added one post-processing.
    Sources may be specified using --source options.
    All remaining arguments are considered as sources.
    ::
    \b
    sample-communities sample --dest mix --size 10 leaf root soil
    sample-communities sample --dest eco --size 10 ecosystemc_*.json
    \b
    sample-communities sample --dest av --added-value --pool sbml sample/gut
    """
    # Sources given with --source come first, followed by the positional ones;
    # everything else is forwarded to the shared implementation unchanged.
    do_sample(
        ctx=ctx,
        name=name,
        reps=reps,
        added_value=added_value,
        size=size,
        pool=pool,
        dest=dest,
        all_sources=source + source_args,
    )
@main.command()
@op_name
@op_reps
@op_added
@op_size
@op_pool
@op_dest
@op_source
@click.pass_context
def sample_chain(
    # pylint: disable=R0913
    ctx: click.Context,
    name: str | None,
    reps: int,
    added_value: bool,
    size: list[int],
    pool: list[Path],
    dest: Path,
    source: list[Path],
):
    """
    Chain sampling tasks.
    Generate communities by stratified random sampling from multiple sources and
    a general pool, possibly with minus one / added one post-processing.
    Each source is specified using a --source option.
    ::
    \b
    sample-communities sample-chain --dest mix --size 10 --source leaf --source root
    sample-communities sample-chain --dest pairs --size 2 --source mix.json
    \b
    sample-communities \\
    sample-chain --dest mix --size 10 --source leaf --source root \\
    sample-chain --dest eco --size 10 --added-value --source mix
    \b
    sample-communities sample-chain --dest av --added-value --pool sbml --source sample/gut
    """
    # This command takes no positional arguments, so the --source values are
    # the complete list of sources for the shared implementation.
    do_sample(
        ctx=ctx,
        name=name,
        reps=reps,
        added_value=added_value,
        size=size,
        pool=pool,
        dest=dest,
        all_sources=source,
    )
[docs]
def do_sample(
    # pylint: disable=R0913
    ctx: click.Context,
    name: str | None,
    reps: int,
    added_value: bool,
    size: list[int],
    pool: list[Path],
    dest: Path,
    all_sources: list[Path],
):
    """
    Generate a sample and write it, with the random generator states, to dest.

    :param ctx: click context; ``ctx.obj["models_per_spec"]`` is read and
        handed to ``write_specs``.
    :param name: ecosystem name passed to the generators; output files use
        ``DEFAULT_NAME`` instead when it is empty or ``None``.
    :param reps: number of reps per sample; forced to 1 when ``size`` is empty.
    :param added_value: when true, the samples are post-processed with
        ``gen_added_value``.
    :param size: requested sample sizes, possibly empty.
    :param pool: paths whose communities are flattened into one pool.
    :param dest: output directory, created (with parents) if missing.
    :param all_sources: paths of the source ecosystems.
    :raises SystemExit: with status 0 when the sources yield no ecosystem.
    """
    models_per_spec = ctx.obj["models_per_spec"]
    # Name actually used for the output files; also used in the log messages
    # so they never report "None" when --name was not given.
    output = name or DEFAULT_NAME
    # Capture the generator state before anything is sampled; it is saved to
    # the destination directory further down.
    random_initial_state = random.getstate()

    # Log this sampling task.
    logging.info(
        "Stratified sampling from %d source%s", len(all_sources), ess(all_sources)
    )
    logging.info("Generating %d reps per sample", reps)
    logging.info("Output to %s/%s*%s", str(dest), str(output), JSON)
    if pool:
        logging.info("Pool%s %s", ess(pool), " ".join(str(f) for f in pool))
    if models_per_spec:
        logging.debug("Models per spec %d", models_per_spec)

    # Get sources and the pool.
    logging.info("Sources: %d path%s", len(all_sources), ess(all_sources))
    all_srcs: Ecosystem = get_eco(all_sources)
    if not all_srcs:
        logging.warning("No sources, nothing to do")
        sys.exit(0)
    logging.info("Pool: %d path%s", len(pool), ess(pool))
    pool_eco = get_eco(pool)
    # The pool is one flat community: every member of every pool community.
    all_pool: Community = list(chain.from_iterable(get_coms_from_eco(pool_eco)))

    # Generate samples for all sizes and reps.
    if not size:
        logging.info("Since size=[], set reps=1 for trivial sampling")
    samples = gen_sample(
        sources=all_srcs, reps=reps if size else 1, sizes=size, pool=all_pool, name=name
    )

    # Generate added value samples for each sample.
    if added_value:
        samples = gen_added_value(samples, reps, all_pool, name)

    # Save the initial random number generator state
    dest.mkdir(exist_ok=True, parents=True)
    save_state(dest / RANDINITIAL, random_initial_state)

    # Write all samples
    logging.info("Sample %s: write to %s/%s", output, dest, output)
    write_specs(samples, dest, output, models_per_spec)

    # Save the final random number generator state
    save_state(dest / RANDFINAL)
[docs]
def get_eco(sources: list[Path]) -> Ecosystem:
    """
    Build an ecosystem from a list of source paths.

    Every path is expanded with ``expand_source``; a source may be a
    directory, a single model file, or a JSON specification of named
    communities.  If at least one expanded entry has no name, all members
    are merged and a list holding that single anonymous community is
    returned.  Otherwise a list of ``(Name, community)`` pairs is returned.
    """

    def describe(members: Community) -> str:
        """Join the members with spaces, using the stem of Path members."""
        labels = [m.stem if isinstance(m, Path) else m for m in members]
        return " ".join(labels)

    # Flatten the per-source expansions into one list of (name, community).
    expanded = []
    for src in sources:
        expanded.extend(expand_source(src))
    logging.debug(
        "Expanded (%d): %s", len(expanded), " ".join(str(key) for key, _ in expanded)
    )

    has_anonymous = any(key is None for key, _ in expanded)
    if not has_anonymous:
        # Every entry is named: keep them as separate named communities.
        ecos: Iterable[NamedCommunity] = [
            (Name(name=key), members) for key, members in expanded
        ]
        for eco_name, members in ecos:
            logging.debug(
                "- %s %s: (%d) %s",
                str(eco_name),
                eco_name.ident,
                len(members),
                describe(members),
            )
        return ecos

    logging.debug(
        "At least one ordinary file source, returning list of anonymous single community"
    )
    merged: Community = []
    for _, members in expanded:
        merged.extend(members)
    logging.debug("- 1: (%d) %s", len(merged), describe(merged))
    return [merged]
[docs]
def expand_source(source: Path) -> Iterable[tuple[Name | None, Community]]:
    """
    Expand a source path into a list of (name, community) pairs.

    * A directory gives one pair, named after the directory, whose community
      is every directory entry without a ``PICKLE`` suffix.
    * A path with a ``PICKLE`` suffix gives no pairs.
    * A file containing a JSON object gives one pair per item, the key
      being parsed with ``Name.from_string``.
    * Any other file is treated as a single model file and gives one pair
      whose name is ``None``.

    :param source: directory or file to expand.
    :return: list of ``(Name | None, community)`` pairs, possibly empty.
    :raises OSError: if the file cannot be opened or read.
    """
    if source.is_dir():
        com: Community = [f for f in source.iterdir() if PICKLE not in f.suffixes]
        logging.debug("Expand %s [%s]", str(source), " ".join(str(i) for i in com))
        return [(Name(eco=source.stem, com=com), com)]
    if PICKLE in source.suffixes:
        return []

    try:
        # See whether this is a JSON file
        with open(source, mode="r", encoding="UTF-8") as json_file:
            logging.debug("Attempting to read %s as specification", json_file.name)
            specification = json.load(json_file)
    except (json.JSONDecodeError, UnicodeDecodeError):
        # Not JSON, or not even UTF-8 text (e.g. a binary or compressed
        # model): treat as single model file.
        return [(None, [source])]

    if not isinstance(specification, dict):
        # Valid JSON that is not an object cannot be a specification of
        # named communities, so fall back to a single model file as well.
        logging.debug("%s is not a JSON object, treating as model file", source)
        return [(None, [source])]

    logging.debug("Specification %s len %d", source, len(specification))
    return [(Name.from_string(k), c) for k, c in specification.items()]
[docs]
def get_coms_from_eco(sources: Ecosystem) -> Iterable[Community]:
    """
    Yield the community of every entry of an ecosystem.

    Entries that are tuples contribute their second element; any other
    entry is taken to be the community itself.
    """

    def community_of(entry):
        """Return the community held by one ecosystem entry."""
        if isinstance(entry, tuple):
            return entry[1]
        return entry

    return (community_of(entry) for entry in sources)
# Script entry point: run the click group with an empty dict as the context
# object, which ``main`` then fills in for the chained commands.
if __name__ == "__main__":
    main(obj={})