Coverage for /builds/debichem-team/python-ase/ase/ga/pbs_queue_run.py: 17.74%
62 statements
« prev ^ index » next coverage.py v7.5.3, created at 2025-03-06 04:00 +0000
1""" Class for handling interaction with the PBS queuing system."""
2import os
3import time
4from subprocess import PIPE, Popen
6from ase.io import write
7from ase.io.trajectory import Trajectory
class PBSQueueRun:
    """Class for communicating with the commonly used PBS queuing system
    at a computer cluster.

    The user needs to supply a job file generator which takes
    as input a job name and the relative path to the traj
    file which is to be locally optimized. The function returns
    the job script as text.
    If the traj file is called f the job must write a file
    f[:-5] + '_done.traj' which is then read by this object.

    Parameters:

    data_connection: The DataConnection object.
    tmp_folder: Temporary folder for all calculations.
    job_prefix: Prefix of the job submitted. This identifier is used
        to determine how many jobs are currently running.
    n_simul: The number of simultaneous jobs to keep in the queuing system.
    job_template_generator: The function generating the job file.
        It is called as ``job_template_generator(job_name, traj_path)``
        and should return the content of the job file as a string.
    qsub_command: The name of the qsub command (default qsub).
    qstat_command: The name of the qstat command (default qstat).
    find_neighbors: Passed unchanged to
        ``data_connection.add_relaxed_step`` (default None).
    perform_parametrization: Passed unchanged to
        ``data_connection.add_relaxed_step`` (default None).
    """

    def __init__(self, data_connection, tmp_folder, job_prefix,
                 n_simul, job_template_generator,
                 qsub_command='qsub', qstat_command='qstat',
                 find_neighbors=None, perform_parametrization=None):
        self.dc = data_connection
        self.job_prefix = job_prefix
        self.n_simul = n_simul
        self.job_template_generator = job_template_generator
        self.qsub_command = qsub_command
        self.qstat_command = qstat_command
        self.tmp_folder = tmp_folder
        self.find_neighbors = find_neighbors
        self.perform_parametrization = perform_parametrization
        # Collect results of jobs that finished before this object existed.
        self.__cleanup__()

    def relax(self, a):
        """Add a structure to the queue.

        This method does not fail if sufficient jobs are already running,
        but simply submits the job. The structure is marked as queued,
        written to ``<tmp_folder>/cand<confid>.traj``, and a job script
        produced by ``job_template_generator`` is submitted with
        ``qsub_command``.

        Parameters:

        a: Atoms object; ``a.info['confid']`` must be set.
        """
        self.__cleanup__()
        self.dc.mark_as_queued(a)
        # makedirs(exist_ok=True) avoids the isdir/mkdir race and also
        # creates missing parent directories.
        os.makedirs(self.tmp_folder, exist_ok=True)
        confid = a.info['confid']
        fname = f'{self.tmp_folder}/cand{confid}.traj'
        write(fname, a)
        job_name = f'{self.job_prefix}_{confid}'
        # The job script is written to the current working directory under
        # a fixed name and submitted immediately.
        with open('tmp_job_file.job', 'w') as fd:
            fd.write(self.job_template_generator(job_name, fname))
        # NOTE(review): the exit status of the submit command is ignored.
        os.system(f'{self.qsub_command} tmp_job_file.job')

    def enough_jobs_running(self):
        """Return True if at least ``n_simul`` jobs are running."""
        return self.number_of_jobs_running() >= self.n_simul

    def number_of_jobs_running(self):
        """Determine how many jobs are running.

        The user should use this or the enough_jobs_running method to
        verify that a job needs to be started before calling the relax
        method. Counts the lines of ``qstat -u <current user>`` output
        that contain ``job_prefix``.
        """
        self.__cleanup__()
        cmd = f'`which {self.qstat_command}` -u `whoami`'
        # communicate() drains stdout and stderr together (no pipe-buffer
        # deadlock); the with-block closes the pipes and reaps the child.
        with Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE,
                   close_fds=True, universal_newlines=True) as proc:
            stdout, _ = proc.communicate()
        return sum(1 for line in stdout.splitlines()
                   if self.job_prefix in line)

    def __cleanup__(self):
        """Try to load structures previously submitted to the queuing system.

        For every candidate the data connection reports as queued, look for
        ``<tmp_folder>/cand<confid>_done.traj``. If it exists and is
        non-empty, read its last image, tag it with the confid and pass it
        to ``data_connection.add_relaxed_step``. An OSError raised while
        reading or storing a candidate is printed and the candidate skipped.
        """
        for confid in self.dc.get_all_candidates_in_queue():
            fdone = f'{self.tmp_folder}/cand{confid}_done.traj'
            if not (os.path.isfile(fdone) and os.path.getsize(fdone) > 0):
                continue
            try:
                atoms = self._read_last_image(fdone, confid)
                atoms.info['confid'] = confid
                self.dc.add_relaxed_step(
                    atoms,
                    find_neighbors=self.find_neighbors,
                    perform_parametrization=self.perform_parametrization)
            except OSError as e:
                print(e)

    @staticmethod
    def _read_last_image(fdone, confid, attempts=5, delay=1.0):
        """Return the last image in trajectory ``fdone``, retrying if empty.

        The file may still be being written by the job, so an empty read is
        retried up to ``attempts`` times with ``delay`` seconds in between.
        Raises OSError if no image could be read.
        """
        for attempt in range(attempts):
            # Close the reader on every attempt instead of leaking handles.
            with Trajectory(fdone, 'r') as traj:
                images = list(traj)
            if images:
                return images[-1]
            if attempt < attempts - 1:
                time.sleep(delay)
        raise OSError(f'Could not read candidate {confid} from the filesystem')