Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import warnings 

2from typing import Tuple 

3 

4import numpy as np 

5 

6from ase import __version__ 

7from ase.calculators.singlepoint import SinglePointCalculator, all_properties 

8from ase.constraints import dict2constraint 

9from ase.calculators.calculator import PropertyNotImplementedError 

10from ase.atoms import Atoms 

11from ase.io.jsonio import encode, decode 

12from ase.io.pickletrajectory import PickleTrajectory 

13from ase.parallel import world 

14from ase.utils import tokenize_version 

15 

16 

17__all__ = ['Trajectory', 'PickleTrajectory'] 

18 

19 

20def Trajectory(filename, mode='r', atoms=None, properties=None, master=None): 

21 """A Trajectory can be created in read, write or append mode. 

22 

23 Parameters: 

24 

25 filename: str 

26 The name of the file. Traditionally ends in .traj. 

27 mode: str 

28 The mode. 'r' is read mode, the file should already exist, and 

29 no atoms argument should be specified. 

30 'w' is write mode. The atoms argument specifies the Atoms 

31 object to be written to the file, if not given it must instead 

32 be given as an argument to the write() method. 

33 'a' is append mode. It acts as write mode, except that 

34 data is appended to a preexisting file. 

35 atoms: Atoms object 

36 The Atoms object to be written in write or append mode. 

37 properties: list of str 

38 If specified, these calculator properties are saved in the 

39 trajectory. If not specified, all supported quantities are 

40 saved. Possible values: energy, forces, stress, dipole, 

41 charges, magmom and magmoms. 

42 master: bool 

43 Controls which process does the actual writing. The 

44 default is that process number 0 does this. If this 

45 argument is given, processes where it is True will write. 

46 

47 The atoms, properties and master arguments are ignores in read mode. 

48 """ 

49 if mode == 'r': 

50 return TrajectoryReader(filename) 

51 return TrajectoryWriter(filename, mode, atoms, properties, master=master) 

52 

53 

54class TrajectoryWriter: 

55 """Writes Atoms objects to a .traj file.""" 

56 def __init__(self, filename, mode='w', atoms=None, properties=None, 

57 extra=[], master=None): 

58 """A Trajectory writer, in write or append mode. 

59 

60 Parameters: 

61 

62 filename: str 

63 The name of the file. Traditionally ends in .traj. 

64 mode: str 

65 The mode. 'r' is read mode, the file should already exist, and 

66 no atoms argument should be specified. 

67 'w' is write mode. The atoms argument specifies the Atoms 

68 object to be written to the file, if not given it must instead 

69 be given as an argument to the write() method. 

70 'a' is append mode. It acts as write mode, except that 

71 data is appended to a preexisting file. 

72 atoms: Atoms object 

73 The Atoms object to be written in write or append mode. 

74 properties: list of str 

75 If specified, these calculator properties are saved in the 

76 trajectory. If not specified, all supported quantities are 

77 saved. Possible values: energy, forces, stress, dipole, 

78 charges, magmom and magmoms. 

79 master: bool 

80 Controls which process does the actual writing. The 

81 default is that process number 0 does this. If this 

82 argument is given, processes where it is True will write. 

83 """ 

84 if master is None: 

85 master = (world.rank == 0) 

86 self.master = master 

87 self.atoms = atoms 

88 self.properties = properties 

89 

90 self.description = {} 

91 self.header_data = None 

92 self.multiple_headers = False 

93 

94 self._open(filename, mode) 

95 

96 def __enter__(self): 

97 return self 

98 

99 def __exit__(self, exc_type, exc_value, tb): 

100 self.close() 

101 

102 def set_description(self, description): 

103 self.description.update(description) 

104 

105 def _open(self, filename, mode): 

106 import ase.io.ulm as ulm 

107 if mode not in 'aw': 

108 raise ValueError('mode must be "w" or "a".') 

109 if self.master: 

110 self.backend = ulm.open(filename, mode, tag='ASE-Trajectory') 

111 if len(self.backend) > 0 and mode == 'a': 

112 with Trajectory(filename) as traj: 

113 atoms = traj[0] 

114 self.header_data = get_header_data(atoms) 

115 else: 

116 self.backend = ulm.DummyWriter() 

117 

118 def write(self, atoms=None, **kwargs): 

119 """Write the atoms to the file. 

120 

121 If the atoms argument is not given, the atoms object specified 

122 when creating the trajectory object is used. 

123 

124 Use keyword arguments to add extra properties:: 

125 

126 writer.write(atoms, energy=117, dipole=[0, 0, 1.0]) 

127 """ 

128 if atoms is None: 

129 atoms = self.atoms 

130 

131 for image in atoms.iterimages(): 

132 self._write_atoms(image, **kwargs) 

133 

134 def _write_atoms(self, atoms, **kwargs): 

135 b = self.backend 

136 

137 if self.header_data is None: 

138 b.write(version=1, ase_version=__version__) 

139 if self.description: 

140 b.write(description=self.description) 

141 # Atomic numbers and periodic boundary conditions are written 

142 # in the header in the beginning. 

143 # 

144 # If an image later on has other numbers/pbc, we write a new 

145 # header. All subsequent images will then have their own header 

146 # whether or not their numbers/pbc change. 

147 self.header_data = get_header_data(atoms) 

148 write_header = True 

149 else: 

150 if not self.multiple_headers: 

151 header_data = get_header_data(atoms) 

152 self.multiple_headers = not headers_equal(self.header_data, 

153 header_data) 

154 write_header = self.multiple_headers 

155 

156 write_atoms(b, atoms, write_header=write_header) 

157 

158 calc = atoms.calc 

159 

160 if calc is None and len(kwargs) > 0: 

161 calc = SinglePointCalculator(atoms) 

162 

163 if calc is not None: 

164 if not hasattr(calc, 'get_property'): 

165 calc = OldCalculatorWrapper(calc) 

166 c = b.child('calculator') 

167 c.write(name=calc.name) 

168 if hasattr(calc, 'todict'): 

169 c.write(parameters=calc.todict()) 

170 for prop in all_properties: 

171 if prop in kwargs: 

172 x = kwargs[prop] 

173 else: 

174 if self.properties is not None: 

175 if prop in self.properties: 

176 x = calc.get_property(prop, atoms) 

177 else: 

178 x = None 

179 else: 

180 try: 

181 x = calc.get_property(prop, atoms, 

182 allow_calculation=False) 

183 except (PropertyNotImplementedError, KeyError): 

184 # KeyError is needed for Jacapo. 

185 # XXX We can perhaps remove this. 

186 x = None 

187 if x is not None: 

188 if prop in ['stress', 'dipole']: 

189 x = x.tolist() 

190 c.write(prop, x) 

191 

192 info = {} 

193 for key, value in atoms.info.items(): 

194 try: 

195 encode(value) 

196 except TypeError: 

197 warnings.warn('Skipping "{0}" info.'.format(key)) 

198 else: 

199 info[key] = value 

200 if info: 

201 b.write(info=info) 

202 

203 b.sync() 

204 

205 def close(self): 

206 """Close the trajectory file.""" 

207 self.backend.close() 

208 

209 def __len__(self): 

210 return world.sum(len(self.backend)) 

211 

212 

213class TrajectoryReader: 

214 """Reads Atoms objects from a .traj file.""" 

215 def __init__(self, filename): 

216 """A Trajectory in read mode. 

217 

218 The filename traditionally ends in .traj. 

219 """ 

220 

221 self.numbers = None 

222 self.pbc = None 

223 self.masses = None 

224 

225 self._open(filename) 

226 

227 def __enter__(self): 

228 return self 

229 

230 def __exit__(self, exc_type, exc_value, tb): 

231 self.close() 

232 

233 def _open(self, filename): 

234 import ase.io.ulm as ulm 

235 self.backend = ulm.open(filename, 'r') 

236 self._read_header() 

237 

238 def _read_header(self): 

239 b = self.backend 

240 if b.get_tag() != 'ASE-Trajectory': 

241 raise IOError('This is not a trajectory file!') 

242 

243 if len(b) > 0: 

244 self.pbc = b.pbc 

245 self.numbers = b.numbers 

246 self.masses = b.get('masses') 

247 self.constraints = b.get('constraints', '[]') 

248 self.description = b.get('description') 

249 self.version = b.version 

250 self.ase_version = b.get('ase_version') 

251 

252 def close(self): 

253 """Close the trajectory file.""" 

254 self.backend.close() 

255 

256 def __getitem__(self, i=-1): 

257 if isinstance(i, slice): 

258 return SlicedTrajectory(self, i) 

259 b = self.backend[i] 

260 if 'numbers' in b: 

261 # numbers and other header info was written alongside the image: 

262 atoms = read_atoms(b, traj=self) 

263 else: 

264 # header info was not written because they are the same: 

265 atoms = read_atoms(b, 

266 header=[self.pbc, self.numbers, self.masses, 

267 self.constraints], 

268 traj=self) 

269 if 'calculator' in b: 

270 results = {} 

271 implemented_properties = [] 

272 c = b.calculator 

273 for prop in all_properties: 

274 if prop in c: 

275 results[prop] = c.get(prop) 

276 implemented_properties.append(prop) 

277 calc = SinglePointCalculator(atoms, **results) 

278 calc.name = b.calculator.name 

279 calc.implemented_properties = implemented_properties 

280 

281 if 'parameters' in c: 

282 calc.parameters.update(c.parameters) 

283 atoms.calc = calc 

284 

285 return atoms 

286 

287 def __len__(self): 

288 return len(self.backend) 

289 

290 def __iter__(self): 

291 for i in range(len(self)): 

292 yield self[i] 

293 

294 

295class SlicedTrajectory: 

296 """Wrapper to return a slice from a trajectory without loading 

297 from disk. Initialize with a trajectory (in read mode) and the 

298 desired slice object.""" 

299 

300 def __init__(self, trajectory, sliced): 

301 self.trajectory = trajectory 

302 self.map = range(len(self.trajectory))[sliced] 

303 

304 def __getitem__(self, i): 

305 if isinstance(i, slice): 

306 # Map directly to the original traj, not recursively. 

307 traj = SlicedTrajectory(self.trajectory, slice(0, None)) 

308 traj.map = self.map[i] 

309 return traj 

310 return self.trajectory[self.map[i]] 

311 

312 def __len__(self): 

313 return len(self.map) 

314 

315 

316def get_header_data(atoms): 

317 return {'pbc': atoms.pbc.copy(), 

318 'numbers': atoms.get_atomic_numbers(), 

319 'masses': atoms.get_masses() if atoms.has('masses') else None, 

320 'constraints': list(atoms.constraints)} 

321 

322 

323def headers_equal(headers1, headers2): 

324 assert len(headers1) == len(headers2) 

325 eq = True 

326 for key in headers1: 

327 eq &= np.array_equal(headers1[key], headers2[key]) 

328 return eq 

329 

330 

331class VersionTooOldError(Exception): 

332 pass 

333 

334 

335def read_atoms(backend, 

336 header: Tuple = None, 

337 traj: TrajectoryReader = None, 

338 _try_except: bool = True) -> Atoms: 

339 

340 if _try_except: 

341 try: 

342 return read_atoms(backend, header, traj, False) 

343 except Exception as ex: 

344 if (traj is not None and tokenize_version(__version__) < 

345 tokenize_version(traj.ase_version)): 

346 msg = ('You are trying to read a trajectory file written ' 

347 f'by ASE-{traj.ase_version} from ASE-{__version__}. ' 

348 'It might help to update your ASE') 

349 raise VersionTooOldError(msg) from ex 

350 else: 

351 raise 

352 

353 b = backend 

354 if header: 

355 pbc, numbers, masses, constraints = header 

356 else: 

357 pbc = b.pbc 

358 numbers = b.numbers 

359 masses = b.get('masses') 

360 constraints = b.get('constraints', '[]') 

361 

362 atoms = Atoms(positions=b.positions, 

363 numbers=numbers, 

364 cell=b.cell, 

365 masses=masses, 

366 pbc=pbc, 

367 info=b.get('info'), 

368 constraint=[dict2constraint(d) 

369 for d in decode(constraints)], 

370 momenta=b.get('momenta'), 

371 magmoms=b.get('magmoms'), 

372 charges=b.get('charges'), 

373 tags=b.get('tags')) 

374 return atoms 

375 

376 

377def write_atoms(backend, atoms, write_header=True): 

378 b = backend 

379 

380 if write_header: 

381 b.write(pbc=atoms.pbc.tolist(), 

382 numbers=atoms.numbers) 

383 if atoms.constraints: 

384 if all(hasattr(c, 'todict') for c in atoms.constraints): 

385 b.write(constraints=encode(atoms.constraints)) 

386 

387 if atoms.has('masses'): 

388 b.write(masses=atoms.get_masses()) 

389 

390 b.write(positions=atoms.get_positions(), 

391 cell=atoms.get_cell().tolist()) 

392 

393 if atoms.has('tags'): 

394 b.write(tags=atoms.get_tags()) 

395 if atoms.has('momenta'): 

396 b.write(momenta=atoms.get_momenta()) 

397 if atoms.has('initial_magmoms'): 

398 b.write(magmoms=atoms.get_initial_magnetic_moments()) 

399 if atoms.has('initial_charges'): 

400 b.write(charges=atoms.get_initial_charges()) 

401 

402 

403def read_traj(fd, index): 

404 trj = TrajectoryReader(fd) 

405 for i in range(*index.indices(len(trj))): 

406 yield trj[i] 

407 

408 

409def write_traj(fd, images): 

410 """Write image(s) to trajectory.""" 

411 trj = TrajectoryWriter(fd) 

412 if isinstance(images, Atoms): 

413 images = [images] 

414 for atoms in images: 

415 trj.write(atoms) 

416 

417 

418class OldCalculatorWrapper: 

419 def __init__(self, calc): 

420 self.calc = calc 

421 try: 

422 self.name = calc.name 

423 except AttributeError: 

424 self.name = calc.__class__.__name__.lower() 

425 

426 def get_property(self, prop, atoms, allow_calculation=True): 

427 try: 

428 if (not allow_calculation and 

429 self.calc.calculation_required(atoms, [prop])): 

430 return None 

431 except AttributeError: 

432 pass 

433 

434 method = 'get_' + {'energy': 'potential_energy', 

435 'magmom': 'magnetic_moment', 

436 'magmoms': 'magnetic_moments', 

437 'dipole': 'dipole_moment'}.get(prop, prop) 

438 try: 

439 result = getattr(self.calc, method)(atoms) 

440 except AttributeError: 

441 raise PropertyNotImplementedError 

442 return result 

443 

444 

445def convert(name): 

446 import os 

447 t = TrajectoryWriter(name + '.new') 

448 for atoms in PickleTrajectory(name, _warn=False): 

449 t.write(atoms) 

450 t.close() 

451 os.rename(name, name + '.old') 

452 os.rename(name + '.new', name) 

453 

454 

455def main(): 

456 import optparse 

457 parser = optparse.OptionParser(usage='python -m ase.io.trajectory ' 

458 'a1.traj [a2.traj ...]', 

459 description='Convert old trajectory ' 

460 'file(s) to new format. ' 

461 'The old file is kept as a1.traj.old.') 

462 opts, args = parser.parse_args() 

463 for name in args: 

464 convert(name) 

465 

466 

467if __name__ == '__main__': 

468 main()