Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1""" Class for handling interaction with the PBS queuing system.""" 

2from ase.io import write 

3import os 

4from ase.io.trajectory import Trajectory 

5from subprocess import Popen, PIPE 

6import time 

7 

8 

class PBSQueueRun:
    """Class for communicating with the commonly used PBS queuing system
    at a computer cluster.

    The user needs to supply a job file generator which takes
    as input a job name and the relative path to the traj
    file which is to be locally optimized. The function returns
    the job script as text.
    If the traj file is called f the job must write a file
    f[:-5] + '_done.traj' which is then read by this object.

    Parameters:

    data_connection: The DataConnection object.
    tmp_folder: Temporary folder for all calculations
    job_prefix: Prefix of the job submitted. This identifier is used
        to determine how many jobs are currently running.
    n_simul: The number of simultaneous jobs to keep in the queuing system.
    job_template_generator: The function generating the job file.
        This function should return the content of the job file as a
        string.
    qsub_command: The name of the qsub command (default qsub).
    qstat_command: The name of the qstat command (default qstat).
    find_neighbors: Forwarded unchanged as the ``find_neighbors`` keyword
        of ``data_connection.add_relaxed_step`` (default None).
    perform_parametrization: Forwarded unchanged as the
        ``perform_parametrization`` keyword of
        ``data_connection.add_relaxed_step`` (default None).
    """

    def __init__(self, data_connection, tmp_folder, job_prefix,
                 n_simul, job_template_generator,
                 qsub_command='qsub', qstat_command='qstat',
                 find_neighbors=None, perform_parametrization=None):
        self.dc = data_connection
        self.job_prefix = job_prefix
        self.n_simul = n_simul
        self.job_template_generator = job_template_generator
        self.qsub_command = qsub_command
        self.qstat_command = qstat_command
        self.tmp_folder = tmp_folder
        self.find_neighbors = find_neighbors
        self.perform_parametrization = perform_parametrization
        # Collect results of jobs that finished before this object existed.
        self.__cleanup__()

    def relax(self, a):
        """Add a structure to the queue.

        This method does not fail if sufficient jobs are already
        running, but simply submits the job.

        The structure ``a`` is marked as queued in the data connection,
        written to ``<tmp_folder>/cand<confid>.traj`` and a job script
        generated by ``job_template_generator`` is written to
        ``tmp_job_file.job`` in the current working directory and handed
        to ``qsub_command``. ``a.info['confid']`` must be set.
        """
        self.__cleanup__()
        self.dc.mark_as_queued(a)
        if not os.path.isdir(self.tmp_folder):
            os.mkdir(self.tmp_folder)
        confid = a.info['confid']
        fname = '{0}/cand{1}.traj'.format(self.tmp_folder, confid)
        write(fname, a)
        job_name = '{0}_{1}'.format(self.job_prefix, confid)
        # The context manager guarantees the script is flushed and the
        # handle released even if the template generator raises.
        with open('tmp_job_file.job', 'w') as fd:
            fd.write(self.job_template_generator(job_name, fname))
        os.system('{0} tmp_job_file.job'.format(self.qsub_command))

    def enough_jobs_running(self):
        """Return True if at least ``n_simul`` matching jobs are listed."""
        return self.number_of_jobs_running() >= self.n_simul

    def number_of_jobs_running(self):
        """Determine how many jobs are running.

        The user should use this or the enough_jobs_running method
        to verify that a job needs to be started before
        calling the relax method.

        Returns the number of lines in the output of
        ``qstat_command -u <current user>`` that contain ``job_prefix``.
        """
        self.__cleanup__()
        command = '`which {0}` -u `whoami`'.format(self.qstat_command)
        p = Popen(command,
                  shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE,
                  close_fds=True, universal_newlines=True)
        # communicate() drains stdout and stderr together, closes all three
        # pipes and waits for the child. Reading only stdout could block
        # forever on a full stderr pipe and left a zombie per call.
        out, _ = p.communicate()
        return sum(1 for line in out.splitlines()
                   if self.job_prefix in line)

    def __cleanup__(self):
        """Try to load in structures previously submitted to the
        queuing system.

        For every candidate id reported as queued by the data connection,
        look for a non-empty ``<tmp_folder>/cand<id>_done.traj``. If it
        exists, its last image is tagged with the candidate id and passed
        to ``add_relaxed_step``. A file that still yields no images after
        five attempts (one second apart) is reported with ``print`` and
        skipped; it is not treated as fatal.
        """
        for c in self.dc.get_all_candidates_in_queue():
            fdone = '{0}/cand{1}_done.traj'.format(self.tmp_folder, c)
            if not (os.path.isfile(fdone) and os.path.getsize(fdone) > 0):
                continue
            try:
                a = []
                niter = 0
                # The job may still be writing the file; retry a few times.
                while len(a) == 0 and niter < 5:
                    t = Trajectory(fdone, 'r')
                    try:
                        a = [ats for ats in t]
                    finally:
                        # Release the file handle on every attempt rather
                        # than relying on garbage collection.
                        t.close()
                    if len(a) == 0:
                        time.sleep(1.)
                    niter += 1
                if len(a) == 0:
                    txt = 'Could not read candidate ' + \
                        '{0} from the filesystem'.format(c)
                    raise IOError(txt)
                a = a[-1]
                a.info['confid'] = c
                self.dc.add_relaxed_step(
                    a,
                    find_neighbors=self.find_neighbors,
                    perform_parametrization=self.perform_parametrization)
            except IOError as e:
                print(e)

110 self.dc.add_relaxed_step( 

111 a, 

112 find_neighbors=self.find_neighbors, 

113 perform_parametrization=self.perform_parametrization) 

114 except IOError as e: 

115 print(e)