Coverage for ufedmm/io.py: 52%

143 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-02-02 04:20 +0000

1""" 

2.. module:: io 

3 :platform: Unix, Windows 

4 :synopsis: Unified Free Energy Dynamics Outputs 

5 

6.. moduleauthor:: Charlles Abreu <abreu@eq.ufrj.br> 

7""" 

8 

9import sys 

10 

11import openmm 

12import yaml 

13from openmm import app, unit 

14 

15import ufedmm 

16from ufedmm.ufedmm import _Metadynamics, _standardized 

17 

18 

19class Tee: 

20 """Allows the use of multiple outputs in an OpenMM Reporter. 

21 

22 Parameters 

23 ---------- 

24 A list of valid outputs (file names and/or output streams). 

25 

26 Example 

27 ------- 

28 >>> import ufedmm 

29 >>> import tempfile 

30 >>> from sys import stdout 

31 >>> file = tempfile.TemporaryFile(mode='w+t') 

32 >>> print('test', file=ufedmm.Tee(stdout, file)) 

33 test 

34 """ 

35 

36 def __init__(self, *files): 

37 self._files = list() 

38 for output in files: 

39 self._files.append(open(output, "w") if isinstance(output, str) else output) 

40 

41 def __del__(self): 

42 for output in self._files: 

43 if output != sys.stdout and output != sys.stderr: 

44 output.close() 

45 

46 def write(self, message): 

47 for output in self._files: 

48 output.write(message) 

49 

50 def flush(self): 

51 for output in self._files: 

52 output.flush() 

53 

54 

55class StateDataReporter(app.StateDataReporter): 

56 """An extension of OpenMM's :OpenMMApp:`statedatareporter.StateDataReporter` class, 

57 which outputs information about a simulation, such as energy, temperature, etc. 

58 

59 All original functionalities are preserved. 

60 

61 Besides, if it is added to an :class:`~ufedmm.ufedmm.ExtendedSpaceSimulation` 

62 object, e.g. one created through the 

63 :func:`~ufedmm.ufedmm.UnifiedFreeEnergyDynamics.simulation` method, then a 

64 new set of keywords are available. 

65 

66 Parameters 

67 ---------- 

68 file : str or stream or afed.temperature 

69 The file to write to, specified as a file name, file object, or 

70 :class:`~ufedmm.io.Tee` object. 

71 report_interval : int 

72 The interval (in time steps) at which to report state data. 

73 

74 Keyword Args 

75 ------------ 

76 variables : bool, default=False 

77 Whether to report all dynamical variables related to the extended-space 

78 simulation and their associated collective variables. 

79 multipleTemperatures : bool, default=False 

80 Whether to separately report the temperature estimates for the atoms and for 

81 the extended-space dynamical variables. 

82 hillHeights : bool, default=False 

83 Whether to report the height of the latest deposited metadynamics hill. 

84 collectiveVariables : bool, default=False 

85 Whether to report the collective variables in all :OpenMM:`CustomCVForce` 

86 objects in the system. 

87 globalParameterStates : pandas.DataFrame, default=None 

88 A DataFrame containing context global parameters (column names) and sets of 

89 values thereof. If it is provided, then the potential energy will be 

90 reported for every state defined by these values. 

91 

92 Example 

93 ------- 

94 >>> import openmm 

95 >>> import ufedmm 

96 >>> from openmm import unit 

97 >>> from sys import stdout 

98 >>> model = ufedmm.AlanineDipeptideModel(water='tip3p') 

99 >>> mass = 50*unit.dalton*(unit.nanometer/unit.radians)**2 

100 >>> Ks = 1000*unit.kilojoules_per_mole/unit.radians**2 

101 >>> T = 300*unit.kelvin 

102 >>> Ts = 1500*unit.kelvin 

103 >>> dt = 2*unit.femtoseconds 

104 >>> gamma = 10/unit.picoseconds 

105 >>> limit = 180*unit.degrees 

106 >>> s_phi = ufedmm.DynamicalVariable( 

107 ... 's_phi', -limit, limit, mass, Ts, model.phi, Ks 

108 ... ) 

109 >>> s_psi = ufedmm.DynamicalVariable( 

110 ... 's_psi', -limit, limit, mass, Ts, model.psi, Ks 

111 ... ) 

112 >>> ufed = ufedmm.UnifiedFreeEnergyDynamics([s_phi, s_psi], T) 

113 >>> integrator = ufedmm.GeodesicLangevinIntegrator(T, gamma, dt) 

114 >>> platform = openmm.Platform.getPlatformByName('Reference') 

115 >>> simulation = ufed.simulation( 

116 ... model.topology, model.system, integrator, platform 

117 ... ) 

118 >>> simulation.context.setPositions(model.positions) 

119 >>> simulation.context.setVelocitiesToTemperature(300*unit.kelvin, 1234) 

120 >>> reporter = ufedmm.StateDataReporter(stdout, 1, variables=True) 

121 >>> reporter.report(simulation, simulation.context.getState()) 

122 #"s_phi","phi","s_psi","psi" 

123 -3.141592653589793,3.141592653589793,-3.141592653589793,3.141592653589793 

124 """ 

125 

126 def __init__(self, file, report_interval, **kwargs): 

127 self._variables = kwargs.pop("variables", False) 

128 self._multitemps = kwargs.pop("multipleTemperatures", False) 

129 self._hill_heights = kwargs.pop("hillHeights", False) 

130 self._collective_variables = kwargs.pop("collectiveVariables", False) 

131 self._global_parameter_states = kwargs.pop("globalParameterStates", None) 

132 super().__init__(file, report_interval, **kwargs) 

133 items = [ 

134 self._volume, 

135 self._density, 

136 self._speed, 

137 self._elapsedTime, 

138 self._remainingTime, 

139 ] 

140 self._backSteps = -sum(items) 

141 if self._multitemps: 

142 self._needsVelocities = self._needEnergy = True 

143 

144 def _add_item(self, lst, item): 

145 if self._backSteps == 0: 

146 lst.append(item) 

147 else: 

148 lst.insert(self._backSteps, item) 

149 

150 def _initializeConstants(self, simulation): 

151 if self._multitemps and not self._temperature: 

152 self._temperature = True 

153 super()._initializeConstants(simulation) 

154 self._temperature = False 

155 else: 

156 super()._initializeConstants(simulation) 

157 

158 self._extended_space = isinstance(simulation, ufedmm.ExtendedSpaceSimulation) 

159 if self._extended_space: 

160 self._cv_names = [ 

161 force.getCollectiveVariableName(i) 

162 for force in simulation.context.driving_forces 

163 for i in range(force.getNumCollectiveVariables()) 

164 ] 

165 self._var_names = [v.id for v in simulation.context.variables] 

166 if self._hill_heights: 

167 metadynamics_tasks = filter( 

168 lambda x: isinstance(x, _Metadynamics), simulation._periodic_tasks 

169 ) 

170 try: 

171 self._metadynamics = next(metadynamics_tasks) 

172 except StopIteration: 

173 raise Exception( 

174 "hillHeights keyword involked for simulation without " 

175 "metadynamics bias" 

176 ) 

177 bias_factor = self._metadynamics.bias_factor 

178 self._height_scaling = ( 

179 1 if bias_factor is None else bias_factor / (bias_factor - 1) 

180 ) 

181 

182 if self._multitemps: 

183 system = simulation.context.getSystem() 

184 integrator = simulation.context.getIntegrator() 

185 Nt = system.getNumParticles() 

186 Nv = len(simulation.context.variables) 

187 ke_system = openmm.System() 

188 for i in range(Nt - Nv, Nt): 

189 ke_system.addParticle(system.getParticleMass(i)) 

190 ke_integrator = openmm.CustomIntegrator(0) 

191 ke_integrator.addPerDofVariable("kT", 0) 

192 ke_integrator.addComputePerDof( 

193 "v", integrator.getKineticEnergyExpression() 

194 ) 

195 self._ke_context = openmm.Context(ke_system, ke_integrator) 

196 if integrator._up_to_date: 

197 kT = integrator.getPerDofVariableByName("kT") 

198 ke_integrator.setPerDofVariableByName("kT", kT[Nt - Nv : Nt]) 

199 else: 

200 raise ValueError("Integrator temperatures were not set") 

201 

202 if self._collective_variables: 

203 forces = simulation.context.getSystem().getForces() 

204 self._cv_forces = list( 

205 filter(lambda f: isinstance(f, openmm.CustomCVForce), forces) 

206 ) 

207 

208 def _constructHeaders(self): 

209 headers = super()._constructHeaders() 

210 if self._extended_space: 

211 if self._multitemps: 

212 self._add_item(headers, "T[atoms] (K)") 

213 for var in self._var_names: 

214 self._add_item(headers, f"T[{var}] (K)") 

215 if self._variables: 

216 for cv in self._cv_names: 

217 self._add_item(headers, cv) 

218 if self._hill_heights: 

219 self._add_item(headers, "Height (kJ/mole)") 

220 if self._collective_variables: 

221 for force in self._cv_forces: 

222 for index in range(force.getNumCollectiveVariables()): 

223 self._add_item(headers, force.getCollectiveVariableName(index)) 

224 if self._global_parameter_states is not None: 

225 for index in self._global_parameter_states.index: 

226 self._add_item(headers, f"Energy[{index}] (kJ/mole)") 

227 return headers 

228 

229 def _constructReportValues(self, simulation, state): 

230 values = super()._constructReportValues(simulation, state) 

231 if self._extended_space: 

232 if self._multitemps: 

233 system = simulation.context.getSystem() 

234 ke_context = self._ke_context 

235 Nt = system.getNumParticles() 

236 Nv = ke_context.getSystem().getNumParticles() 

237 ke_context.setVelocities( 

238 state.getVelocities(extended=True)[Nt - Nv : Nt] 

239 ) 

240 ke_context.getIntegrator().step(1) 

241 ke_values = ke_context.getState(getVelocities=True).getVelocities() 

242 xvar_ke = sum(ke.x + ke.y + ke.z for ke in ke_values) 

243 total_ke = _standardized(state.getKineticEnergy()) 

244 kB = _standardized(unit.MOLAR_GAS_CONSTANT_R) 

245 self._add_item( 

246 values, 2 * (total_ke - xvar_ke) / ((self._dof - 3 * Nv) * kB) 

247 ) 

248 for ke in ke_values: 

249 self._add_item(values, 2 * ke.x / kB) 

250 if self._variables: 

251 for force in simulation.context.driving_forces: 

252 for cv in force.getCollectiveVariableValues(simulation.context): 

253 self._add_item(values, cv) 

254 if self._hill_heights: 

255 self._add_item(values, self._metadynamics.height * self._height_scaling) 

256 if self._collective_variables: 

257 for force in self._cv_forces: 

258 for cv in force.getCollectiveVariableValues(simulation.context): 

259 self._add_item(values, cv) 

260 if self._global_parameter_states is not None: 

261 originals = list( 

262 map( 

263 simulation.context.getParameter, 

264 self._global_parameter_states.columns, 

265 ) 

266 ) 

267 for index, row in self._global_parameter_states.iterrows(): 

268 for name, value in row.items(): 

269 simulation.context.setParameter(name, value) 

270 energy = simulation.context.getState( 

271 getEnergy=True 

272 ).getPotentialEnergy() 

273 self._add_item(values, energy.value_in_unit(unit.kilojoules_per_mole)) 

274 for name, value in zip(self._global_parameter_states.columns, originals): 

275 simulation.context.setParameter(name, value) 

276 return values 

277 

278 

279def serialize(object, file): 

280 """Serializes a ufedmm object.""" 

281 dump = yaml.dump(object, Dumper=yaml.CDumper) 

282 if isinstance(file, str): 

283 with open(file, "w") as f: 

284 f.write(dump) 

285 else: 

286 file.write(dump) 

287 

288 

289def deserialize(file): 

290 """Deserializes a ufedmm object.""" 

291 if isinstance(file, str): 

292 with open(file, "r") as f: 

293 object = yaml.load(f.read(), Loader=yaml.CLoader) 

294 else: 

295 object = yaml.load(file.read(), Loader=yaml.CLoader) 

296 return object