Coverage for /builds/debichem-team/python-ase/ase/calculators/lammpsrun.py: 87.68%
211 statements
« prev ^ index » next coverage.py v7.5.3, created at 2025-03-06 04:00 +0000
« prev ^ index » next coverage.py v7.5.3, created at 2025-03-06 04:00 +0000
1"""ASE calculator for the LAMMPS classical MD code"""
2# lammps.py (2011/03/29)
3#
4# Copyright (C) 2009 - 2011 Joerg Meyer, joerg.meyer@ch.tum.de
5#
6# This library is free software; you can redistribute it and/or
7# modify it under the terms of the GNU Lesser General Public
8# License as published by the Free Software Foundation; either
9# version 2.1 of the License, or (at your option) any later version.
10#
11# This library is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Lesser General Public License for more details.
15#
16# You should have received a copy of the GNU Lesser General Public
17# License along with this file; if not, write to the Free Software
18# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
19# USA or see <http://www.gnu.org/licenses/>.
22import os
23import shlex
24import shutil
25import subprocess
26import warnings
27from re import IGNORECASE
28from re import compile as re_compile
29from tempfile import NamedTemporaryFile, mkdtemp
30from tempfile import mktemp as uns_mktemp
31from threading import Thread
32from typing import Any, Dict
34import numpy as np
36from ase.calculators.calculator import Calculator, all_changes
37from ase.calculators.lammps import (
38 CALCULATION_END_MARK,
39 Prism,
40 convert,
41 write_lammps_in,
42)
43from ase.data import atomic_masses, chemical_symbols
44from ase.io.lammpsdata import write_lammps_data
45from ase.io.lammpsrun import read_lammps_dump
47__all__ = ["LAMMPS"]
50class LAMMPS(Calculator):
51 """LAMMPS (https://lammps.sandia.gov/) calculator
53 The environment variable :envvar:`ASE_LAMMPSRUN_COMMAND` must be defined to
54 tell ASE how to call a LAMMPS binary. This should contain the path to the
55 lammps binary, or more generally, a command line possibly also including an
56 MPI-launcher command.
58 For example (in a Bourne-shell compatible environment):
60 .. code-block:: bash
62 export ASE_LAMMPSRUN_COMMAND=/path/to/lmp_binary
64 or possibly something similar to
66 .. code-block:: bash
68 export ASE_LAMMPSRUN_COMMAND="/path/to/mpirun --np 4 lmp_binary"
70 Parameters
71 ----------
72 files : list[str]
73 List of files needed by LAMMPS. Typically a list of potential files.
74 parameters : dict[str, Any]
75 Dictionary of settings to be passed into the input file for calculation.
76 specorder : list[str]
77 Within LAAMPS, atoms are identified by an integer value starting from 1.
78 This variable allows the user to define the order of the indices
79 assigned to the atoms in the calculation, with the default
80 if not given being alphabetical
81 keep_tmp_files : bool, default: False
82 Retain any temporary files created. Mostly useful for debugging.
83 tmp_dir : str, default: None
84 path/dirname (default None -> create automatically).
85 Explicitly control where the calculator object should create
86 its files. Using this option implies 'keep_tmp_files=True'.
87 no_data_file : bool, default: False
88 Controls whether an explicit data file will be used for feeding
89 atom coordinates into lammps. Enable it to lessen the pressure on
90 the (tmp) file system. THIS OPTION MIGHT BE UNRELIABLE FOR CERTAIN
91 CORNER CASES (however, if it fails, you will notice...).
92 keep_alive : bool, default: True
93 When using LAMMPS as a spawned subprocess, keep the subprocess
94 alive (but idling when unused) along with the calculator object.
95 always_triclinic : bool, default: False
96 Force LAMMPS to treat the cell as tilted, even if the cell is not
97 tilted, by printing ``xy``, ``xz``, ``yz`` in the data file.
98 reduce_cell : bool, default: False
99 If True, reduce cell to have shorter lattice vectors.
100 write_velocities : bool, default: False
101 If True, forward ASE velocities to LAMMPS.
102 verbose: bool, default: False
103 If True, print additional debugging output to STDOUT.
105 Examples
106 --------
107 Provided that the respective potential file is in the working directory,
108 one can simply run (note that LAMMPS needs to be compiled to work with EAM
109 potentials)
111 ::
113 from ase import Atom, Atoms
114 from ase.build import bulk
115 from ase.calculators.lammpsrun import LAMMPS
117 parameters = {'pair_style': 'eam/alloy',
118 'pair_coeff': ['* * NiAlH_jea.eam.alloy H Ni']}
120 files = ['NiAlH_jea.eam.alloy']
122 Ni = bulk('Ni', cubic=True)
123 H = Atom('H', position=Ni.cell.diagonal()/2)
124 NiH = Ni + H
126 lammps = LAMMPS(files=files, **parameters)
128 NiH.calc = lammps
129 print("Energy ", NiH.get_potential_energy())
130 """
132 name = "lammpsrun"
133 implemented_properties = ["energy", "free_energy", "forces", "stress",
134 "energies"]
136 # parameters to choose options in LAMMPSRUN
137 ase_parameters: Dict[str, Any] = dict(
138 specorder=None,
139 atorder=True,
140 always_triclinic=False,
141 reduce_cell=False,
142 keep_alive=True,
143 keep_tmp_files=False,
144 no_data_file=False,
145 tmp_dir=None,
146 files=[], # usually contains potential parameters
147 verbose=False,
148 write_velocities=False,
149 binary_dump=True, # bool - use binary dump files (full
150 # precision but long long ids are casted to
151 # double)
152 lammps_options="-echo log -screen none -log /dev/stdout",
153 trajectory_out=None, # file object, if is not None the
154 # trajectory will be saved in it
155 )
157 # parameters forwarded to LAMMPS
158 lammps_parameters = dict(
159 boundary=None, # bounadry conditions styles
160 units="metal", # str - Which units used; some potentials
161 # require certain units
162 atom_style="atomic",
163 special_bonds=None,
164 # potential informations
165 pair_style="lj/cut 2.5",
166 pair_coeff=["* * 1 1"],
167 masses=None,
168 pair_modify=None,
169 # variables controlling the output
170 thermo_args=[
171 "step", "temp", "press", "cpu", "pxx", "pyy", "pzz",
172 "pxy", "pxz", "pyz", "ke", "pe", "etotal", "vol", "lx",
173 "ly", "lz", "atoms", ],
174 dump_properties=["id", "type", "x", "y", "z", "vx", "vy",
175 "vz", "fx", "fy", "fz", ],
176 dump_period=1, # period of system snapshot saving (in MD steps)
177 )
179 default_parameters = dict(ase_parameters, **lammps_parameters)
181 def __init__(self, label="lammps", **kwargs):
182 super().__init__(label=label, **kwargs)
184 self.prism = None
185 self.calls = 0
186 self.forces = None
187 # thermo_content contains data "written by" thermo_style.
188 # It is a list of dictionaries, each dict (one for each line
189 # printed by thermo_style) contains a mapping between each
190 # custom_thermo_args-argument and the corresponding
191 # value as printed by lammps. thermo_content will be
192 # re-populated by the read_log method.
193 self.thermo_content = []
195 if self.parameters['tmp_dir'] is not None:
196 # If tmp_dir is pointing somewhere, don't remove stuff!
197 self.parameters['keep_tmp_files'] = True
198 self._lmp_handle = None # To handle the lmp process
200 if self.parameters['tmp_dir'] is None:
201 self.parameters['tmp_dir'] = mkdtemp(prefix="LAMMPS-")
202 else:
203 self.parameters['tmp_dir'] = os.path.realpath(
204 self.parameters['tmp_dir'])
205 if not os.path.isdir(self.parameters['tmp_dir']):
206 os.mkdir(self.parameters['tmp_dir'], 0o755)
208 for f in self.parameters['files']:
209 shutil.copy(
210 f, os.path.join(self.parameters['tmp_dir'],
211 os.path.basename(f)))
213 def get_lammps_command(self):
214 cmd = self.parameters.get('command')
216 if cmd is None:
217 from ase.config import cfg
218 envvar = f'ASE_{self.name.upper()}_COMMAND'
219 cmd = cfg.get(envvar)
221 if cmd is None:
222 # TODO deprecate and remove guesswork
223 cmd = 'lammps'
225 opts = self.parameters.get('lammps_options')
227 if opts is not None:
228 cmd = f'{cmd} {opts}'
230 return cmd
232 def clean(self, force=False):
234 self._lmp_end()
236 if not self.parameters['keep_tmp_files'] or force:
237 shutil.rmtree(self.parameters['tmp_dir'])
239 def check_state(self, atoms, tol=1.0e-10):
240 # Transforming the unit cell to conform to LAMMPS' convention for
241 # orientation (c.f. https://lammps.sandia.gov/doc/Howto_triclinic.html)
242 # results in some precision loss, so we use bit larger tolerance than
243 # machine precision here. Note that there can also be precision loss
244 # related to how many significant digits are specified for things in
245 # the LAMMPS input file.
246 return Calculator.check_state(self, atoms, tol)
248 def calculate(self, atoms=None, properties=None, system_changes=None):
249 if properties is None:
250 properties = self.implemented_properties
251 if system_changes is None:
252 system_changes = all_changes
253 Calculator.calculate(self, atoms, properties, system_changes)
254 self.run()
256 def _lmp_alive(self):
257 # Return True if this calculator is currently handling a running
258 # lammps process
259 return self._lmp_handle and not isinstance(
260 self._lmp_handle.poll(), int
261 )
263 def _lmp_end(self):
264 # Close lammps input and wait for lammps to end. Return process
265 # return value
266 if self._lmp_alive():
267 # !TODO: handle lammps error codes
268 try:
269 self._lmp_handle.communicate(timeout=5)
270 except subprocess.TimeoutExpired:
271 self._lmp_handle.kill()
272 self._lmp_handle.communicate()
273 err = self._lmp_handle.poll()
274 assert err is not None
275 return err
277 def set_missing_parameters(self):
278 """Verify that all necessary variables are set.
279 """
280 symbols = self.atoms.get_chemical_symbols()
281 # If unspecified default to atom types in alphabetic order
282 if not self.parameters.get('specorder'):
283 self.parameters['specorder'] = sorted(set(symbols))
285 # !TODO: handle cases were setting masses actual lead to errors
286 if not self.parameters.get('masses'):
287 self.parameters['masses'] = []
288 for type_id, specie in enumerate(self.parameters['specorder']):
289 mass = atomic_masses[chemical_symbols.index(specie)]
290 self.parameters['masses'] += [
291 f"{type_id + 1:d} {mass:f}"
292 ]
294 # set boundary condtions
295 if not self.parameters.get('boundary'):
296 b_str = " ".join(["fp"[int(x)] for x in self.atoms.pbc])
297 self.parameters['boundary'] = b_str
299 def run(self, set_atoms=False):
300 # !TODO: split this function
301 """Method which explicitly runs LAMMPS."""
302 pbc = self.atoms.get_pbc()
303 if all(pbc):
304 cell = self.atoms.get_cell()
305 elif not any(pbc):
306 # large enough cell for non-periodic calculation -
307 # LAMMPS shrink-wraps automatically via input command
308 # "periodic s s s"
309 # below
310 cell = 2 * np.max(np.abs(self.atoms.get_positions())) * np.eye(3)
311 else:
312 warnings.warn(
313 "semi-periodic ASE cell detected - translation "
314 + "to proper LAMMPS input cell might fail"
315 )
316 cell = self.atoms.get_cell()
317 self.prism = Prism(cell)
319 self.set_missing_parameters()
320 self.calls += 1
322 # change into subdirectory for LAMMPS calculations
323 tempdir = self.parameters['tmp_dir']
325 # setup file names for LAMMPS calculation
326 label = f"{self.label}{self.calls:>06}"
327 lammps_in = uns_mktemp(
328 prefix="in_" + label, dir=tempdir
329 )
330 lammps_log = uns_mktemp(
331 prefix="log_" + label, dir=tempdir
332 )
333 lammps_trj_fd = NamedTemporaryFile(
334 prefix="trj_" + label,
335 suffix=(".bin" if self.parameters['binary_dump'] else ""),
336 dir=tempdir,
337 delete=(not self.parameters['keep_tmp_files']),
338 )
339 lammps_trj = lammps_trj_fd.name
340 if self.parameters['no_data_file']:
341 lammps_data = None
342 else:
343 lammps_data_fd = NamedTemporaryFile(
344 prefix="data_" + label,
345 dir=tempdir,
346 delete=(not self.parameters['keep_tmp_files']),
347 mode='w',
348 encoding='ascii'
349 )
350 write_lammps_data(
351 lammps_data_fd,
352 self.atoms,
353 specorder=self.parameters['specorder'],
354 force_skew=self.parameters['always_triclinic'],
355 reduce_cell=self.parameters['reduce_cell'],
356 velocities=self.parameters['write_velocities'],
357 prismobj=self.prism,
358 units=self.parameters['units'],
359 atom_style=self.parameters['atom_style'],
360 )
361 lammps_data = lammps_data_fd.name
362 lammps_data_fd.flush()
364 # see to it that LAMMPS is started
365 if not self._lmp_alive():
366 command = self.get_lammps_command()
367 # Attempt to (re)start lammps
368 self._lmp_handle = subprocess.Popen(
369 shlex.split(command, posix=(os.name == "posix")),
370 stdin=subprocess.PIPE,
371 stdout=subprocess.PIPE,
372 encoding='ascii',
373 )
374 lmp_handle = self._lmp_handle
376 # Create thread reading lammps stdout (for reference, if requested,
377 # also create lammps_log, although it is never used)
378 if self.parameters['keep_tmp_files']:
379 lammps_log_fd = open(lammps_log, "w")
380 fd = SpecialTee(lmp_handle.stdout, lammps_log_fd)
381 else:
382 fd = lmp_handle.stdout
383 thr_read_log = Thread(target=self.read_lammps_log, args=(fd,))
384 thr_read_log.start()
386 # write LAMMPS input (for reference, also create the file lammps_in,
387 # although it is never used)
388 if self.parameters['keep_tmp_files']:
389 lammps_in_fd = open(lammps_in, "w")
390 fd = SpecialTee(lmp_handle.stdin, lammps_in_fd)
391 else:
392 fd = lmp_handle.stdin
393 write_lammps_in(
394 lammps_in=fd,
395 parameters=self.parameters,
396 atoms=self.atoms,
397 prismobj=self.prism,
398 lammps_trj=lammps_trj,
399 lammps_data=lammps_data,
400 )
402 if self.parameters['keep_tmp_files']:
403 lammps_in_fd.close()
405 # Wait for log output to be read (i.e., for LAMMPS to finish)
406 # and close the log file if there is one
407 thr_read_log.join()
408 if self.parameters['keep_tmp_files']:
409 lammps_log_fd.close()
411 if not self.parameters['keep_alive']:
412 self._lmp_end()
414 exitcode = lmp_handle.poll()
415 if exitcode and exitcode != 0:
416 raise RuntimeError(
417 "LAMMPS exited in {} with exit code: {}."
418 "".format(tempdir, exitcode)
419 )
421 # A few sanity checks
422 if len(self.thermo_content) == 0:
423 raise RuntimeError("Failed to retrieve any thermo_style-output")
424 if int(self.thermo_content[-1]["atoms"]) != len(self.atoms):
425 # This obviously shouldn't happen, but if prism.fold_...() fails,
426 # it could
427 raise RuntimeError("Atoms have gone missing")
429 trj_atoms = read_lammps_dump(
430 infileobj=lammps_trj,
431 order=self.parameters['atorder'],
432 index=-1,
433 prismobj=self.prism,
434 specorder=self.parameters['specorder'],
435 )
437 if set_atoms:
438 self.atoms = trj_atoms.copy()
440 self.forces = trj_atoms.get_forces()
441 # !TODO: trj_atoms is only the last snapshot of the system; Is it
442 # desirable to save also the inbetween steps?
443 if self.parameters['trajectory_out'] is not None:
444 # !TODO: is it advisable to create here temporary atoms-objects
445 self.trajectory_out.write(trj_atoms)
447 tc = self.thermo_content[-1]
448 self.results["energy"] = convert(
449 tc["pe"], "energy", self.parameters["units"], "ASE"
450 )
451 self.results["free_energy"] = self.results["energy"]
452 self.results['forces'] = convert(self.forces.copy(),
453 'force',
454 self.parameters['units'],
455 'ASE')
456 stress = np.array(
457 [-tc[i] for i in ("pxx", "pyy", "pzz", "pyz", "pxz", "pxy")]
458 )
460 # We need to apply the Lammps rotation stuff to the stress:
461 xx, yy, zz, yz, xz, xy = stress
462 stress_tensor = np.array([[xx, xy, xz],
463 [xy, yy, yz],
464 [xz, yz, zz]])
465 stress_atoms = self.prism.tensor2_to_ase(stress_tensor)
466 stress_atoms = stress_atoms[[0, 1, 2, 1, 0, 0],
467 [0, 1, 2, 2, 2, 1]]
468 stress = stress_atoms
470 self.results["stress"] = convert(
471 stress, "pressure", self.parameters["units"], "ASE"
472 )
474 lammps_trj_fd.close()
475 if not self.parameters['no_data_file']:
476 lammps_data_fd.close()
478 def __enter__(self):
479 return self
481 def __exit__(self, *args):
482 self._lmp_end()
484 def read_lammps_log(self, fileobj):
485 # !TODO: somehow communicate 'thermo_content' explicitly
486 """Method which reads a LAMMPS output log file."""
488 # read_log depends on that the first (three) thermo_style custom args
489 # can be capitalized and matched against the log output. I.e.
490 # don't use e.g. 'ke' or 'cpu' which are labeled KinEng and CPU.
491 mark_re = r"^\s*" + r"\s+".join(
492 [x.capitalize() for x in self.parameters['thermo_args'][0:3]]
493 )
494 _custom_thermo_mark = re_compile(mark_re)
496 # !TODO: regex-magic necessary?
497 # Match something which can be converted to a float
498 f_re = r"([+-]?(?:(?:\d+(?:\.\d*)?|\.\d+)(?:e[+-]?\d+)?|nan|inf))"
499 n_args = len(self.parameters["thermo_args"])
500 # Create a re matching exactly N white space separated floatish things
501 _custom_thermo_re = re_compile(
502 r"^\s*" + r"\s+".join([f_re] * n_args) + r"\s*$", flags=IGNORECASE
503 )
505 thermo_content = []
506 line = fileobj.readline()
507 while line and line.strip() != CALCULATION_END_MARK:
508 # check error
509 if 'ERROR:' in line:
510 raise RuntimeError(f'LAMMPS exits with error message: {line}')
512 # get thermo output
513 if _custom_thermo_mark.match(line):
514 while True:
515 line = fileobj.readline()
516 if 'WARNING:' in line:
517 continue
519 bool_match = _custom_thermo_re.match(line)
520 if not bool_match:
521 break
523 # create a dictionary between each of the
524 # thermo_style args and it's corresponding value
525 thermo_content.append(
526 dict(
527 zip(
528 self.parameters['thermo_args'],
529 map(float, bool_match.groups()),
530 )
531 )
532 )
533 else:
534 line = fileobj.readline()
536 self.thermo_content = thermo_content
539class SpecialTee:
540 """A special purpose, with limited applicability, tee-like thing.
542 A subset of stuff read from, or written to, orig_fd,
543 is also written to out_fd.
544 It is used by the lammps calculator for creating file-logs of stuff
545 read from, or written to, stdin and stdout, respectively.
546 """
548 def __init__(self, orig_fd, out_fd):
549 self._orig_fd = orig_fd
550 self._out_fd = out_fd
551 self.name = orig_fd.name
553 def write(self, data):
554 self._orig_fd.write(data)
555 self._out_fd.write(data)
556 self.flush()
558 def read(self, *args, **kwargs):
559 data = self._orig_fd.read(*args, **kwargs)
560 self._out_fd.write(data)
561 return data
563 def readline(self, *args, **kwargs):
564 data = self._orig_fd.readline(*args, **kwargs)
565 self._out_fd.write(data)
566 return data
568 def readlines(self, *args, **kwargs):
569 data = self._orig_fd.readlines(*args, **kwargs)
570 self._out_fd.write("".join(data))
571 return data
573 def flush(self):
574 self._orig_fd.flush()
575 self._out_fd.flush()