Source code for sampling.cwl

# -*- coding: utf-8 -*-

"""
Generate communities by stratified random sampling.
"""

import json
import logging
import random
import sys
from itertools import chain
from pathlib import Path
from typing import Iterable

import click

from . import ess, load_state, save_state
from .generate import gen_added_value, gen_sample
from .name import Name
from .types import Community, Ecosystem, NamedCommunity
from .write import write_specs

# Ecosystem name used for output files when no --name is supplied.
DEFAULT_NAME = "eco"

# File suffixes recognised or produced by this module.
JSON, PICKLE = ".json", ".pickle"

# File names, relative to the destination directory, under which the random
# generator state is saved before and after a sampling task.
RANDINITIAL = "random-initial" + PICKLE
RANDFINAL = "random-final" + PICKLE


@click.group(chain=True)
@click.option("--models-per-spec", type=click.INT, help="Split output specs ≤ N models")
@click.option("--random-seed", type=click.INT, help="Random generator seed")
@click.option(
    "--random-state",
    type=click.Path(exists=True, path_type=Path),
    help="Pickled random generator state",
)
@click.option(
    "--loglevel",
    type=click.STRING,
    help="Logging level INFO, DEBUG, WARN",
    default="INFO",
)
@click.pass_context
def main(
    ctx: click.Context,
    models_per_spec: int | None = None,
    random_seed: int | None = None,
    random_state: Path | None = None,
    loglevel: str = "INFO",
):
    """
    Generate communities by stratified random sampling from multiple sources and
    a general pool, possibly with minus one / added one post-processing.
    Use command help for specific options.

    Sampling tasks can be chained in one command line

    ::

    \b
    sample-communities \\
        sample-chain --dest mix --size 10 --source leaf --source root \\
        sample-chain --dest eco --size 10 --added-value --source mix
    """
    # NOTE: the docstring above doubles as the command-line help text, so it
    # is kept verbatim; implementation notes live in these comments instead.
    #
    # Group callback, run once before the chained sub-commands:
    #   * configures logging from --loglevel,
    #   * initializes the global ``random`` generator from --random-state
    #     (takes precedence) or --random-seed,
    #   * stores --models-per-spec in ``ctx.obj`` for the sub-commands.

    # Logging setup. Unknown level names fall back to INFO; the isinstance
    # check also covers names that resolve to a non-level attribute of the
    # logging module (e.g. a format string constant).
    loglevel_int = getattr(logging, loglevel.upper(), logging.INFO)
    if not isinstance(loglevel_int, int):
        loglevel_int = logging.INFO
    logging.basicConfig(format="%(asctime)s %(message)s", level=loglevel_int)
    logging.debug("Setting loglevel to %s = %s", loglevel, loglevel_int)

    # Initialize random generator.
    if random_state is not None:
        # presumably restores the generator state pickled by save_state
        load_state(random_state)
    elif random_seed is not None:
        # Compare against None rather than truthiness: 0 is a valid seed and
        # must not be silently ignored.
        logging.debug("Random seed %d", random_seed)
        random.seed(random_seed)

    # Context for commands
    ctx.ensure_object(dict)
    ctx.obj["models_per_spec"] = models_per_spec


# Option decorators shared by the ``sample`` and ``sample_chain`` commands.
# Each is built once here and applied to both commands so that their
# command-line interfaces stay in step.

# --name: output ecosystem name (no default at the option level).
op_name = click.option(
    "--name",
    help="Output ecosystem name",
)

# --reps: number of samples, 50 unless overridden.
op_reps = click.option(
    "--reps",
    default=50,
    type=click.INT,
    help="Number of samples",
)

# --added-value / --no-added-value: on/off flag pair, off by default.
op_added = click.option(
    "--added-value/--no-added-value",
    default=False,
    type=click.BOOL,
    help="Also make minus/added communities",
)

# --size: repeatable integer option.
op_size = click.option(
    "--size",
    multiple=True,
    type=click.INT,
    help="Sample sizes",
)

# --pool: repeatable path option.
op_pool = click.option(
    "--pool",
    multiple=True,
    type=click.Path(dir_okay=True, path_type=Path),
    help="Pool to sample from",
)

# --dest: the only mandatory option.
op_dest = click.option(
    "--dest",
    required=True,
    type=click.Path(dir_okay=True, path_type=Path),
    help="Destination directory",
)

# --source: repeatable; accepts either a directory or a file.
op_source = click.option(
    "--source",
    multiple=True,
    type=click.Path(dir_okay=True, file_okay=True, path_type=Path),
    help="Directory or community file defining a source ecosystem",
)


@main.command()
@op_name
@op_reps
@op_added
@op_size
@op_pool
@op_dest
@op_source
@click.argument(
    "source_args",
    type=click.Path(exists=True, dir_okay=True, file_okay=True, path_type=Path),
    nargs=-1,
)
@click.pass_context
def sample(
    # pylint: disable=R0913
    ctx: click.Context,
    name: str | None,
    reps: int,
    added_value: bool,
    size: list[int],
    pool: list[Path],
    dest: Path,
    source: list[Path],
    source_args: list[Path],
):
    """
    Generate communities by stratified random sampling from multiple sources and
    a general pool, possibly with minus one / added one post-processing.
    Sources may be specified using --source options.
    All remaining arguments are considered as sources.

    ::

    \b
    sample-communities sample --dest mix --size 10 leaf root soil
    sample-communities sample --dest eco --size 10 ecosystemc_*.json

    \b
    sample-communities sample --dest av --added-value --pool sbml sample/gut
    """
    # Sources given through --source come first, followed by the positional
    # ones, and the combined sequence is handed to the shared implementation.
    combined_sources = source + source_args
    do_sample(
        ctx=ctx,
        name=name,
        reps=reps,
        added_value=added_value,
        size=size,
        pool=pool,
        dest=dest,
        all_sources=combined_sources,
    )


@main.command()
@op_name
@op_reps
@op_added
@op_size
@op_pool
@op_dest
@op_source
@click.pass_context
def sample_chain(
    # pylint: disable=R0913
    ctx: click.Context,
    name: str | None,
    reps: int,
    added_value: bool,
    size: list[int],
    pool: list[Path],
    dest: Path,
    source: list[Path],
):
    """
    Chain sampling tasks.
    Generate communities by stratified random sampling from multiple sources and
    a general pool, possibly with minus one / added one post-processing.
    Each source is specified using a --source option.

    ::

    \b
    sample-communities sample-chain --dest mix --size 10 --source leaf --source root
    sample-communities sample-chain --dest pairs --size 2 --source mix.json
    \b
    sample-communities \\
        sample-chain --dest mix --size 10 --source leaf --source root \\
        sample-chain --dest eco --size 10 --added-value --source mix
    \b
    sample-communities sample-chain --dest av --added-value --pool sbml --source sample/gut
    """
    # Unlike ``sample``, this command takes no variadic positional sources:
    # every source arrives through --source and is forwarded unchanged to the
    # shared implementation.
    do_sample(
        ctx=ctx,
        name=name,
        reps=reps,
        added_value=added_value,
        size=size,
        pool=pool,
        dest=dest,
        all_sources=source,
    )


def do_sample(
    # pylint: disable=R0913
    ctx: click.Context,
    name: str | None,
    reps: int,
    added_value: bool,
    size: list[int],
    pool: list[Path],
    dest: Path,
    all_sources: list[Path],
):
    """
    Generate a sample.

    Shared implementation behind the ``sample`` and ``sample_chain`` commands.

    :param ctx: click context; ``ctx.obj["models_per_spec"]`` is read from it.
    :param name: output ecosystem name, ``DEFAULT_NAME`` is used for the
        output files when it is empty or ``None``.
    :param reps: number of repetitions per sample; forced to 1 for the
        sampling step when no size is given.
    :param added_value: when true, the samples are post-processed with
        ``gen_added_value``.
    :param size: requested sample sizes (may be empty).
    :param pool: paths making up the general pool (may be empty).
    :param dest: destination directory, created if missing.
    :param all_sources: paths of the source ecosystems.

    Exits the process with status 0 when the sources expand to nothing.
    The random generator state is saved in ``dest`` both as it was on entry
    (``RANDINITIAL``) and as it is once all samples are written (``RANDFINAL``).
    """
    models_per_spec = ctx.obj["models_per_spec"]
    # Captured before any sampling so that the run can be replayed.
    random_initial_state = random.getstate()
    # Resolve the effective output name once, so that log messages and the
    # written files always agree (and never show "None").
    output = name or DEFAULT_NAME

    # Log this sampling task.
    logging.info(
        "Stratified sampling from %d source%s", len(all_sources), ess(all_sources)
    )
    logging.info("Generating %d reps per sample", reps)
    logging.info("Output to %s/%s*%s", str(dest), str(output), JSON)
    if pool:
        logging.info("Pool%s %s", ess(pool), " ".join(str(f) for f in pool))
    if models_per_spec:
        logging.debug("Models per spec %d", models_per_spec)

    # Get sources and the pool.
    logging.info("Sources: %d path%s", len(all_sources), ess(all_sources))
    all_srcs: Ecosystem = get_eco(all_sources)
    if not all_srcs:
        logging.warning("No sources, nothing to do")
        sys.exit(0)
    logging.info("Pool: %d path%s", len(pool), ess(pool))
    pool_eco = get_eco(pool)
    # The pool is flattened into a single community.
    all_pool: Community = list(chain.from_iterable(get_coms_from_eco(pool_eco)))

    # Generate samples for all sizes and reps.
    if not size:
        logging.info("Since size=[], set reps=1 for trivial sampling")
    samples = gen_sample(
        sources=all_srcs,
        reps=reps if size else 1,
        sizes=size,
        pool=all_pool,
        name=name,
    )

    # Generate added value samples for each sample.
    if added_value:
        samples = gen_added_value(samples, reps, all_pool, name)

    # Save the initial random number generator state
    dest.mkdir(exist_ok=True, parents=True)
    save_state(dest / RANDINITIAL, random_initial_state)

    # Write all samples
    logging.info("Sample %s: write to %s/%s", output, dest, output)
    write_specs(samples, dest, output, models_per_spec)

    # Save the final random number generator state
    save_state(dest / RANDFINAL)
def get_eco(sources: list[Path]) -> Ecosystem:
    """
    Build an ecosystem from a list of source paths.

    Every path is expanded with ``expand_source`` into ``(name, community)``
    pairs. Two outcomes are possible:

    * if at least one pair has no name (``None``), the members of all
      communities are merged into one anonymous community and a
      single-element list holding it is returned;
    * otherwise a list of ``(Name, community)`` pairs is returned, one per
      expanded entry.
    """

    def describe(members: Community) -> str:
        """Space-separated labels of the members: stems for paths, else as is."""
        labels = []
        for member in members:
            labels.append(member.stem if isinstance(member, Path) else member)
        return " ".join(labels)

    # Flatten the expansion of every source into one list of pairs.
    expanded = []
    for src in sources:
        expanded.extend(expand_source(src))
    logging.debug(
        "Expanded (%d): %s", len(expanded), " ".join(str(key) for key, _ in expanded)
    )

    has_anonymous = any(key is None for key, _ in expanded)
    if not has_anonymous:
        # Every entry is named: keep them apart as named communities.
        named: Iterable[NamedCommunity] = [
            (Name(name=key), members) for key, members in expanded
        ]
        for label, members in named:
            logging.debug(
                "- %s %s: (%d) %s",
                str(label),
                label.ident,
                len(members),
                describe(members),
            )
        return named

    # At least one unnamed entry: pool everything into a single community.
    logging.debug(
        "At least one ordinary file source, returning list of anonymous single community"
    )
    merged: Community = [member for _, members in expanded for member in members]
    logging.debug("- 1: (%d) %s", len(merged), describe(merged))
    return [merged]
def expand_source(source: Path) -> Iterable[tuple[Name | None, Community]]:
    """
    Expand a source into a list of (name, community) pairs.

    :param source: a directory, a JSON specification file, or a model file.
    :return:
        * directory: one pair, named after the directory stem, whose
          community lists the directory entries without a pickle suffix;
        * file with a pickle suffix: an empty list (the file is skipped);
        * file holding a JSON object: one pair per item of the object, the
          key being parsed with ``Name.from_string``;
        * any other file: one unnamed pair ``(None, [source])``, i.e. the
          file is treated as a single model.
    :raises OSError: if the file cannot be opened.
    """
    if source.is_dir():
        # NOTE(review): iterdir() yields entries in arbitrary order; confirm
        # whether reproducible sampling needs a sorted listing here.
        com: Community = [f for f in source.iterdir() if PICKLE not in f.suffixes]
        logging.debug("Expand %s [%s]", str(source), " ".join(str(i) for i in com))
        return [(Name(eco=source.stem, com=com), com)]
    if PICKLE in source.suffixes:
        return []

    single_model: list[tuple[Name | None, Community]] = [(None, [source])]
    try:
        # See whether this is a JSON file
        with open(source, mode="r", encoding="UTF-8") as json_file:
            logging.debug("Attempting to read %s as specification", json_file.name)
            specification = json.load(json_file)
    except (json.JSONDecodeError, UnicodeDecodeError):
        # Not JSON, or not even UTF-8 text (e.g. a binary or differently
        # encoded model): treat as single model file
        return single_model
    if not isinstance(specification, dict):
        # Valid JSON but not a mapping of names to communities, so it cannot
        # be a specification: treat as single model file as well.
        return single_model
    logging.debug("Specification %s len %d", str(source), len(specification))
    return [(Name.from_string(k), c) for k, c in specification.items()]
def get_coms_from_eco(sources: Ecosystem) -> Iterable[Community]:
    """
    Extract the community of every entry of an ecosystem.

    An entry that is a tuple contributes its second element; any other entry
    is passed through unchanged. The result is a lazy generator, so it can be
    consumed only once.
    """

    def community_of(entry):
        """Return the community part of one ecosystem entry."""
        if isinstance(entry, tuple):
            return entry[1]
        return entry

    return (community_of(entry) for entry in sources)
# Script entry point: start the command group with an empty dict as the
# context object, which ``main`` then fills in for the sub-commands.
if __name__ == "__main__":
    main(obj={})