"""Train neural network models using `NFP <https://github.com/NREL/nfp>`_"""
from sklearn.model_selection import train_test_split
try:
from tensorflow.keras import callbacks as cb
except ImportError as e: # pragma: no-coverage
raise ImportError('You may need to install Tensorflow and NFP.') from e
import tensorflow as tf
import numpy as np
import nfp
from examol.store.models import MoleculeRecord
from examol.utils.conversions import convert_string_to_nx
from .base import MultiFidelityScorer
from .utils.multifi import compute_deltas
from .utils.tf import LRLogger, TimeLimitCallback, EpochTimeLogger
[docs]
class ReduceAtoms(tf.keras.layers.Layer):
"""Reduce the atoms along a certain direction
Args:
reduction_op: Name of the operation used for reduction
"""
def __init__(self, reduction_op: str = 'mean', **kwargs):
super().__init__(**kwargs)
self.reduction_op = reduction_op
[docs]
def get_config(self):
config = super().get_config()
config['reduction_op'] = self.reduction_op
return config
[docs]
def call(self, inputs, mask=None): # pragma: no-coverage
"""
Args:
inputs: Matrix to be reduced
mask: Identifies which rows to sum are placeholders
"""
masked_tensor = tf.ragged.boolean_mask(inputs, mask)
reduce_fn = getattr(tf.math, f'reduce_{self.reduction_op}')
return reduce_fn(masked_tensor, axis=1)
# Define the custom layers for our class
custom_objects = nfp.custom_objects.copy()
custom_objects['ReduceAtoms'] = ReduceAtoms
[docs]
def make_simple_network(
atom_features: int = 64,
message_steps: int = 8,
output_layers: list[int] = (512, 256, 128),
reduce_op: str = 'mean',
atomwise: bool = True,
outputs: int = 1,
) -> tf.keras.models.Model:
"""Construct a basic MPNN model using the settings provided by a user
Models will have embeddings for atoms with atomic numbers up to 63,
and 4 types of bonds (single, double, triple, aromatic).
The models use edge, node, and global update for each message passing layer
and a separate set of MLPs for each of the outputs.
There is also a "scaling" layer which can be used to adjust the mean
and standard deviation of the prediction.
Args:
atom_features: Number of features used per atom and bond
message_steps: Number of message passing steps
output_layers: Number of neurons in the readout layers
reduce_op: Operation used to reduce from atom-level to molecule-level vectors
atomwise: Whether to reduce atomwise contributions after the output layers,
or reduce to a single vector per molecule before the output layers
outputs: Number of output properties. Each will use their own output network
Returns:
A model instantiated with the user-defined options
"""
atom = tf.keras.layers.Input(shape=[None], dtype=tf.int32, name='atom')
bond = tf.keras.layers.Input(shape=[None], dtype=tf.int32, name='bond')
connectivity = tf.keras.layers.Input(shape=[None, 2], dtype=tf.int32, name='connectivity')
# Convert from a single integer defining the atom state to a vector
# of weights associated with that class
atom_state = tf.keras.layers.Embedding(64, atom_features, name='atom_embedding', mask_zero=True)(atom)
# Ditto with the bond state
bond_state = tf.keras.layers.Embedding(5, atom_features, name='bond_embedding', mask_zero=True)(bond)
# Here we use our first nfp layer. This is an attention layer that looks at
# the atom and bond states and reduces them to a single, graph-level vector.
# mum_heads * units has to be the same dimension as the atom / bond dimension
global_state = nfp.GlobalUpdate(units=4, num_heads=1, name='problem')([atom_state, bond_state, connectivity])
for _ in range(message_steps): # Do the message passing
new_bond_state = nfp.EdgeUpdate()([atom_state, bond_state, connectivity, global_state])
bond_state = tf.keras.layers.Add()([bond_state, new_bond_state])
new_atom_state = nfp.NodeUpdate()([atom_state, bond_state, connectivity, global_state])
atom_state = tf.keras.layers.Add()([atom_state, new_atom_state])
new_global_state = nfp.GlobalUpdate(units=4, num_heads=1)(
[atom_state, bond_state, connectivity, global_state]
)
global_state = tf.keras.layers.Add()([global_state, new_global_state])
# Condense the features into a single vector if building a molecular fingerprint
if not atomwise:
start_state = ReduceAtoms(reduce_op)(atom_state)
else:
start_state = atom_state
# Build the output layers
output_networks = []
for output_id in range(outputs):
# Build the MLP
output = start_state
for i, shape in enumerate(output_layers):
output = tf.keras.layers.Dense(shape, activation='relu', name=f'output_{output_id}_layer_{i}')(output)
output = tf.keras.layers.Dense(1)(output)
# Reduce to a single prediction per network if needed
if atomwise:
output = ReduceAtoms(reduce_op)(output)
# Apply a scale layer then append to the outputs
output = tf.keras.layers.Dense(1, activation='linear', name=f'scale_{output_id}')(output)
output_networks.append(output)
# Combine them if needed
if len(output_networks) == 1:
output = output_networks[0]
else:
output = tf.keras.layers.Concatenate(axis=-1)(output_networks)
# Construct the tf.keras model
return tf.keras.Model([atom, bond, connectivity], [output])
[docs]
class NFPMessage:
"""Package for sending an MPNN model over connections that require pickling"""
def __init__(self, model: tf.keras.Model):
"""
Args:
model: Model to be sent
"""
self.config = model.to_json()
# Makes a copy of the weights to ensure they are not memoryview objects
self.weights = [np.array(v) for v in model.get_weights()]
# Cached copy of the model
self._model = model
def __getstate__(self):
"""Get state except the model"""
state = self.__dict__.copy()
state['_model'] = None
return state
[docs]
def get_model(self) -> tf.keras.Model:
"""Get a copy of the model
Returns:
The model specified by this message
"""
if self._model is None:
self._model = tf.keras.models.model_from_json(
self.config,
custom_objects=custom_objects
)
self._model.set_weights(self.weights)
return self._model
[docs]
def convert_string_to_dict(mol_string: str) -> dict:
"""Convert a molecule to an NFP-compatible dictionary form
Args:
mol_string: SMILES or InChI string
Returns:
Dictionary
"""
# Convert first to a nx.Graph
graph = convert_string_to_nx(mol_string)
# Get the atom types
atom_type_id = [n['atomic_num'] for _, n in graph.nodes(data=True)]
# Get the bond types, making the data
bond_types = ["", "AROMATIC", "DOUBLE", "SINGLE", "TRIPLE"] # 0 is a dummy type
connectivity = []
edge_type = []
for a, b, d in graph.edges(data=True):
connectivity.append([a, b])
connectivity.append([b, a])
edge_type.append(str(d['bond_type']))
edge_type.append(str(d['bond_type']))
edge_type_id = list(map(bond_types.index, edge_type))
# Sort connectivity array by the first column
# This is needed for the MPNN code to efficiently group messages for
# each node when performing the message passing step
connectivity = np.array(connectivity)
if connectivity.size > 0:
# Skip a special case of a molecule w/o bonds
inds = np.lexsort((connectivity[:, 1], connectivity[:, 0]))
connectivity = connectivity[inds, :]
# Tensorflow's "segment_sum" will cause problems if the last atom
# is not bonded because it returns an array
if connectivity.max() != len(atom_type_id) - 1:
raise ValueError(f"Problem with unconnected atoms for \"{mol_string}\"")
else:
connectivity = np.zeros((0, 2))
return {
'atom': atom_type_id,
'bond': edge_type_id,
'connectivity': connectivity
}
[docs]
def make_data_loader(mol_dicts: list[dict],
values: np.ndarray | list[object] | None = None,
batch_size: int = 32,
repeat: bool = False,
shuffle_buffer: int | None = None,
value_spec: tf.TensorSpec = tf.TensorSpec((), dtype=tf.float32),
drop_last_batch: bool = False) -> tf.data.Dataset:
"""Make an in-memory data loader for data compatible with NFP-style neural networks
Args:
mol_dicts: List of molecules parsed into the moldesign format
values: List of output values, if included in the output
value_spec: Tensorflow specification for the output
batch_size: Number of molecules per batch
repeat: Whether to create an infinitely-repeating iterator
shuffle_buffer: Size of a shuffle buffer. Use ``None`` to leave data unshuffled
drop_last_batch: Whether to keep the last batch in the dataset. Set to ``True`` if, for example, you need every batch to be the same size
Returns:
Data loader that generates molecules in the desired shapes
"""
# Determine the maximum size of molecule, used when padding the arrays
max_atoms = max(len(x['atom']) for x in mol_dicts)
max_bonds = max(len(x['bond']) for x in mol_dicts)
# Make the initial data loader
record_sig = {
"atom": tf.TensorSpec(shape=(None,), dtype=tf.int32),
"bond": tf.TensorSpec(shape=(None,), dtype=tf.int32),
"connectivity": tf.TensorSpec(shape=(None, 2), dtype=tf.int32),
}
if values is None:
def generator():
yield from mol_dicts
else:
def generator():
yield from zip(mol_dicts, values)
record_sig = (record_sig, value_spec)
loader = tf.data.Dataset.from_generator(generator=generator, output_signature=record_sig).cache() # TODO (wardlt): Make caching optional?
# Repeat the molecule list before shuffling
if repeat:
loader = loader.repeat()
# Shuffle, if desired
if shuffle_buffer is not None:
loader = loader.shuffle(shuffle_buffer)
# Make the batches. Pads the data to make them all the same size, adding 0's to signify padded values
padded_records = {
"atom": tf.TensorShape((max_atoms,)),
"bond": tf.TensorShape((max_bonds,)),
"connectivity": tf.TensorShape((max_bonds, 2))
}
if values is not None:
padded_records = (padded_records, value_spec.shape)
loader = loader.padded_batch(batch_size=batch_size, padded_shapes=padded_records, drop_remainder=drop_last_batch)
return loader
[docs]
class NFPScorer(MultiFidelityScorer):
"""Train message-passing neural networks based on the `NFP <https://github.com/NREL/nfp>`_ library.
NFP uses Keras to define message-passing networks, which is backed by Tensorflow for executing the networks on different hardware.
Multi-fidelity models predict the lowest, most-plentiful level of fidelity directly and
correction factors to adjust the low-level predictions for the higher levels (i.e., delta learning).
Training does not require all levels of fidelity to be available and will only measure loss
against the available data.
Inference predicts the low-fidelity value and all correction factors for higher levels,
but uses known values in place of them if available.
"""
_supports_multi_fidelity = True
def __init__(self, retrain_from_scratch: bool = True):
"""
Args:
retrain_from_scratch: Whether to retrain models from scratch or not
"""
self.retrain_from_scratch = retrain_from_scratch
[docs]
def prepare_message(self, model: tf.keras.models.Model, training: bool = False) -> dict | NFPMessage:
if training and self.retrain_from_scratch:
return model.get_config()
else:
return NFPMessage(model)
[docs]
def score(self,
model_msg: NFPMessage,
input_data: list[dict | tuple[dict, np.ndarray]],
batch_size: int = 64,
lower_fidelities: np.ndarray | None = None,
**kwargs) -> np.ndarray:
"""Assign a score to molecules
Args:
model_msg: Model in a transmittable format
input_data: Batch of inputs ready for the model (in dictionary format)
batch_size: Number of molecules to evaluate at each time
lower_fidelities: Properties of the molecule at lower levels, if known
Returns:
The scores to a set of records
"""
model = model_msg.get_model() # Unpack the model
# Run inference
loader = make_data_loader(input_data, batch_size=batch_size)
ml_outputs = np.squeeze(model.predict(loader, verbose=False))
if ml_outputs.ndim == 1: # Single-fidelity learning
return ml_outputs
# For multi-objective, add in the use the known outputs in place of the NN outputs if we know them
if lower_fidelities is not None:
known_deltas = compute_deltas(lower_fidelities)
ml_outputs[:, :-1] = np.where(np.isnan(known_deltas), ml_outputs[:, :-1], known_deltas)
return ml_outputs.sum(axis=1) # The outputs of the networks are deltas
[docs]
def retrain(self,
model_msg: dict | NFPMessage,
input_data: list,
output_data: np.ndarray,
lower_fidelities: None | np.ndarray = None,
num_epochs: int = 4,
batch_size: int = 32,
validation_split: float = 0.1,
learning_rate: float = 1e-3,
device_type: str = 'gpu',
steps_per_exec: int = 1,
patience: int = None,
timeout: float = None,
verbose: bool = False) -> tuple[list[np.ndarray], dict]:
"""Retrain the scorer based on new training records
Args:
model_msg: Model to be retrained
input_data: Training set inputs, as generated by :meth:`transform_inputs`
output_data: Training Set outputs, as generated by :meth:`transform_outputs`
lower_fidelities: Lower-fidelity data, if available
num_epochs: Maximum number of epochs to run
batch_size: Number of molecules per training batch
validation_split: Fraction of molecules used for the training/validation split
learning_rate: Learning rate for the Adam optimizer
device_type: Type of device used for training
steps_per_exec: Number of training steps to run per execution on acceleration
patience: Number of epochs without improvement before terminating training. Default is 10% of ``num_epochs``
timeout: Maximum training time in seconds
verbose: Whether to print training information to screen
Returns:
Message defining how to update the model
"""
# Make the model
if isinstance(model_msg, NFPMessage):
model = model_msg.get_model()
elif isinstance(model_msg, dict):
model = tf.keras.Model.from_config(model_msg, custom_objects=custom_objects)
else: # pragma: no-coverage
raise NotImplementedError(f'Unrecognized message type: {type(model_msg)}')
# Prepare data for single- vs multi-objective
is_single = lower_fidelities is None
if is_single:
# Nothing special: Use a standard loss function, no preprocessing required
loss = 'mean_squared_error'
value_spec = tf.TensorSpec((), dtype=tf.float32)
else:
# Use a loss function which ignores the NaN values
def loss(y_true, y_pred):
"""Measure loss only on the non NaN values"""
is_known = tf.math.is_finite(y_true)
return tf.keras.losses.mean_squared_error(y_true[is_known], y_pred[is_known])
# Prepare the outputs
output_data = np.concatenate([lower_fidelities, output_data[:, None]], axis=1)
output_data = compute_deltas(output_data)
value_spec = tf.TensorSpec((output_data.shape[1],), dtype=tf.float32)
# Split off a validation set
train_x, valid_x, train_y, valid_y = train_test_split(input_data, output_data, test_size=validation_split)
# Make the loaders
steps_per_epoch = len(train_x) // batch_size
train_loader = make_data_loader(train_x, train_y, repeat=True, batch_size=batch_size, drop_last_batch=True, shuffle_buffer=32768, value_spec=value_spec)
valid_steps = len(valid_x) // batch_size
if valid_steps == 0: # pragma: no-coverage
raise ValueError(f'Insufficient validation data. Need at least {batch_size} records')
valid_loader = make_data_loader(valid_x, valid_y, batch_size=batch_size, drop_last_batch=True, value_spec=value_spec)
# Define initial guesses for the "scaling" later
try:
output_data = np.array(output_data)
output_mean = np.nanmean(output_data, axis=0)
outputs_std = np.clip(np.nanstd(output_data, axis=0), 1e-6, a_max=None)
for i, (m, s) in enumerate(zip(np.atleast_1d(output_mean), np.atleast_1d(outputs_std))):
scale_layer = model.get_layer(f'scale_{i}')
scale_layer.set_weights([np.atleast_2d(s), np.atleast_1d(m)])
except ValueError:
pass
# Configure the LR schedule
init_learn_rate = learning_rate
final_learn_rate = init_learn_rate * 1e-3
decay_rate = (final_learn_rate / init_learn_rate) ** (1. / (num_epochs - 1))
def lr_schedule(epoch, lr):
if epoch > 0:
return lr * decay_rate
return lr
# Compile the model then train
model.compile(
tf.optimizers.Adam(init_learn_rate),
loss=loss,
steps_per_execution=steps_per_exec,
)
# Make the callbacks
if patience is None:
patience = num_epochs // 10
early_stopping = cb.EarlyStopping(patience=patience, restore_best_weights=True)
callbacks = [
LRLogger(),
EpochTimeLogger(),
cb.LearningRateScheduler(lr_schedule),
early_stopping,
cb.TerminateOnNaN(),
]
if timeout is not None:
callbacks.append(TimeLimitCallback(timeout))
if timeout is not None:
callbacks.append(TimeLimitCallback(timeout))
history = model.fit(
train_loader,
epochs=num_epochs,
shuffle=False,
verbose=verbose,
callbacks=callbacks,
steps_per_epoch=steps_per_epoch,
validation_data=valid_loader,
validation_steps=valid_steps,
validation_freq=1,
)
# If a timeout is used, make sure we are using the best weights
# The training may have exited without storing the best weights
if timeout is not None:
model.set_weights(early_stopping.best_weights)
# Convert weights to numpy arrays (avoids mmap issues)
weights = []
for v in model.get_weights():
v = np.array(v)
if np.isnan(v).any():
raise ValueError('Found some NaN weights.')
weights.append(v)
# Once we are finished training call "clear_session" to flush the model out of GPU memory
tf.keras.backend.clear_session()
return weights, history.history
[docs]
def update(self, model: tf.keras.models.Model, update_msg: tuple[list[np.ndarray], dict]) -> tf.keras.models.Model:
model.set_weights(update_msg[0])
return model