Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import warnings
2from typing import Tuple
4import numpy as np
6from ase import __version__
7from ase.calculators.singlepoint import SinglePointCalculator, all_properties
8from ase.constraints import dict2constraint
9from ase.calculators.calculator import PropertyNotImplementedError
10from ase.atoms import Atoms
11from ase.io.jsonio import encode, decode
12from ase.io.pickletrajectory import PickleTrajectory
13from ase.parallel import world
14from ase.utils import tokenize_version
17__all__ = ['Trajectory', 'PickleTrajectory']
20def Trajectory(filename, mode='r', atoms=None, properties=None, master=None):
21 """A Trajectory can be created in read, write or append mode.
23 Parameters:
25 filename: str
26 The name of the file. Traditionally ends in .traj.
27 mode: str
28 The mode. 'r' is read mode, the file should already exist, and
29 no atoms argument should be specified.
30 'w' is write mode. The atoms argument specifies the Atoms
31 object to be written to the file, if not given it must instead
32 be given as an argument to the write() method.
33 'a' is append mode. It acts as write mode, except that
34 data is appended to a preexisting file.
35 atoms: Atoms object
36 The Atoms object to be written in write or append mode.
37 properties: list of str
38 If specified, these calculator properties are saved in the
39 trajectory. If not specified, all supported quantities are
40 saved. Possible values: energy, forces, stress, dipole,
41 charges, magmom and magmoms.
42 master: bool
43 Controls which process does the actual writing. The
44 default is that process number 0 does this. If this
45 argument is given, processes where it is True will write.
47 The atoms, properties and master arguments are ignores in read mode.
48 """
49 if mode == 'r':
50 return TrajectoryReader(filename)
51 return TrajectoryWriter(filename, mode, atoms, properties, master=master)
54class TrajectoryWriter:
55 """Writes Atoms objects to a .traj file."""
56 def __init__(self, filename, mode='w', atoms=None, properties=None,
57 extra=[], master=None):
58 """A Trajectory writer, in write or append mode.
60 Parameters:
62 filename: str
63 The name of the file. Traditionally ends in .traj.
64 mode: str
65 The mode. 'r' is read mode, the file should already exist, and
66 no atoms argument should be specified.
67 'w' is write mode. The atoms argument specifies the Atoms
68 object to be written to the file, if not given it must instead
69 be given as an argument to the write() method.
70 'a' is append mode. It acts as write mode, except that
71 data is appended to a preexisting file.
72 atoms: Atoms object
73 The Atoms object to be written in write or append mode.
74 properties: list of str
75 If specified, these calculator properties are saved in the
76 trajectory. If not specified, all supported quantities are
77 saved. Possible values: energy, forces, stress, dipole,
78 charges, magmom and magmoms.
79 master: bool
80 Controls which process does the actual writing. The
81 default is that process number 0 does this. If this
82 argument is given, processes where it is True will write.
83 """
84 if master is None:
85 master = (world.rank == 0)
86 self.master = master
87 self.atoms = atoms
88 self.properties = properties
90 self.description = {}
91 self.header_data = None
92 self.multiple_headers = False
94 self._open(filename, mode)
96 def __enter__(self):
97 return self
99 def __exit__(self, exc_type, exc_value, tb):
100 self.close()
102 def set_description(self, description):
103 self.description.update(description)
105 def _open(self, filename, mode):
106 import ase.io.ulm as ulm
107 if mode not in 'aw':
108 raise ValueError('mode must be "w" or "a".')
109 if self.master:
110 self.backend = ulm.open(filename, mode, tag='ASE-Trajectory')
111 if len(self.backend) > 0 and mode == 'a':
112 with Trajectory(filename) as traj:
113 atoms = traj[0]
114 self.header_data = get_header_data(atoms)
115 else:
116 self.backend = ulm.DummyWriter()
118 def write(self, atoms=None, **kwargs):
119 """Write the atoms to the file.
121 If the atoms argument is not given, the atoms object specified
122 when creating the trajectory object is used.
124 Use keyword arguments to add extra properties::
126 writer.write(atoms, energy=117, dipole=[0, 0, 1.0])
127 """
128 if atoms is None:
129 atoms = self.atoms
131 for image in atoms.iterimages():
132 self._write_atoms(image, **kwargs)
134 def _write_atoms(self, atoms, **kwargs):
135 b = self.backend
137 if self.header_data is None:
138 b.write(version=1, ase_version=__version__)
139 if self.description:
140 b.write(description=self.description)
141 # Atomic numbers and periodic boundary conditions are written
142 # in the header in the beginning.
143 #
144 # If an image later on has other numbers/pbc, we write a new
145 # header. All subsequent images will then have their own header
146 # whether or not their numbers/pbc change.
147 self.header_data = get_header_data(atoms)
148 write_header = True
149 else:
150 if not self.multiple_headers:
151 header_data = get_header_data(atoms)
152 self.multiple_headers = not headers_equal(self.header_data,
153 header_data)
154 write_header = self.multiple_headers
156 write_atoms(b, atoms, write_header=write_header)
158 calc = atoms.calc
160 if calc is None and len(kwargs) > 0:
161 calc = SinglePointCalculator(atoms)
163 if calc is not None:
164 if not hasattr(calc, 'get_property'):
165 calc = OldCalculatorWrapper(calc)
166 c = b.child('calculator')
167 c.write(name=calc.name)
168 if hasattr(calc, 'todict'):
169 c.write(parameters=calc.todict())
170 for prop in all_properties:
171 if prop in kwargs:
172 x = kwargs[prop]
173 else:
174 if self.properties is not None:
175 if prop in self.properties:
176 x = calc.get_property(prop, atoms)
177 else:
178 x = None
179 else:
180 try:
181 x = calc.get_property(prop, atoms,
182 allow_calculation=False)
183 except (PropertyNotImplementedError, KeyError):
184 # KeyError is needed for Jacapo.
185 # XXX We can perhaps remove this.
186 x = None
187 if x is not None:
188 if prop in ['stress', 'dipole']:
189 x = x.tolist()
190 c.write(prop, x)
192 info = {}
193 for key, value in atoms.info.items():
194 try:
195 encode(value)
196 except TypeError:
197 warnings.warn('Skipping "{0}" info.'.format(key))
198 else:
199 info[key] = value
200 if info:
201 b.write(info=info)
203 b.sync()
205 def close(self):
206 """Close the trajectory file."""
207 self.backend.close()
209 def __len__(self):
210 return world.sum(len(self.backend))
213class TrajectoryReader:
214 """Reads Atoms objects from a .traj file."""
215 def __init__(self, filename):
216 """A Trajectory in read mode.
218 The filename traditionally ends in .traj.
219 """
221 self.numbers = None
222 self.pbc = None
223 self.masses = None
225 self._open(filename)
227 def __enter__(self):
228 return self
230 def __exit__(self, exc_type, exc_value, tb):
231 self.close()
233 def _open(self, filename):
234 import ase.io.ulm as ulm
235 self.backend = ulm.open(filename, 'r')
236 self._read_header()
238 def _read_header(self):
239 b = self.backend
240 if b.get_tag() != 'ASE-Trajectory':
241 raise IOError('This is not a trajectory file!')
243 if len(b) > 0:
244 self.pbc = b.pbc
245 self.numbers = b.numbers
246 self.masses = b.get('masses')
247 self.constraints = b.get('constraints', '[]')
248 self.description = b.get('description')
249 self.version = b.version
250 self.ase_version = b.get('ase_version')
252 def close(self):
253 """Close the trajectory file."""
254 self.backend.close()
256 def __getitem__(self, i=-1):
257 if isinstance(i, slice):
258 return SlicedTrajectory(self, i)
259 b = self.backend[i]
260 if 'numbers' in b:
261 # numbers and other header info was written alongside the image:
262 atoms = read_atoms(b, traj=self)
263 else:
264 # header info was not written because they are the same:
265 atoms = read_atoms(b,
266 header=[self.pbc, self.numbers, self.masses,
267 self.constraints],
268 traj=self)
269 if 'calculator' in b:
270 results = {}
271 implemented_properties = []
272 c = b.calculator
273 for prop in all_properties:
274 if prop in c:
275 results[prop] = c.get(prop)
276 implemented_properties.append(prop)
277 calc = SinglePointCalculator(atoms, **results)
278 calc.name = b.calculator.name
279 calc.implemented_properties = implemented_properties
281 if 'parameters' in c:
282 calc.parameters.update(c.parameters)
283 atoms.calc = calc
285 return atoms
287 def __len__(self):
288 return len(self.backend)
290 def __iter__(self):
291 for i in range(len(self)):
292 yield self[i]
295class SlicedTrajectory:
296 """Wrapper to return a slice from a trajectory without loading
297 from disk. Initialize with a trajectory (in read mode) and the
298 desired slice object."""
300 def __init__(self, trajectory, sliced):
301 self.trajectory = trajectory
302 self.map = range(len(self.trajectory))[sliced]
304 def __getitem__(self, i):
305 if isinstance(i, slice):
306 # Map directly to the original traj, not recursively.
307 traj = SlicedTrajectory(self.trajectory, slice(0, None))
308 traj.map = self.map[i]
309 return traj
310 return self.trajectory[self.map[i]]
312 def __len__(self):
313 return len(self.map)
316def get_header_data(atoms):
317 return {'pbc': atoms.pbc.copy(),
318 'numbers': atoms.get_atomic_numbers(),
319 'masses': atoms.get_masses() if atoms.has('masses') else None,
320 'constraints': list(atoms.constraints)}
323def headers_equal(headers1, headers2):
324 assert len(headers1) == len(headers2)
325 eq = True
326 for key in headers1:
327 eq &= np.array_equal(headers1[key], headers2[key])
328 return eq
331class VersionTooOldError(Exception):
332 pass
335def read_atoms(backend,
336 header: Tuple = None,
337 traj: TrajectoryReader = None,
338 _try_except: bool = True) -> Atoms:
340 if _try_except:
341 try:
342 return read_atoms(backend, header, traj, False)
343 except Exception as ex:
344 if (traj is not None and tokenize_version(__version__) <
345 tokenize_version(traj.ase_version)):
346 msg = ('You are trying to read a trajectory file written '
347 f'by ASE-{traj.ase_version} from ASE-{__version__}. '
348 'It might help to update your ASE')
349 raise VersionTooOldError(msg) from ex
350 else:
351 raise
353 b = backend
354 if header:
355 pbc, numbers, masses, constraints = header
356 else:
357 pbc = b.pbc
358 numbers = b.numbers
359 masses = b.get('masses')
360 constraints = b.get('constraints', '[]')
362 atoms = Atoms(positions=b.positions,
363 numbers=numbers,
364 cell=b.cell,
365 masses=masses,
366 pbc=pbc,
367 info=b.get('info'),
368 constraint=[dict2constraint(d)
369 for d in decode(constraints)],
370 momenta=b.get('momenta'),
371 magmoms=b.get('magmoms'),
372 charges=b.get('charges'),
373 tags=b.get('tags'))
374 return atoms
377def write_atoms(backend, atoms, write_header=True):
378 b = backend
380 if write_header:
381 b.write(pbc=atoms.pbc.tolist(),
382 numbers=atoms.numbers)
383 if atoms.constraints:
384 if all(hasattr(c, 'todict') for c in atoms.constraints):
385 b.write(constraints=encode(atoms.constraints))
387 if atoms.has('masses'):
388 b.write(masses=atoms.get_masses())
390 b.write(positions=atoms.get_positions(),
391 cell=atoms.get_cell().tolist())
393 if atoms.has('tags'):
394 b.write(tags=atoms.get_tags())
395 if atoms.has('momenta'):
396 b.write(momenta=atoms.get_momenta())
397 if atoms.has('initial_magmoms'):
398 b.write(magmoms=atoms.get_initial_magnetic_moments())
399 if atoms.has('initial_charges'):
400 b.write(charges=atoms.get_initial_charges())
403def read_traj(fd, index):
404 trj = TrajectoryReader(fd)
405 for i in range(*index.indices(len(trj))):
406 yield trj[i]
409def write_traj(fd, images):
410 """Write image(s) to trajectory."""
411 trj = TrajectoryWriter(fd)
412 if isinstance(images, Atoms):
413 images = [images]
414 for atoms in images:
415 trj.write(atoms)
418class OldCalculatorWrapper:
419 def __init__(self, calc):
420 self.calc = calc
421 try:
422 self.name = calc.name
423 except AttributeError:
424 self.name = calc.__class__.__name__.lower()
426 def get_property(self, prop, atoms, allow_calculation=True):
427 try:
428 if (not allow_calculation and
429 self.calc.calculation_required(atoms, [prop])):
430 return None
431 except AttributeError:
432 pass
434 method = 'get_' + {'energy': 'potential_energy',
435 'magmom': 'magnetic_moment',
436 'magmoms': 'magnetic_moments',
437 'dipole': 'dipole_moment'}.get(prop, prop)
438 try:
439 result = getattr(self.calc, method)(atoms)
440 except AttributeError:
441 raise PropertyNotImplementedError
442 return result
445def convert(name):
446 import os
447 t = TrajectoryWriter(name + '.new')
448 for atoms in PickleTrajectory(name, _warn=False):
449 t.write(atoms)
450 t.close()
451 os.rename(name, name + '.old')
452 os.rename(name + '.new', name)
455def main():
456 import optparse
457 parser = optparse.OptionParser(usage='python -m ase.io.trajectory '
458 'a1.traj [a2.traj ...]',
459 description='Convert old trajectory '
460 'file(s) to new format. '
461 'The old file is kept as a1.traj.old.')
462 opts, args = parser.parse_args()
463 for name in args:
464 convert(name)
467if __name__ == '__main__':
468 main()