Source code for aiida_lsmo.workchains.sim_annealing

# -*- coding: utf-8 -*-
"""Simulated Annealing workchain"""

import os
import functools
import ase
from aiida.plugins import CalculationFactory, DataFactory, WorkflowFactory
from aiida.orm import Dict, Str
from aiida.engine import calcfunction
from aiida.engine import WorkChain, ToContext, append_, while_
from aiida_lsmo.utils import check_resize_unit_cell, aiida_cif_merge, validate_dict
from aiida_lsmo.calcfunctions.ff_builder_module import load_yaml

from .isotherm import get_molecule_dict, get_ff_parameters
from .parameters_schemas import FF_PARAMETERS_VALIDATOR, Required

# import sub-workchains
RaspaBaseWorkChain = WorkflowFactory('raspa.base')  # pylint: disable=invalid-name

# import calculations
FFBuilder = CalculationFactory('lsmo.ff_builder')  # pylint: disable=invalid-name

# import aiida data
CifData = DataFactory('cif')  # pylint: disable=invalid-name


# calcfunctions (in order of appearence)
[docs]@calcfunction def get_molecule_from_restart_file(structure_cif, molecule_folderdata, input_dict, molecule_dict): """Get a CifData file having the cell of the initial (unexpanded) structure and the geometry of the loaded molecule. TODO: this is source of error if there are more than one molecule AND the cell has been expanded, as you can not wrap them in the small cell. """ number_of_molecules = input_dict.get_dict()['number_of_molecules'] # Get the atom types of the molecule (ASE accepts only atomic eilements as "symbols") ff_data_molecule = load_yaml()[molecule_dict['name']][molecule_dict['forcefield']] symbolsff = [x[0] for x in ff_data_molecule['atomic_positions']] symbolsff *= number_of_molecules chem = [ff_data_molecule['atom_types'][atom]['pseudo_atom'][2] for atom in symbolsff] # Get the coordinates of the molecule from restart-file in the extended unit cell fobj = molecule_folderdata.get_object_content( os.path.join('Restart', 'System_0', molecule_folderdata.list_object_names(os.path.join('Restart', 'System_0'))[0])) # pylint: disable=protected-access lines_positions = [line.split()[3:6] for line in fobj.splitlines() if 'Adsorbate-atom-position:' in line] symbols_positions = list(zip(chem, lines_positions)) symbols_positions = [(name, position) for name, position in symbols_positions if name != 'M' ] # Removing dummy atoms. symbols, positions = list(zip(*symbols_positions)) # Get the cell, combine the ASE object and return the CifData cell = structure_cif.get_ase().get_cell() molecule_ase = ase.Atoms(symbols=symbols, cell=cell, positions=positions, pbc=True) molecule_ase.wrap() # wrap molecule in the unit cell return CifData(ase=molecule_ase)
[docs]@calcfunction def get_output_parameters(input_dict, min_out_dict, **nvt_out_dict): """Merge energy info from the calculations.""" if 'number_of_molecules' in input_dict.get_dict(): number_of_molecules = input_dict['number_of_molecules'] else: number_of_molecules = 1 temperature_list = input_dict['temperature_list'] out_dict = {'number_of_molecules': number_of_molecules, 'description': [], 'energy_unit': 'kJ/mol'} key_list = [ 'energy_host/ads_tot_final', 'energy_host/ads_vdw_final', 'energy_host/ads_coulomb_final', 'energy_ads/ads_tot_final', 'energy_ads/ads_vdw_final', 'energy_ads/ads_coulomb_final', ] for key in key_list: out_dict[key] = [] for i, temp in enumerate(temperature_list): out_dict['description'].append('NVT simulation at {} K'.format(temp)) nvt_out_dict_i = nvt_out_dict['RaspaNVT_{}'.format(i + 1)] for key in key_list: out_dict[key].append(nvt_out_dict_i['framework_1']['general'][key]) out_dict['description'].append('Final energy minimization') for key in key_list: out_dict[key].append(min_out_dict['framework_1']['general'][key]) return Dict(dict=out_dict)
[docs]class SimAnnealingWorkChain(WorkChain): """A work chain to compute the minimum energy geometry of a molecule inside a framework, using simulated annealing, i.e., decreasing the temperature of a Monte Carlo simulation and finally running and energy minimization step. """ parameters_schema = FF_PARAMETERS_VALIDATOR.extend({ Required('temperature_list', default=[300, 250, 200, 250, 100, 50], description='List of decreasing temperatures for the annealing.'): list, Required('mc_steps', default=int(1e3), description='Number of MC cycles.'): int, Required('number_of_molecules', default=1, description='Number of molecules loaded in the framework.'): int }) parameters_info = parameters_schema.schema # shorthand for printing
[docs] @classmethod def define(cls, spec): super().define(spec) spec.expose_inputs(RaspaBaseWorkChain, namespace='raspa_base', exclude=['raspa.structure', 'raspa.parameters']) spec.input('structure', valid_type=CifData, help='Adsorbent framework CIF.') spec.input('molecule', valid_type=(Str, Dict), help='Adsorbate molecule: settings to be read from the yaml.' + 'Advanced: input a Dict for non-standard settings.') spec.input('parameters', valid_type=Dict, validator=functools.partial(validate_dict, schema=cls.parameters_schema), help='Parameters for the SimAnnealing workchain: will be merged with default ones.') spec.outline(cls.setup, while_(cls.should_run_nvt)(cls.run_raspa_nvt,), cls.run_raspa_min, cls.return_results) spec.output('loaded_molecule', valid_type=CifData, help='CIF containing the final postition of the molecule.') spec.output('loaded_structure', valid_type=CifData, help='CIF containing the loaded structure.') spec.output( 'output_parameters', valid_type=Dict, required=False, #DO later: this needs to implement the parsing of final energies in aiida-raspa help='Information about the final configuration.')
[docs] def _get_raspa_nvt_param(self): """Write Raspa input parameters from scratch, for an MC NVT calculation""" param = { 'GeneralSettings': { 'SimulationType': 'MonteCarlo', 'NumberOfCycles': self.ctx.parameters['mc_steps'], 'PrintPropertiesEvery': int(1e10), 'PrintEvery': self.ctx.parameters['mc_steps'] / 100, 'RemoveAtomNumberCodeFromLabel': True, # BE CAREFULL: needed in AiiDA-1.0.0 because of github.com/aiidateam/aiida-core/issues/3304 'Forcefield': 'Local', 'UseChargesFromCIFFile': 'yes', 'CutOff': self.ctx.parameters['ff_cutoff'], }, 'System': { 'framework_1': { 'type': 'Framework', } }, 'Component': { self.ctx.molecule['name']: { 'MoleculeDefinition': 'Local', 'TranslationProbability': 1.0, 'ReinsertionProbability': 1.0, 'CreateNumberOfMolecules': self.ctx.parameters['number_of_molecules'], }, }, } # Check particular conditions and settings mult = check_resize_unit_cell(self.inputs.structure, 2 * self.ctx.parameters['ff_cutoff']) param['System']['framework_1']['UnitCells'] = '{} {} {}'.format(mult[0], mult[1], mult[2]) if 'block_pocket' in self.ctx.inp['raspa']: param['Component'][self.ctx.molecule['name']]['BlockPocketsFileName'] = 'block_file' if not self.ctx.molecule['singlebead']: param['Component'][self.ctx.molecule['name']].update({'RotationProbability': 1.0}) if self.ctx.molecule['charged']: # NOTE: `Chargemethod Ewald` is the default in Raspa! param['GeneralSettings'].update({'ChargeMethod': 'Ewald', 'EwaldPrecision': 1e-6}) else: param['GeneralSettings'].update({'ChargeMethod': 'None'}) return param
[docs] def setup(self): """Initialize the parameters""" # Get the molecule Dict from the yaml or directly as an input if isinstance(self.inputs.molecule, Str): self.ctx.molecule = get_molecule_dict(self.inputs.molecule) elif isinstance(self.inputs.molecule, Dict): self.ctx.molecule = self.inputs.molecule # Get the parameters Dict, merging defaults with user settings @calcfunction def get_valid_dict(dict_node): return Dict(dict=self.parameters_schema(dict_node.get_dict())) self.ctx.parameters = get_valid_dict(self.inputs.parameters) # Initialize the input for raspa_base, which later will need only minor updates self.ctx.inp = self.exposed_inputs(RaspaBaseWorkChain, 'raspa_base') self.ctx.inp['raspa']['framework'] = {'framework_1': self.inputs.structure} self.ctx.raspa_param = self._get_raspa_nvt_param() # Generate the force field with the ff_builder ff_params = get_ff_parameters(self.ctx.molecule, self.ctx.parameters) files_dict = FFBuilder(ff_params) self.ctx.inp['raspa']['file'] = files_dict self.ctx.count = 0
[docs] def should_run_nvt(self): """Update temperature untill the last of the list.""" if self.ctx.count == 1: #Placed here to work even with just one temperature in temperature_list self.ctx.raspa_param['Component'][self.ctx.molecule['name']]['CreateNumberOfMolecules'] = 0 return self.ctx.count < len(self.ctx.parameters['temperature_list'])
[docs] def run_raspa_nvt(self): """Run a NVT calculation in Raspa.""" temperature = self.ctx.parameters['temperature_list'][self.ctx.count] self.ctx.raspa_param['System']['framework_1']['ExternalTemperature'] = temperature if self.ctx.count > 0: self.ctx.inp['raspa']['retrieved_parent_folder'] = self.ctx.raspa_nvt[self.ctx.count - 1].outputs.retrieved self.ctx.inp['raspa']['parameters'] = Dict(dict=self.ctx.raspa_param) self.ctx.inp['metadata']['label'] = 'RaspaNVT_{}'.format(self.ctx.count + 1) self.ctx.inp['metadata']['call_link_label'] = 'run_raspa_nvt_{}'.format(self.ctx.count + 1) running = self.submit(RaspaBaseWorkChain, **self.ctx.inp) self.report('Running Raspa NVT ({} of {})'.format(self.ctx.count + 1, len(self.ctx.parameters['temperature_list']))) self.ctx.count += 1 return ToContext(raspa_nvt=append_(running))
[docs] def run_raspa_min(self): """Run a Energy Minimization in Raspa.""" # Update parameters Dict and labels self.ctx.raspa_param['GeneralSettings'].update({ 'SimulationType': 'Minimization', 'NumberOfInitializationCycles': 0, # no NVT steps 'NumberOfCycles': 1, # do only one Minimization 'PrintEvery': 1, # not readen: it prints anyway all min iterations 'MaximumNumberOfMinimizationSteps': 10000, # it will stop before, if converged 'RMSGradientTolerance': 1e-6, 'MaxGradientTolerance': 1e-6, }) self.ctx.inp['raspa']['parameters'] = Dict(dict=self.ctx.raspa_param) self.ctx.inp['raspa']['retrieved_parent_folder'] = self.ctx.raspa_nvt[self.ctx.count - 1].outputs.retrieved self.ctx.inp['metadata']['label'] = 'RaspaMin' self.ctx.inp['metadata']['call_link_label'] = 'run_raspa_min' # Create the calculation process, launch it and update pressure index running = self.submit(RaspaBaseWorkChain, **self.ctx.inp) self.report('Running Raspa final minimization') return ToContext(raspa_min=running)
[docs] def return_results(self): """Return molecule position and energy info.""" self.out( 'loaded_molecule', get_molecule_from_restart_file(self.inputs.structure, self.ctx.raspa_min.outputs.retrieved, self.ctx.parameters, self.ctx.molecule)) self.out('loaded_structure', aiida_cif_merge(self.inputs.structure, self.outputs['loaded_molecule'])) nvt_out_dict = {} for calc in self.ctx.raspa_nvt: nvt_out_dict[calc.label] = calc.outputs.output_parameters self.out( 'output_parameters', get_output_parameters(input_dict=self.ctx.parameters, min_out_dict=self.ctx.raspa_min.outputs.output_parameters, **nvt_out_dict)) self.report( 'Work chain competed! Molecule CifData<{}>, loaded structure CifData<{}>, output parameters Dict<{}>'. format(self.outputs['loaded_molecule'].pk, self.outputs['loaded_structure'].pk, self.outputs['output_parameters'].pk))