Source code for examol.simulate.ase

"""Utilities for simulation using ASE"""
import json
import os
import re
from hashlib import sha512
from pathlib import Path
from shutil import rmtree
from time import perf_counter

import ase
import numpy as np
from ase import units
from ase.db import connect
from ase.io import Trajectory, read
from ase.optimize import BFGS, FIRE
from ase.io.ulm import InvalidULMFileError
from ase.calculators.gaussian import Gaussian, GaussianOptimizer

import examol.utils.conversions
from . import utils
from .utils import add_vacuum_buffer
from ..base import BaseSimulator, SimResult

# Location of additional basis sets
_cp2k_basis_set_dir = (Path(__file__).parent / 'cp2k-basis').resolve()

# Mapping between basis set and a converged cutoff energy
#  See methods in: https://github.com/exalearn/quantum-chemistry-on-polaris/blob/main/cp2k/
#  We increase the cutoff slightly to be on the safe side
_cutoff_lookup: dict[tuple[str, str], float] = {
    ('BLYP', 'SZV-MOLOPT-GTH'): 700.,
    ('BLYP', 'DZVP-MOLOPT-GTH'): 700.,
    ('B3LYP', 'def2-SVP'): 500.,
    ('B3LYP', 'def2-TZVPD'): 500.,
    ('WB97X_D3', 'def2-TZVPD'): 600.,
}

# Base input file
_cp2k_inp = """&FORCE_EVAL
&DFT
  &XC
     &XC_FUNCTIONAL $XC$
     &END XC_FUNCTIONAL
  &END XC
  &POISSON
     PERIODIC NONE
     PSOLVER MT
  &END POISSON
  &MGRID
    NGRIDS 5
    REL_CUTOFF 60
  &END MGRID
  &QS
    METHOD $METHOD$
  &END QS
  &SCF
    &OUTER_SCF
      MAX_SCF 5
    &END OUTER_SCF
    &OT T
      PRECONDITIONER FULL_ALL
    &END OT
  &END SCF
&END DFT
&SUBSYS
  &TOPOLOGY
    &CENTER_COORDINATES
    &END
  &END
&END FORCE_EVAL
"""

# Solvent data (solvent name -> (gamma, e0) for CP2K, solvent name - xTB/G name for xTB/G)
_solv_data = {
    'acn': (
        29.4500,  # http://www.ddbst.com/en/EED/PCP/SFT_C3.php
        37.5  # https://depts.washington.edu/eooptic/linkfiles/dielectric_chart%5B1%5D.pdf
    )
}
_xtb_solv_names = {'acn': 'acetonitrile'}
_gaussian_solv_names = {'acn': 'acetonitrile'}


[docs] class ASESimulator(BaseSimulator): """Use ASE to perform quantum chemistry calculations The calculator supports calculations with the following codes: - *XTB*: Tight binding using the GFN2-xTB parameterization - *Gaussian*: Supports any of the methods and basis sets of Gaussian using names of the format ``gaussian_[method]_[basis]``. Supply additional arguments to Gaussian as keyword arguments. - *MOPAC*: Semiempirical quantum chemistry. Choose a method by providing a configuration name of the form ``mopac_[method]`` - *CP2K*: Supports only a few combinations of basis sets and XC functions, those for which we have determined appropriate cutoff energies. All are named ``cp2k_[xc name]_[basis]`` Args: cp2k_command: Command to launch CP2K gaussian_command: Command to launch Gaussian. Only the path to the executable is generally needed optimization_steps: Maximum number of optimization steps scratch_dir: Path in which to create temporary directories clean_after_run: Whether to clean output files after a run exits successfully ase_db_path: Path to an ASE db in which to store results retain_failed: Whether to clean output files after a run fails """ def __init__(self, cp2k_command: str | None = None, gaussian_command: str | None = None, optimization_steps: int = 250, scratch_dir: Path | str | None = None, clean_after_run: bool = True, ase_db_path: Path | str | None = None, retain_failed: bool = True): super().__init__(scratch_dir, retain_failed=retain_failed) self.cp2k_command = 'cp2k_shell' if cp2k_command is None else cp2k_command self.optimization_steps = optimization_steps self.gaussian_command = Gaussian.command if gaussian_command is None else f'{gaussian_command} < PREFIX.com > PREFIX.log' self.ase_db_path = None if ase_db_path is None else Path(ase_db_path).absolute() self.clean_after_run = clean_after_run
[docs] def create_configuration(self, name: str, xyz: str, charge: int, solvent: str | None, **kwargs) -> dict: if name == 'xtb': kwargs = {'accuracy': 0.05} if solvent is not None: if solvent not in _xtb_solv_names: # pragma: no-coverage raise ValueError(f'Solvent not defined: {solvent}') kwargs['solvent'] = _xtb_solv_names[solvent] return {'name': 'xtb', 'kwargs': kwargs} elif name.startswith('mopac_'): method = name.split("_")[-1] kwargs = {'method': method.upper(), 'task': '1SCF GRADIENTS'} if solvent is not None: if solvent not in _solv_data: # pragma: no-coverage raise ValueError(f'Solvent not defined: {solvent}') _, e0 = _solv_data[solvent] kwargs['task'] += f" EPS={e0}" # Use the defaults for the other parameters return {'name': 'mopac', 'kwargs': kwargs} elif name.startswith('gaussian_'): # Unpack the name if name.count("_") != 2: raise ValueError('Detected the wrong number of separators. Names for the XC function and basis set should not include underscores.') _, xc, basis = name.split("_") # Create additional options add_options = {} if solvent is not None: add_options['SCRF'] = f'PCM,Solvent={_gaussian_solv_names.get(solvent, solvent)}' add_options['scf'] = 'xqc,MaxConventional=200' n_atoms = int(xyz.split("\n", maxsplit=2)[0]) if n_atoms > 50: # ASE requires the structure to be printed, and Gaussian requires special options to print structures larger than 50 atoms # See: https://gitlab.com/ase/ase/-/merge_requests/2909 and https://gaussian.com/overlay2/ add_options['ioplist'] = ["2/9=2000"] # Build the specification return { 'name': 'gaussian', 'use_gaussian_opt': n_atoms <= 50, 'kwargs': { 'command': self.gaussian_command, 'chk': 'gauss.chk', 'basis': basis, 'method': xc, 'charge': charge, 'mult': abs(charge) + 1, # Assume the worst **add_options, **kwargs } } elif name.startswith('cp2k_'): # Get the name the basis set xc_name, basis_set_id = name.rsplit('_', 2)[-2:] xc_name = xc_name.upper() # Determine the proper basis set, pseudopotential, and method if xc_name in ['B3LYP', 'WB97X_D3']: xc_section = f'\n&HYB_GGA_XC_{xc_name}\n&END HYB_GGA_XC_{xc_name}' basis_set_name = f'def2-{basis_set_id.upper()}' basis_set_file = _cp2k_basis_set_dir / 'DEF2_BASIS_SETS' potential = 'ALL' pp_file_name = 'ALL_POTENTIALS' method = 'GAPW' elif xc_name == 'BLYP': xc_section = xc_name basis_set_name = f'{basis_set_id}-MOLOPT-GTH'.upper() basis_set_file = 'BASIS_MOLOPT' potential = 'GTH-BLYP' pp_file_name = None # Use the default method = 'GPW' else: # pragma: no-coverage raise ValueError(f'XC functional "{xc_name}" not yet supported') # Inject the proper XC functional inp = _cp2k_inp inp = inp.replace('$XC$', xc_section) inp = inp.replace('$METHOD$', method) # Get the cutoff assert (xc_name, basis_set_name) in _cutoff_lookup, f'Cutoff energy not defined for {xc_name}//{basis_set_name}' cutoff = _cutoff_lookup[(xc_name, basis_set_name)] # Add solvent information, if desired if solvent is not None: assert solvent in _solv_data, f"Solvent {solvent} not defined. Available: {', '.join(_solv_data.keys())}" gamma, e0 = _solv_data[solvent] # Inject it in the input file # We use beta=0 and alpha+gamma=0 as these do not matter for solvation energy: https://groups.google.com/g/cp2k/c/7oYTqSIyIqI/m/7D62tXIzBgAJ inp = inp.replace( '&END SCF\n', f"""&END SCF &SCCS ALPHA {-gamma} BETA 0 GAMMA {gamma} RELATIVE_PERMITTIVITY {e0} DERIVATIVE_METHOD CD3 METHOD ANDREUSSI &END SCCS\n""") return { 'name': 'cp2k', 'buffer_size': 6.0, 'kwargs': dict( xc=None, charge=charge, uks=charge != 0, inp=inp, cutoff=cutoff * units.Ry, max_scf=32, basis_set_file=str(basis_set_file), basis_set=basis_set_name, pseudo_potential=potential, potential_file=pp_file_name, poisson_solver=None, stress_tensor=False, command=self.cp2k_command) } else: # pragma: no-cover raise ValueError(f'Configuration not supported: {name}')
[docs] def optimize_structure(self, mol_key: str, xyz: str, config_name: str, charge: int = 0, solvent: str | None = None, **kwargs) \ -> tuple[SimResult, list[SimResult], str | None]: fmax_conv = 0.02 # Convergence threshold in eV/Ang start_time = perf_counter() # Measure when we started # Make the configuration calc_cfg = self.create_configuration(config_name, xyz, charge, solvent) # Parse the XYZ file into atoms atoms = examol.utils.conversions.read_from_string(xyz, 'xyz') # Make the run directory based on a hash of the input configuration run_path = self._make_run_directory('opt', mol_key, xyz, charge, config_name, solvent) # Run inside a temporary directory old_path = Path.cwd() succeeded = False try: os.chdir(run_path) with utils.make_ephemeral_calculator(calc_cfg) as calc: # Get the last atoms from a previous run traj_path = Path('opt.traj') if traj_path.is_file(): try: # Overwrite our atoms with th last in the trajectory with Trajectory(traj_path, mode='r') as traj: atoms = traj[-1] except InvalidULMFileError: traj_path.unlink() pass # Prepare the structure for a specific code if 'cp2k' in config_name: calc_cfg['buffer_size'] *= 2 # In case the molecule expands self._prepare_atoms(atoms, charge, calc_cfg) # Special case: use Gaussian's optimizer if isinstance(calc, Gaussian) and calc_cfg['use_gaussian_opt']: # Start the optimization dyn = GaussianOptimizer(atoms, calc) dyn.run(fmax='tight', steps=self.optimization_steps, opt='calcfc') # Read the energies from the output file traj = read('Gaussian.log', index=':') out_traj = [] for atoms in traj: out_strc = examol.utils.conversions.write_to_string(atoms, 'xyz') out_traj.append(SimResult(config_name=config_name, charge=charge, solvent=solvent, xyz=out_strc, energy=atoms.get_potential_energy(), forces=atoms.get_forces())) out_result = out_traj.pop(-1) return out_result, out_traj, json.dumps({'runtime': perf_counter() - start_time}) # Attach the calculator atoms.calc = calc # Continue to append to the same trajectory from previous runs with Trajectory(str(traj_path), mode='a', atoms=atoms) as traj: # Start with a MDMin optimization to very thin convergence threshold dyn = FIRE(atoms, logfile='opt.log', trajectory=traj) dyn.run(fmax=0.7, steps=self.optimization_steps) # TODO (wardlt) make the fmax configurable # If CP2K, re-expand the simulation cell in chase molecule has expanded if 'cp2k' in config_name: self._prepare_atoms(atoms, charge, calc_cfg) # Make the optimizer dyn = BFGS(atoms, logfile='opt.log', trajectory=traj) # Run an optimization dyn.run(fmax=fmax_conv, steps=self.optimization_steps) max_force = np.max(atoms.get_forces()) if max_force > fmax_conv: raise ValueError(f'Convergence failed after {self.optimization_steps}. fmax={fmax_conv:.3f}') # Get the trajectory with Trajectory(str(traj_path), mode='r') as traj: # Get all atoms in the trajectory traj_lst = [a for a in traj] # Store atoms in the database if self.ase_db_path is not None: self.update_database([atoms], config_name, charge, solvent) self.update_database(traj_lst, config_name, charge, solvent) # Convert to the output format out_traj = [] out_strc = examol.utils.conversions.write_to_string(atoms, 'xyz') out_result = SimResult(config_name=config_name, charge=charge, solvent=solvent, xyz=out_strc, energy=atoms.get_potential_energy(), forces=atoms.get_forces()) for atoms in traj_lst: traj_xyz = examol.utils.conversions.write_to_string(atoms, 'xyz') traj_res = SimResult(config_name=config_name, charge=charge, solvent=solvent, xyz=traj_xyz, energy=atoms.get_potential_energy(), forces=atoms.get_forces()) out_traj.append(traj_res) # Read in the output log out_path = Path('opt.log') out_log = out_path.read_text() if out_path.is_file() else None # Mark that we finished successfully succeeded = True return out_result, out_traj, json.dumps({'runtime': perf_counter() - start_time, 'out_log': out_log}) finally: # Delete the run directory if (succeeded and self.clean_after_run) or (not succeeded and not self.retain_failed): os.chdir(old_path) rmtree(run_path) # Make sure we end back where we started os.chdir(old_path)
def _prepare_atoms(self, atoms: ase.Atoms, charge: int, config: dict): """Make the atoms object ready for the simulation Args: atoms: Atoms object to be adjusted charge: Charge on the system config: Configuration detail """ if 'cp2k' in config['name']: add_vacuum_buffer(atoms, buffer_size=config['buffer_size'], cubic=re.match(r'PSOLVER\s+MT', config['kwargs']['inp'].upper()) is None) elif 'xtb' in config['name'] or 'mopac' in config['name']: utils.initialize_charges(atoms, charge)
[docs] def compute_energy(self, mol_key: str, xyz: str, config_name: str, charge: int = 0, solvent: str | None = None, forces: bool = True, **kwargs) -> tuple[SimResult, str | None]: # Make the configuration start_time = perf_counter() # Measure when we started # Make the configuration calc_cfg = self.create_configuration(config_name, xyz, charge, solvent) # Make the run directory based on a hash of the input configuration run_path = self._make_run_directory('single', mol_key, xyz, charge, config_name, solvent) # Parse the XYZ file into atoms atoms = examol.utils.conversions.read_from_string(xyz, 'xyz') # Run inside a temporary directory old_path = Path.cwd() succeeded = False try: os.chdir(run_path) # Prepare to run the cell with utils.make_ephemeral_calculator(calc_cfg) as calc: # Make any changes to cell needed by the calculator self._prepare_atoms(atoms, charge, calc_cfg) # Run a single point atoms.calc = calc forces = atoms.get_forces() if forces else None energy = atoms.get_potential_energy() # If CP2K, make sure it converged if config_name.startswith('cp2k'): cp2k_output = (run_path / 'cp2k.out') assert cp2k_output.is_file(), f'Cannot find output at: {cp2k_output.absolute()}' if ':: SCF run NOT converged ***' in cp2k_output.read_text(): # pragma: no-coverage raise ValueError('CP2K computation did not converge') # Report the results if self.ase_db_path is not None: self.update_database([atoms], config_name, charge, solvent) out_strc = examol.utils.conversions.write_to_string(atoms, 'xyz') out_result = SimResult(config_name=config_name, charge=charge, solvent=solvent, xyz=out_strc, energy=energy, forces=forces) succeeded = True # So tht we know whether to delete output directory return out_result, json.dumps({'runtime': perf_counter() - start_time}) finally: if (succeeded and self.clean_after_run) or (not succeeded and not self.retain_failed): os.chdir(old_path) rmtree(run_path) os.chdir(old_path)
[docs] def update_database(self, atoms_to_write: list[ase.Atoms], config_name: str, charge: int, solvent: str | None): """Update the ASE database collected along with this class Args: atoms_to_write: List of Atoms objects to store in DB config_name: Name of the configuration used to compute energies charge: Charge on the system solvent: Name of solvent, if any """ # Connect to the database with connect(self.ase_db_path, append=True) as db: for atoms in atoms_to_write: # Get the atom hash hasher = sha512() hasher.update(atoms.positions.round(5).tobytes()) hasher.update(atoms.get_chemical_formula(mode='all', empirical=False).encode('ascii')) atoms_hash = hasher.hexdigest()[-16:] + "=" # See if the database already has this record if db.count(atoms_hash=atoms_hash, config_name=config_name, total_charge=charge, solvent=str(solvent)) > 0: continue db.write(atoms, atoms_hash=atoms_hash, config_name=config_name, total_charge=charge, solvent=str(solvent))