Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1""" Class for handling several simultaneous jobs. 

2 The class has been tested on linux and Mac OS X. 

3""" 

4from subprocess import Popen, PIPE 

5import os 

6import time 

7from ase.io import write, read 

8 

9 

10class ParallelLocalRun: 

11 

12 """ Class that allows for the simultaneous relaxation of 

13 several candidates on the same computer. 

14 The method is based on starting each relaxation with an 

15 external python script and then monitoring when the 

16 relaxations are done adding in the resulting structures 

17 to the database. 

18 

19 Parameters: 

20 data_connection: DataConnection object. 

21 tmp_folder: Folder for temporary files 

22 n_simul: The number of simultaneous relaxations. 

23 calc_script: Reference to the relaxation script. 

24 """ 

25 

26 def __init__(self, data_connection, tmp_folder, 

27 n_simul, calc_script): 

28 self.dc = data_connection 

29 self.n_simul = n_simul 

30 self.calc_script = calc_script 

31 self.tmp_folder = tmp_folder 

32 self.running_pids = [] 

33 

34 def get_number_of_jobs_running(self): 

35 """ Returns the number of jobs running. 

36 It is a good idea to check that this is 0 before 

37 terminating the main program. """ 

38 self.__cleanup__() 

39 return len(self.running_pids) 

40 

41 def relax(self, a): 

42 """ Relax the input atoms object a. If n_simul relaxations 

43 are already running the function sleeps until a processor 

44 becomes available. 

45 """ 

46 self.__cleanup__() 

47 

48 # Wait until a thread is available. 

49 while len(self.running_pids) >= self.n_simul: 

50 time.sleep(2.) 

51 self.__cleanup__() 

52 

53 # Mark the structure as queued and run the external py script. 

54 self.dc.mark_as_queued(a) 

55 if not os.path.isdir(self.tmp_folder): 

56 os.mkdir(self.tmp_folder) 

57 fname = '{0}/cand{1}.traj'.format(self.tmp_folder, 

58 a.info['confid']) 

59 write(fname, a) 

60 p = Popen(['python', self.calc_script, fname]) 

61 self.running_pids.append([a.info['confid'], p.pid]) 

62 

63 def __cleanup__(self): 

64 """ Checks if any relaxations are done and load in the structure 

65 from the traj file. """ 

66 p = Popen(['ps -x -U `whoami`'], shell=True, 

67 stdin=PIPE, stdout=PIPE, stderr=PIPE, close_fds=True, 

68 universal_newlines=True) 

69 (_, fout) = (p.stdin, p.stdout) 

70 lines = fout.readlines() 

71 lines = [l for l in lines if l.find('defunct') == -1] 

72 

73 stopped_runs = [] 

74 for i in range(len(self.running_pids) - 1, -1, -1): 

75 found = False 

76 for l in lines: 

77 if l.find(str(self.running_pids[i][1])) != -1: 

78 found = True 

79 break 

80 if not found: 

81 stopped_runs.append(self.running_pids.pop(i)) 

82 

83 # All processes not running any more must be complete and should 

84 # be loaded in. 

85 for (confid, _) in stopped_runs: 

86 try: 

87 tf = self.tmp_folder 

88 a = read('{0}/cand{1}_done.traj'.format(tf, 

89 confid)) 

90 self.dc.add_relaxed_step(a) 

91 except IOError as e: 

92 print(e)