Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1"""Structure optimization. """ 

2 

3import time 

4from math import sqrt 

5from os.path import isfile 

6 

7from ase.io.jsonio import read_json, write_json 

8from ase.calculators.calculator import PropertyNotImplementedError 

9from ase.parallel import world, barrier 

10from ase.io.trajectory import Trajectory 

11from ase.utils import IOContext 

12import collections.abc 

13 

14 

15class RestartError(RuntimeError): 

16 pass 

17 

18 

19class Dynamics(IOContext): 

20 """Base-class for all MD and structure optimization classes.""" 

21 

22 def __init__( 

23 self, atoms, logfile, trajectory, append_trajectory=False, master=None 

24 ): 

25 """Dynamics object. 

26 

27 Parameters: 

28 

29 atoms: Atoms object 

30 The Atoms object to operate on. 

31 

32 logfile: file object or str 

33 If *logfile* is a string, a file with that name will be opened. 

34 Use '-' for stdout. 

35 

36 trajectory: Trajectory object or str 

37 Attach trajectory object. If *trajectory* is a string a 

38 Trajectory will be constructed. Use *None* for no 

39 trajectory. 

40 

41 append_trajectory: boolean 

42 Defaults to False, which causes the trajectory file to be 

43 overwriten each time the dynamics is restarted from scratch. 

44 If True, the new structures are appended to the trajectory 

45 file instead. 

46 

47 master: boolean 

48 Defaults to None, which causes only rank 0 to save files. If 

49 set to true, this rank will save files. 

50 """ 

51 

52 self.atoms = atoms 

53 self.logfile = self.openfile(logfile, mode='a', comm=world) 

54 self.observers = [] 

55 self.nsteps = 0 

56 # maximum number of steps placeholder with maxint 

57 self.max_steps = 100000000 

58 

59 if trajectory is not None: 

60 if isinstance(trajectory, str): 

61 mode = "a" if append_trajectory else "w" 

62 trajectory = self.closelater(Trajectory( 

63 trajectory, mode=mode, atoms=atoms, master=master 

64 )) 

65 self.attach(trajectory) 

66 

67 def get_number_of_steps(self): 

68 return self.nsteps 

69 

70 def insert_observer( 

71 self, function, position=0, interval=1, *args, **kwargs 

72 ): 

73 """Insert an observer.""" 

74 if not isinstance(function, collections.abc.Callable): 

75 function = function.write 

76 self.observers.insert(position, (function, interval, args, kwargs)) 

77 

78 def attach(self, function, interval=1, *args, **kwargs): 

79 """Attach callback function. 

80 

81 If *interval > 0*, at every *interval* steps, call *function* with 

82 arguments *args* and keyword arguments *kwargs*. 

83 

84 If *interval <= 0*, after step *interval*, call *function* with 

85 arguments *args* and keyword arguments *kwargs*. This is 

86 currently zero indexed.""" 

87 

88 if hasattr(function, "set_description"): 

89 d = self.todict() 

90 d.update(interval=interval) 

91 function.set_description(d) 

92 if not hasattr(function, "__call__"): 

93 function = function.write 

94 self.observers.append((function, interval, args, kwargs)) 

95 

96 def call_observers(self): 

97 for function, interval, args, kwargs in self.observers: 

98 call = False 

99 # Call every interval iterations 

100 if interval > 0: 

101 if (self.nsteps % interval) == 0: 

102 call = True 

103 # Call only on iteration interval 

104 elif interval <= 0: 

105 if self.nsteps == abs(interval): 

106 call = True 

107 if call: 

108 function(*args, **kwargs) 

109 

110 def irun(self): 

111 """Run dynamics algorithm as generator. This allows, e.g., 

112 to easily run two optimizers or MD thermostats at the same time. 

113 

114 Examples: 

115 >>> opt1 = BFGS(atoms) 

116 >>> opt2 = BFGS(StrainFilter(atoms)).irun() 

117 >>> for _ in opt2: 

118 >>> opt1.run() 

119 """ 

120 

121 # compute initial structure and log the first step 

122 self.atoms.get_forces() 

123 

124 # yield the first time to inspect before logging 

125 yield False 

126 

127 if self.nsteps == 0: 

128 self.log() 

129 self.call_observers() 

130 

131 # run the algorithm until converged or max_steps reached 

132 while not self.converged() and self.nsteps < self.max_steps: 

133 

134 # compute the next step 

135 self.step() 

136 self.nsteps += 1 

137 

138 # let the user inspect the step and change things before logging 

139 # and predicting the next step 

140 yield False 

141 

142 # log the step 

143 self.log() 

144 self.call_observers() 

145 

146 # finally check if algorithm was converged 

147 yield self.converged() 

148 

149 def run(self): 

150 """Run dynamics algorithm. 

151 

152 This method will return when the forces on all individual 

153 atoms are less than *fmax* or when the number of steps exceeds 

154 *steps*.""" 

155 

156 for converged in Dynamics.irun(self): 

157 pass 

158 return converged 

159 

160 def converged(self, *args): 

161 """" a dummy function as placeholder for a real criterion, e.g. in 

162 Optimizer """ 

163 return False 

164 

165 def log(self, *args): 

166 """ a dummy function as placeholder for a real logger, e.g. in 

167 Optimizer """ 

168 return True 

169 

170 def step(self): 

171 """this needs to be implemented by subclasses""" 

172 raise RuntimeError("step not implemented.") 

173 

174 

175class Optimizer(Dynamics): 

176 """Base-class for all structure optimization classes.""" 

177 

178 # default maxstep for all optimizers 

179 defaults = {'maxstep': 0.2} 

180 

181 def __init__( 

182 self, 

183 atoms, 

184 restart, 

185 logfile, 

186 trajectory, 

187 master=None, 

188 append_trajectory=False, 

189 force_consistent=False, 

190 ): 

191 """Structure optimizer object. 

192 

193 Parameters: 

194 

195 atoms: Atoms object 

196 The Atoms object to relax. 

197 

198 restart: str 

199 Filename for restart file. Default value is *None*. 

200 

201 logfile: file object or str 

202 If *logfile* is a string, a file with that name will be opened. 

203 Use '-' for stdout. 

204 

205 trajectory: Trajectory object or str 

206 Attach trajectory object. If *trajectory* is a string a 

207 Trajectory will be constructed. Use *None* for no 

208 trajectory. 

209 

210 master: boolean 

211 Defaults to None, which causes only rank 0 to save files. If 

212 set to true, this rank will save files. 

213 

214 append_trajectory: boolean 

215 Appended to the trajectory file instead of overwriting it. 

216 

217 force_consistent: boolean or None 

218 Use force-consistent energy calls (as opposed to the energy 

219 extrapolated to 0 K). If force_consistent=None, uses 

220 force-consistent energies if available in the calculator, but 

221 falls back to force_consistent=False if not. 

222 """ 

223 Dynamics.__init__( 

224 self, 

225 atoms, 

226 logfile, 

227 trajectory, 

228 append_trajectory=append_trajectory, 

229 master=master, 

230 ) 

231 

232 self.force_consistent = force_consistent 

233 if self.force_consistent is None: 

234 self.set_force_consistent() 

235 

236 self.restart = restart 

237 

238 # initialize attribute 

239 self.fmax = None 

240 

241 if restart is None or not isfile(restart): 

242 self.initialize() 

243 else: 

244 self.read() 

245 barrier() 

246 

247 def todict(self): 

248 description = { 

249 "type": "optimization", 

250 "optimizer": self.__class__.__name__, 

251 } 

252 return description 

253 

254 def initialize(self): 

255 pass 

256 

257 def irun(self, fmax=0.05, steps=None): 

258 """ call Dynamics.irun and keep track of fmax""" 

259 self.fmax = fmax 

260 if steps: 

261 self.max_steps = steps 

262 return Dynamics.irun(self) 

263 

264 def run(self, fmax=0.05, steps=None): 

265 """ call Dynamics.run and keep track of fmax""" 

266 self.fmax = fmax 

267 if steps: 

268 self.max_steps = steps 

269 return Dynamics.run(self) 

270 

271 def converged(self, forces=None): 

272 """Did the optimization converge?""" 

273 if forces is None: 

274 forces = self.atoms.get_forces() 

275 if hasattr(self.atoms, "get_curvature"): 

276 return (forces ** 2).sum( 

277 axis=1 

278 ).max() < self.fmax ** 2 and self.atoms.get_curvature() < 0.0 

279 return (forces ** 2).sum(axis=1).max() < self.fmax ** 2 

280 

281 def log(self, forces=None): 

282 if forces is None: 

283 forces = self.atoms.get_forces() 

284 fmax = sqrt((forces ** 2).sum(axis=1).max()) 

285 e = self.atoms.get_potential_energy( 

286 force_consistent=self.force_consistent 

287 ) 

288 T = time.localtime() 

289 if self.logfile is not None: 

290 name = self.__class__.__name__ 

291 if self.nsteps == 0: 

292 args = (" " * len(name), "Step", "Time", "Energy", "fmax") 

293 msg = "%s %4s %8s %15s %12s\n" % args 

294 self.logfile.write(msg) 

295 

296 if self.force_consistent: 

297 msg = "*Force-consistent energies used in optimization.\n" 

298 self.logfile.write(msg) 

299 

300 ast = {1: "*", 0: ""}[self.force_consistent] 

301 args = (name, self.nsteps, T[3], T[4], T[5], e, ast, fmax) 

302 msg = "%s: %3d %02d:%02d:%02d %15.6f%1s %12.4f\n" % args 

303 self.logfile.write(msg) 

304 

305 self.logfile.flush() 

306 

307 def dump(self, data): 

308 if world.rank == 0 and self.restart is not None: 

309 with open(self.restart, 'w') as fd: 

310 write_json(fd, data) 

311 

312 def load(self): 

313 with open(self.restart) as fd: 

314 try: 

315 return read_json(fd, always_array=False) 

316 except Exception as ex: 

317 msg = ('Could not decode restart file as JSON. ' 

318 f'You may need to delete the restart file {self.restart}') 

319 raise RestartError(msg) from ex 

320 

321 def set_force_consistent(self): 

322 """Automatically sets force_consistent to True if force_consistent 

323 energies are supported by calculator; else False.""" 

324 try: 

325 self.atoms.get_potential_energy(force_consistent=True) 

326 except PropertyNotImplementedError: 

327 self.force_consistent = False 

328 else: 

329 self.force_consistent = True