Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Structure optimization. """
3import time
4from math import sqrt
5from os.path import isfile
7from ase.io.jsonio import read_json, write_json
8from ase.calculators.calculator import PropertyNotImplementedError
9from ase.parallel import world, barrier
10from ase.io.trajectory import Trajectory
11from ase.utils import IOContext
12import collections.abc
15class RestartError(RuntimeError):
16 pass
19class Dynamics(IOContext):
20 """Base-class for all MD and structure optimization classes."""
22 def __init__(
23 self, atoms, logfile, trajectory, append_trajectory=False, master=None
24 ):
25 """Dynamics object.
27 Parameters:
29 atoms: Atoms object
30 The Atoms object to operate on.
32 logfile: file object or str
33 If *logfile* is a string, a file with that name will be opened.
34 Use '-' for stdout.
36 trajectory: Trajectory object or str
37 Attach trajectory object. If *trajectory* is a string a
38 Trajectory will be constructed. Use *None* for no
39 trajectory.
41 append_trajectory: boolean
42 Defaults to False, which causes the trajectory file to be
43 overwriten each time the dynamics is restarted from scratch.
44 If True, the new structures are appended to the trajectory
45 file instead.
47 master: boolean
48 Defaults to None, which causes only rank 0 to save files. If
49 set to true, this rank will save files.
50 """
52 self.atoms = atoms
53 self.logfile = self.openfile(logfile, mode='a', comm=world)
54 self.observers = []
55 self.nsteps = 0
56 # maximum number of steps placeholder with maxint
57 self.max_steps = 100000000
59 if trajectory is not None:
60 if isinstance(trajectory, str):
61 mode = "a" if append_trajectory else "w"
62 trajectory = self.closelater(Trajectory(
63 trajectory, mode=mode, atoms=atoms, master=master
64 ))
65 self.attach(trajectory)
67 def get_number_of_steps(self):
68 return self.nsteps
70 def insert_observer(
71 self, function, position=0, interval=1, *args, **kwargs
72 ):
73 """Insert an observer."""
74 if not isinstance(function, collections.abc.Callable):
75 function = function.write
76 self.observers.insert(position, (function, interval, args, kwargs))
78 def attach(self, function, interval=1, *args, **kwargs):
79 """Attach callback function.
81 If *interval > 0*, at every *interval* steps, call *function* with
82 arguments *args* and keyword arguments *kwargs*.
84 If *interval <= 0*, after step *interval*, call *function* with
85 arguments *args* and keyword arguments *kwargs*. This is
86 currently zero indexed."""
88 if hasattr(function, "set_description"):
89 d = self.todict()
90 d.update(interval=interval)
91 function.set_description(d)
92 if not hasattr(function, "__call__"):
93 function = function.write
94 self.observers.append((function, interval, args, kwargs))
96 def call_observers(self):
97 for function, interval, args, kwargs in self.observers:
98 call = False
99 # Call every interval iterations
100 if interval > 0:
101 if (self.nsteps % interval) == 0:
102 call = True
103 # Call only on iteration interval
104 elif interval <= 0:
105 if self.nsteps == abs(interval):
106 call = True
107 if call:
108 function(*args, **kwargs)
110 def irun(self):
111 """Run dynamics algorithm as generator. This allows, e.g.,
112 to easily run two optimizers or MD thermostats at the same time.
114 Examples:
115 >>> opt1 = BFGS(atoms)
116 >>> opt2 = BFGS(StrainFilter(atoms)).irun()
117 >>> for _ in opt2:
118 >>> opt1.run()
119 """
121 # compute initial structure and log the first step
122 self.atoms.get_forces()
124 # yield the first time to inspect before logging
125 yield False
127 if self.nsteps == 0:
128 self.log()
129 self.call_observers()
131 # run the algorithm until converged or max_steps reached
132 while not self.converged() and self.nsteps < self.max_steps:
134 # compute the next step
135 self.step()
136 self.nsteps += 1
138 # let the user inspect the step and change things before logging
139 # and predicting the next step
140 yield False
142 # log the step
143 self.log()
144 self.call_observers()
146 # finally check if algorithm was converged
147 yield self.converged()
149 def run(self):
150 """Run dynamics algorithm.
152 This method will return when the forces on all individual
153 atoms are less than *fmax* or when the number of steps exceeds
154 *steps*."""
156 for converged in Dynamics.irun(self):
157 pass
158 return converged
160 def converged(self, *args):
161 """" a dummy function as placeholder for a real criterion, e.g. in
162 Optimizer """
163 return False
165 def log(self, *args):
166 """ a dummy function as placeholder for a real logger, e.g. in
167 Optimizer """
168 return True
170 def step(self):
171 """this needs to be implemented by subclasses"""
172 raise RuntimeError("step not implemented.")
175class Optimizer(Dynamics):
176 """Base-class for all structure optimization classes."""
178 # default maxstep for all optimizers
179 defaults = {'maxstep': 0.2}
181 def __init__(
182 self,
183 atoms,
184 restart,
185 logfile,
186 trajectory,
187 master=None,
188 append_trajectory=False,
189 force_consistent=False,
190 ):
191 """Structure optimizer object.
193 Parameters:
195 atoms: Atoms object
196 The Atoms object to relax.
198 restart: str
199 Filename for restart file. Default value is *None*.
201 logfile: file object or str
202 If *logfile* is a string, a file with that name will be opened.
203 Use '-' for stdout.
205 trajectory: Trajectory object or str
206 Attach trajectory object. If *trajectory* is a string a
207 Trajectory will be constructed. Use *None* for no
208 trajectory.
210 master: boolean
211 Defaults to None, which causes only rank 0 to save files. If
212 set to true, this rank will save files.
214 append_trajectory: boolean
215 Appended to the trajectory file instead of overwriting it.
217 force_consistent: boolean or None
218 Use force-consistent energy calls (as opposed to the energy
219 extrapolated to 0 K). If force_consistent=None, uses
220 force-consistent energies if available in the calculator, but
221 falls back to force_consistent=False if not.
222 """
223 Dynamics.__init__(
224 self,
225 atoms,
226 logfile,
227 trajectory,
228 append_trajectory=append_trajectory,
229 master=master,
230 )
232 self.force_consistent = force_consistent
233 if self.force_consistent is None:
234 self.set_force_consistent()
236 self.restart = restart
238 # initialize attribute
239 self.fmax = None
241 if restart is None or not isfile(restart):
242 self.initialize()
243 else:
244 self.read()
245 barrier()
247 def todict(self):
248 description = {
249 "type": "optimization",
250 "optimizer": self.__class__.__name__,
251 }
252 return description
254 def initialize(self):
255 pass
257 def irun(self, fmax=0.05, steps=None):
258 """ call Dynamics.irun and keep track of fmax"""
259 self.fmax = fmax
260 if steps:
261 self.max_steps = steps
262 return Dynamics.irun(self)
264 def run(self, fmax=0.05, steps=None):
265 """ call Dynamics.run and keep track of fmax"""
266 self.fmax = fmax
267 if steps:
268 self.max_steps = steps
269 return Dynamics.run(self)
271 def converged(self, forces=None):
272 """Did the optimization converge?"""
273 if forces is None:
274 forces = self.atoms.get_forces()
275 if hasattr(self.atoms, "get_curvature"):
276 return (forces ** 2).sum(
277 axis=1
278 ).max() < self.fmax ** 2 and self.atoms.get_curvature() < 0.0
279 return (forces ** 2).sum(axis=1).max() < self.fmax ** 2
281 def log(self, forces=None):
282 if forces is None:
283 forces = self.atoms.get_forces()
284 fmax = sqrt((forces ** 2).sum(axis=1).max())
285 e = self.atoms.get_potential_energy(
286 force_consistent=self.force_consistent
287 )
288 T = time.localtime()
289 if self.logfile is not None:
290 name = self.__class__.__name__
291 if self.nsteps == 0:
292 args = (" " * len(name), "Step", "Time", "Energy", "fmax")
293 msg = "%s %4s %8s %15s %12s\n" % args
294 self.logfile.write(msg)
296 if self.force_consistent:
297 msg = "*Force-consistent energies used in optimization.\n"
298 self.logfile.write(msg)
300 ast = {1: "*", 0: ""}[self.force_consistent]
301 args = (name, self.nsteps, T[3], T[4], T[5], e, ast, fmax)
302 msg = "%s: %3d %02d:%02d:%02d %15.6f%1s %12.4f\n" % args
303 self.logfile.write(msg)
305 self.logfile.flush()
307 def dump(self, data):
308 if world.rank == 0 and self.restart is not None:
309 with open(self.restart, 'w') as fd:
310 write_json(fd, data)
312 def load(self):
313 with open(self.restart) as fd:
314 try:
315 return read_json(fd, always_array=False)
316 except Exception as ex:
317 msg = ('Could not decode restart file as JSON. '
318 f'You may need to delete the restart file {self.restart}')
319 raise RestartError(msg) from ex
321 def set_force_consistent(self):
322 """Automatically sets force_consistent to True if force_consistent
323 energies are supported by calculator; else False."""
324 try:
325 self.atoms.get_potential_energy(force_consistent=True)
326 except PropertyNotImplementedError:
327 self.force_consistent = False
328 else:
329 self.force_consistent = True