Source code for examol.steer.single

"""Single-objective and single-fidelity implementation of active learning. As easy as we get"""
import gzip
import json
import pickle as pkl
import shutil
from functools import partial
from pathlib import Path
from queue import Queue
from threading import Event
from time import perf_counter
from typing import Sequence
from concurrent.futures import ProcessPoolExecutor

import numpy as np
from colmena.proxy import get_store
from colmena.queue import ColmenaQueues
from colmena.thinker import event_responder, ResourceCounter, agent
from more_itertools import interleave_longest, batched
from proxystore.proxy import extract, Proxy
from proxystore.store import Store
from proxystore.store.utils import get_key

from .base import MoleculeThinker
from examol.solution import SingleFidelityActiveLearning
from ..score.base import Scorer
from ..store.db.base import MoleculeStore
from ..store.models import MoleculeRecord
from ..store.recipes import PropertyRecipe


def _generate_inputs(record: MoleculeRecord, scorer: Scorer) -> tuple[str, object] | None:
    """Parse a molecule then generate a form ready for inference

    Args:
        record: Molecule record to be parsed
        scorer: Tool used for inference
    Returns:
        - Key for the molecule record
        - Inference-ready format
        Or None if the transformation fails
    """

    try:
        # Compute the features
        readied = scorer.transform_inputs([record])[0]
    except (ValueError, RuntimeError):
        return None
    return record.identifier.smiles, readied
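
# A minimal usage sketch (not part of the original module) showing how
# `_generate_inputs` can be applied across many records while skipping
# molecules the scorer fails to transform. The name `_example_featurize_all`
# and its arguments are illustrative only:
def _example_featurize_all(records: Sequence[MoleculeRecord], scorer: Scorer) -> dict[str, object]:
    """Map SMILES keys to inference-ready inputs, dropping records that fail to parse"""
    ready = {}
    for record in records:
        parsed = _generate_inputs(record, scorer)
        if parsed is not None:  # None signals a ValueError/RuntimeError during transformation
            smiles, inputs = parsed
            ready[smiles] = inputs
    return ready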


class SingleStepThinker(MoleculeThinker):
    """A thinker which submits all computations needed to evaluate a molecule whenever it is selected

    Args:
        queues: Queues used to communicate with the task server
        run_dir: Directory in which to store logs, etc.
        recipes: Recipes used to compute the target properties
        database: Connection to the store of molecular data
        solution: Settings related to tools used to solve the problem (e.g., active learning strategy)
        search_space: Search space of molecules. Provided as a list of paths to ".smi" files
        pool: Process pool used to parse the search space in parallel
        num_workers: Number of simulation tasks to run in parallel
        inference_chunk_size: Number of molecules to run inference on per task
    """

    search_space_dir: Path
    """Cache directory for search space"""
    search_space_smiles: list[list[str]]
    """SMILES strings of molecules in the search space"""
    search_space_inputs: list[list[object]]
    """Inputs (or proxies of inputs) to the machine learning models for each molecule in the search space"""
    scorer: Scorer
    """Class used to communicate data and models to distributed workers"""
    solution: SingleFidelityActiveLearning

    def __init__(self,
                 queues: ColmenaQueues,
                 run_dir: Path,
                 recipes: Sequence[PropertyRecipe],
                 solution: SingleFidelityActiveLearning,
                 search_space: list[Path | str],
                 database: MoleculeStore,
                 pool: ProcessPoolExecutor,
                 num_workers: int = 2,
                 inference_chunk_size: int = 10000):
        super().__init__(queues, ResourceCounter(num_workers), run_dir, recipes, solution, search_space, database, pool)
        self.search_space_dir = self.run_dir / 'search-space'
        self.scorer = solution.scorer

        # Startup-related information
        self.starter = self.solution.starter

        # Model tracking information
        self.models = solution.models.copy()
        if len(set(map(len, self.models))) > 1:  # pragma: no-coverage
            raise ValueError('You must provide the same number of models for each class')
        if len(self.models) != len(recipes):  # pragma: no-coverage
            raise ValueError('You must provide as many model ensembles as recipes')
        self._model_proxies: list[list[Proxy | None]] = [[None] * len(m) for m in self.models]  # Proxy for the current model(s)
        self.ready_models: Queue[tuple[int, int]] = Queue()  # Queue of models that are ready for inference

        # Partition the search space into smaller chunks
        self.search_space_smiles: list[list[str]]
        self.search_space_inputs: list[list[object]]
        self.search_space_smiles, self.search_space_inputs = zip(*self._cache_search_space(inference_chunk_size, self.search_space))

        # Coordination tools
        self.start_inference: Event = Event()
        self.start_training: Event = Event()

    @property
    def num_models(self) -> int:
        """Number of models being trained by this class"""
        return sum(map(len, self.models))

    def _cache_search_space(self, inference_chunk_size: int, search_space: list[str | Path]):
        """Cache the search space into a directory within the run"""

        # Check if we must rebuild the cache
        rebuild = True
        config_path = self.search_space_dir / 'settings.json'
        my_config = {
            'inference_chunk_size': inference_chunk_size,
            'scorer': str(self.scorer),
            'paths': [str(Path(p).resolve()) for p in search_space]
        }
        if config_path.exists():
            config = json.loads(config_path.read_text())
            rebuild = config != my_config
            if rebuild:
                self.logger.info('Settings have changed. Rebuilding the cache')
                shutil.rmtree(self.search_space_dir)
        elif self.search_space_dir.exists():
            shutil.rmtree(self.search_space_dir)
        self.search_space_dir.mkdir(exist_ok=True, parents=True)

        # Get the paths to inputs and keys, either by rebuilding or reading from disk
        search_space_keys = {}
        if rebuild:
            # Process the inputs and store them to disk
            search_size = 0
            input_func = partial(_generate_inputs, scorer=self.scorer)

            # Run asynchronously
            mol_iter = self.pool.map(input_func, self.iterate_over_search_space(), chunksize=1000)
            mol_iter_no_failures = filter(lambda x: x is not None, mol_iter)
            for chunk_id, chunk in enumerate(batched(mol_iter_no_failures, inference_chunk_size)):
                keys, objects = zip(*chunk)
                search_size += len(keys)
                chunk_path = self.search_space_dir / f'chunk-{chunk_id}.pkl.gz'
                with gzip.open(chunk_path, 'wb') as fp:
                    pkl.dump(objects, fp)
                search_space_keys[chunk_path.name] = keys
            self.logger.info(f'Saved {search_size} search entries into {len(search_space_keys)} batches')

            # Save the keys and the configuration
            with open(self.search_space_dir / 'keys.json', 'w') as fp:
                json.dump(search_space_keys, fp)
            with config_path.open('w') as fp:
                json.dump(my_config, fp)
        else:
            # Load in keys
            self.logger.info(f'Loading search space from {self.search_space_dir}')
            with open(self.search_space_dir / 'keys.json') as fp:
                search_space_keys = json.load(fp)

        # Load in the molecules, storing them as proxies in the "inference" store if there is a store defined
        self.logger.info(f'Loading in molecules from {len(search_space_keys)} files')
        output = []
        proxy_store = self.inference_store
        if proxy_store is not None:
            self.logger.info(f'Will store inference objects to {proxy_store}')

        for name, keys in search_space_keys.items():
            with gzip.open(self.search_space_dir / name, 'rb') as fp:  # Load from disk
                objects = pkl.load(fp)
            if proxy_store is not None:  # If the store exists, make a proxy
                objects = proxy_store.proxy(objects)
            output.append((keys, objects))
        return output

    @property
    def inference_store(self) -> Store | None:
        """Proxystore used for inference tasks"""
        if (store_name := self.queues.proxystore_name.get('inference')) is not None:
            return get_store(store_name)

    def _get_training_set(self, recipe: PropertyRecipe) -> list[MoleculeRecord]:
        """Gather molecules for which the target property is available

        Args:
            recipe: Recipe to evaluate
        Returns:
            List of molecules for which that property is defined
        """
        return [x for x in self.database.iterate_over_records() if recipe.lookup(x) is not None]

    # TODO (wardlt): Move to a function of the database class?
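    # For reference (comment added here; derived from `_cache_search_space` above),
    # the cache directory it writes has the following layout:
    #
    #     run_dir/search-space/
    #         settings.json    # the `my_config` dict: chunk size, scorer, search-space paths
    #         keys.json        # maps each chunk file name to its list of SMILES keys
    #         chunk-0.pkl.gz   # gzipped pickle of the inference-ready inputs for chunk 0
    #         chunk-1.pkl.gz   # ... one file per chunk of `inference_chunk_size` molecules
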
    def count_training_size(self, recipe: PropertyRecipe) -> int:
        """Count the number of entries available for training a recipe

        Args:
            recipe: Recipe being assessed
        Returns:
            Number of records for which this property is defined
        """
        return len([None for r in self.database.iterate_over_records()
                    if recipe.name in r.properties and recipe.level in r.properties[recipe.name]])
    @agent(startup=True)
    def startup(self):
        """Pre-populate the database, if needed."""

        # Determine how many training points are available
        train_size = min(self.count_training_size(r) for r in self.recipes)

        # If enough, start by training
        if train_size > self.solution.minimum_training_size:
            self.logger.info(f'Training set is larger than the threshold size ({train_size}>{self.solution.minimum_training_size}). Starting model training')
            self.start_training.set()
            return

        # If not, pick some
        self.logger.info(f'Training set is smaller than the threshold size ({train_size}<{self.solution.minimum_training_size})')
        search_space_size = sum(map(len, self.search_space_smiles))
        subset = self.starter.select(list(interleave_longest(*self.search_space_smiles)), min(self.num_to_run, search_space_size))
        self.logger.info(f'Selected {len(subset)} molecules to run')
        with self.task_queue_lock:
            for key in subset:
                self.task_queue.append((key, np.nan))  # All get the same score
            self.task_queue_lock.notify_all()
    def get_additional_training_information(self, train_set: list[MoleculeRecord], recipe: PropertyRecipe) -> dict[str, object]:
        """Determine any additional information to be provided during training

        An example could be to gather low-fidelity data to use to augment the training process

        Args:
            train_set: Training set for the model
            recipe: Recipe being trained
        Returns:
            Additional options
        """
        return {}
    @event_responder(event_name='start_training')
    def retrain(self):
        """Retrain all models"""

        # Check if inference is still ongoing
        if self.start_inference.is_set():
            self.logger.info('Inference is still ongoing. Will not retrain yet')
            return

        # Check that we have enough data for all recipes
        for recipe in self.recipes:
            train_size = self.count_training_size(recipe)
            if train_size < self.solution.minimum_training_size:
                self.logger.info(f'Too few entries to train {recipe.name}. Waiting for {self.solution.minimum_training_size}. Have {train_size}')
                return

        for recipe_id, recipe in enumerate(self.recipes):
            # Get the training set
            train_set = self._get_training_set(recipe)
            self.logger.info(f'Gathered a total of {len(train_set)} entries for retraining recipe {recipe_id}')

            # Process to form the inputs and outputs
            train_inputs = self.scorer.transform_inputs(train_set)
            train_outputs = self.scorer.transform_outputs(train_set, recipe)
            train_kwargs = self.get_additional_training_information(train_set, recipe)
            self.logger.info('Pre-processed the training entries')

            # Submit all models
            for model_id, model in enumerate(self.models[recipe_id]):
                model_msg = self.scorer.prepare_message(model, training=True)
                self.queues.send_inputs(
                    model_msg, train_inputs, train_outputs,
                    input_kwargs=train_kwargs,
                    method='retrain',
                    topic='train',
                    task_info={'recipe_id': recipe_id, 'model_id': model_id}
                )
            self.logger.info(f'Submitted all models for recipe={recipe_id}')

        # Retrieve the results
        for i in range(self.num_models):
            result = self.queues.get_result(topic='train')
            self._write_result(result, 'train')
            assert result.success, f'Training failed: {result.failure_info}'

            # Update the appropriate model
            model_id = result.task_info['model_id']
            recipe_id = result.task_info['recipe_id']
            model_msg = result.value
            if isinstance(model_msg, Proxy):
                # Forces resolution. Needed to keep `submit_inference` from making a proxy of `model_msg`, which can happen if it is not resolved
                #  by `scorer.update` and is a problem because the proxy for `model_msg` can be evicted while other processes need it
                model_msg = extract(model_msg)
            self.models[recipe_id][model_id] = self.scorer.update(self.models[recipe_id][model_id], model_msg)
            self.logger.info(f'Updated model {i + 1}/{self.num_models}. Recipe id={recipe_id}. Model id={model_id}')

            # Signal to begin inference
            self.start_inference.set()
            self.ready_models.put((recipe_id, model_id))
        self.logger.info('Finished training all models')
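
    # Coordination note (comment added for clarity): `retrain` sets `start_inference`
    # and pushes each (recipe_id, model_id) pair onto `ready_models` as soon as that
    # model is updated, so `submit_inference` (below) can begin scoring with one model
    # while the rest of the ensemble is still training.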
    def submit_inference(self) -> tuple[list[list[str]], np.ndarray, list[np.ndarray]]:
        """Submit all molecules to be evaluated, return placeholders for their outputs

        Inference tasks are submitted with a few bits of metadata

        - recipe_id: Index of the recipe being evaluated
        - model_id: Index of the model being evaluated
        - chunk_id: Index of the chunk of molecules
        - chunk_size: Number of molecules in the chunk being evaluated

        Returns:
            - SMILES strings of the molecules being evaluated
            - Boolean array marking whether each inference task is done, shape ``n_chunks x recipes x ensemble_size``
            - List of arrays in which to store inference results: ``n_chunks`` arrays, each of size ``recipes x chunk_size x models``
        """

        # Get the proxystore for inference, if defined
        store = self.inference_store

        # Submit each model as soon as it is ready
        for i in range(self.num_models):
            # Wait for a model to finish training
            recipe_id, model_id = self.ready_models.get()
            model = self.models[recipe_id][model_id]

            # Serialize and, if available, proxy the model
            model_msg = self.scorer.prepare_message(model, training=False)
            if store is not None:
                model_msg = store.proxy(model_msg)
                self._model_proxies[recipe_id][model_id] = model_msg
            self.logger.info(f'Preparing to submit tasks for model {i + 1}/{self.num_models}.')

            for chunk_id, (chunk_inputs, chunk_keys) in enumerate(zip(self.search_space_inputs, self.search_space_smiles)):
                self.queues.send_inputs(
                    model_msg, chunk_inputs,
                    method='score',
                    topic='inference',
                    task_info={'recipe_id': recipe_id, 'model_id': model_id,
                               'chunk_id': chunk_id, 'chunk_size': len(chunk_keys)}
                )
            self.logger.info(f'Submitted all tasks for recipe={recipe_id} model={model_id}')

        # Prepare to store the inference results
        n_chunks = len(self.search_space_inputs)
        ensemble_size = len(self.models[0])
        all_done: np.ndarray = np.zeros((n_chunks, len(self.recipes), ensemble_size), dtype=bool)
        inference_results: list[np.ndarray] = [
            np.zeros((len(self.recipes), len(chunk), ensemble_size)) for chunk in self.search_space_smiles
        ]  # (chunk, recipe, molecule, model)
        return list(self.search_space_smiles), all_done, inference_results
    def _filter_inference_results(self, chunk_id: int, chunk_smiles: list[str], inference_results: np.ndarray) -> tuple[list[str], np.ndarray]:
        """Remove entries from the input array before adding to the selector

        Args:
            chunk_id: Index of the chunk being processed
            chunk_smiles: SMILES strings for molecules in this chunk
            inference_results: Results for the inference
        Returns:
            - SMILES strings of chunk after filtering
            - Inference results after filtering
        """
        return chunk_smiles, inference_results
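
    # An illustrative override sketch (not part of the original module): a subclass
    # could use this hook to drop molecules whose predictions contain NaNs.
    # `NanFilteringThinker` is a hypothetical name:
    #
    #     class NanFilteringThinker(SingleStepThinker):
    #         def _filter_inference_results(self, chunk_id, chunk_smiles, inference_results):
    #             # Keep molecules with finite predictions across all recipes and models
    #             ok = np.isfinite(inference_results).all(axis=(0, 2))
    #             keep = np.flatnonzero(ok)
    #             return [chunk_smiles[i] for i in keep], inference_results[:, keep, :]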
    @event_responder(event_name='start_inference')
    def run_inference(self):
        """Store inference results then update the task list"""

        # Submit the tasks and prepare the storage
        chunk_smiles, all_done, inference_results = self.submit_inference()

        # Reset the selector
        selector = self.solution.selector
        selector.update(self.database, self.recipes)
        selector.start_gathering()

        # Gather all inference results
        n_tasks = all_done.size
        self.logger.info(f'Prepared to receive {n_tasks} results')
        for i in range(n_tasks):
            # Find which result this is
            result = self.queues.get_result(topic='inference')
            start_time = perf_counter()
            recipe_id = result.task_info['recipe_id']
            model_id = result.task_info['model_id']
            chunk_id = result.task_info['chunk_id']
            self.logger.info(f'Received inference result {i + 1}/{n_tasks}. Recipe={recipe_id}, model={model_id}, chunk={chunk_id}, success={result.success}')

            # Save the outcome
            self._write_result(result, 'inference')
            assert result.success, f'Inference failed due to {result.failure_info}'

            # Update the inference results
            all_done[chunk_id, recipe_id, model_id] = True
            inference_results[chunk_id][recipe_id, :, model_id] = np.squeeze(result.value)

            # Check if we are done for the whole chunk (all models for this chunk)
            if all_done[chunk_id, :, :].all():
                self.logger.info(f'Everything done for chunk={chunk_id}. Adding to selector.')
                filtered_smiles, filtered_results = self._filter_inference_results(chunk_id, chunk_smiles[chunk_id], inference_results[chunk_id])
                if len(filtered_smiles) > 0:
                    selector.add_possibilities(filtered_smiles, filtered_results)

            # If we are done with all chunks for a model
            if all_done[:, recipe_id, model_id].all():
                self.logger.info(f'Done with all inference tasks for recipe={recipe_id} model={model_id}. Evicting proxy, if any.')
                if self._model_proxies[recipe_id][model_id] is not None:
                    key = get_key(self._model_proxies[recipe_id][model_id])
                    self.inference_store.evict(key)

            # Mark that we're done with this result
            self.logger.info(f'Done processing inference result {i + 1}/{n_tasks}. Time: {perf_counter() - start_time:.2e}s')

        # Get the top list of molecules
        self.logger.info('Done storing all results')
        with self.task_queue_lock:
            self.task_queue.clear()
            for key, score in selector.dispense():
                self.task_queue.append((str(key), score))

            # Notify anyone waiting on more tasks
            self.task_queue_lock.notify_all()
        self.logger.info('Updated task queue. All done.')
    def _simulations_complete(self, record: MoleculeRecord):
        self.start_training.set()
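
# --- Illustrative usage sketch (not part of the original module) ---
# A minimal sketch, assuming `queues`, `recipes`, `solution`, and `database`
# have been configured elsewhere; `run()` is inherited from the colmena
# thinker base class:
#
#     with ProcessPoolExecutor(max_workers=4) as pool:
#         thinker = SingleStepThinker(
#             queues=queues,
#             run_dir=Path('run'),
#             recipes=recipes,
#             solution=solution,
#             search_space=[Path('search_space.smi')],
#             database=database,
#             pool=pool,
#             num_workers=2,
#         )
#         thinker.run()  # Blocks until all agents exit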