Coverage for openxps/extension_writer.py: 100%

110 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-13 22:08 +0000

1""" 

2.. module:: openxps.extension_writer 

3 :platform: Linux, MacOS, Windows 

4 :synopsis: A custom writer for reporting state data from an extension context. 

5 

6.. classauthor:: Charlles Abreu <craabreu@gmail.com> 

7 

8""" 

9 

10import cvpack 

11import openmm as mm 

12from cvpack.reporting.custom_writer import CustomWriter 

13from openmm import _openmm as mmswig 

14from openmm import unit as mmunit 

15 

16from .simulation import ExtendedSpaceSimulation 

17 

18 

19class ExtensionWriter(CustomWriter): 

20 """ 

21 A custom writer for reporting state data from an extension context. 

22 

23 This writer can be used with :class:`cvpack.reporting.StateDataReporter` to 

24 report various properties of the extension context in an extended phase-space 

25 simulation. The extension context contains the dynamical variables and their 

26 associated properties. 

27 

28 Parameters 

29 ---------- 

30 kinetic : bool, optional, default=False 

31 If ``True``, the kinetic energy of the extension context will be reported 

32 in units of kJ/mol. 

33 temperature : bool, optional, default=False 

34 If ``True``, the temperature of the extension context will be reported 

35 in units of Kelvin. 

36 dynamical_variables : bool, optional, default=False 

37 If ``True``, the current values of all dynamical variables will be reported. 

38 Each variable will be reported with its name and unit. 

39 forces : bool, optional, default=False 

40 If ``True``, the forces acting on each dynamical variable will be reported 

41 in units of kJ/(mol*unit), where unit is the unit of the corresponding 

42 dynamical variable. 

43 collective_variables : bool, optional, default=False 

44 If ``True``, the current values of collective variables that are part of 

45 collective variable couplings will be reported. 

46 effective_masses : bool, optional, default=False 

47 If ``True``, the effective masses of collective variables from coupling 

48 forces will be reported in units of dalton*(nanometer/unit)^2, where 

49 unit is the unit of the corresponding collective variable. 

50 coupling_functions : bool, optional, default=False 

51 If ``True``, the current values of functions in inner product couplings 

52 will be reported. 

53 

54 Example 

55 ------- 

56 >>> import openxps as xps 

57 >>> from sys import stdout 

58 >>> import openmm 

59 >>> import cvpack 

60 >>> from openmm import unit 

61 >>> from openmmtools import testsystems 

62 >>> model = testsystems.AlanineDipeptideVacuum() 

63 >>> mass = 3 * unit.dalton*(unit.nanometer/unit.radian)**2 

64 >>> phi0 = xps.DynamicalVariable("phi0", unit.radian, mass, xps.CircularBounds()) 

65 >>> phi = cvpack.Torsion(6, 8, 14, 16, name="phi") 

66 >>> kappa = 1000 * unit.kilojoules_per_mole / unit.radian**2 

67 >>> harmonic_force = xps.HarmonicCoupling(phi, phi0, kappa) 

68 >>> integrator = openmm.LangevinMiddleIntegrator( 

69 ... 300 * unit.kelvin, 1 / unit.picosecond, 4 * unit.femtosecond 

70 ... ) 

71 >>> integrator.setRandomNumberSeed(1234) 

72 >>> platform = openmm.Platform.getPlatformByName("Reference") 

73 >>> simulation = xps.ExtendedSpaceSimulation( 

74 ... model.topology, 

75 ... xps.ExtendedSpaceSystem(model.system, harmonic_force), 

76 ... xps.LockstepIntegrator(integrator), 

77 ... platform 

78 ... ) 

79 >>> simulation.context.setPositions(model.positions) 

80 >>> simulation.context.setVelocitiesToTemperature(300 * unit.kelvin, 1234) 

81 >>> simulation.context.setDynamicalVariableValues([180 * unit.degree]) 

82 >>> simulation.context.setDynamicalVariableVelocitiesToTemperature( 

83 ... 300 * unit.kelvin, 1234 

84 ... ) 

85 >>> reporter = cvpack.reporting.StateDataReporter( 

86 ... stdout, 

87 ... 10, 

88 ... step=True, 

89 ... kineticEnergy=True, 

90 ... writers=[xps.ExtensionWriter( 

91 ... kinetic=True, 

92 ... temperature=True, 

93 ... dynamical_variables=True 

94 ... )], 

95 ... ) 

96 >>> simulation.reporters.append(reporter) 

97 >>> simulation.step(100) # doctest: +SKIP 

98 #"Step","Kinetic Energy (kJ/mole)","Extension Kinetic Energy (kJ/mole)",\ 

99 "Extension Temperature (K)","phi0 (rad)" 

100 10,60.512...,1.7013...,123.456...,3.1415... 

101 """ 

102 

103 def __init__( # noqa: PLR0913 

104 self, 

105 *, 

106 kinetic: bool = False, 

107 temperature: bool = False, 

108 dynamical_variables: bool = False, 

109 forces: bool = False, 

110 collective_variables: bool = False, 

111 effective_masses: bool = False, 

112 coupling_functions: bool = False, 

113 ) -> None: 

114 self._kinetic = kinetic 

115 self._temperature = temperature 

116 self._dynamical_variables = dynamical_variables 

117 self._forces = forces 

118 self._collective_variables = collective_variables 

119 self._effective_masses = effective_masses 

120 self._coupling_functions = coupling_functions 

121 

122 self._needs_energy = kinetic or temperature 

123 self._needs_positions = dynamical_variables 

124 self._needs_forces = forces 

125 

126 self._temp_factor = 0.0 

127 self._dv_objects = [] 

128 self._meta_cvs = [] 

129 self._function_names = [] 

130 

131 def initialize(self, simulation: ExtendedSpaceSimulation) -> None: 

132 context = simulation.extended_space_context 

133 coupling = context.getSystem().getCoupling() 

134 self._dv_objects = coupling.getDynamicalVariables() 

135 

136 if self._temperature: 

137 number = len(coupling.getDynamicalVariables()) 

138 kb = mmunit.MOLAR_GAS_CONSTANT_R.value_in_unit( 

139 mmunit.kilojoules_per_mole / mmunit.kelvin 

140 ) 

141 self._temp_factor = 2 / (number * kb) 

142 

143 if self._collective_variables or self._effective_masses: 

144 self._meta_cvs = [] 

145 for force in coupling.getForces(): 

146 if isinstance(force, cvpack.MetaCollectiveVariable): 

147 self._meta_cvs.append(force) 

148 

149 if self._coupling_functions: 

150 self._function_names = [] 

151 dv_names = {dv.name for dv in self._dv_objects} 

152 for parameter in coupling.getProtectedParameters(): 

153 if parameter not in dv_names: 

154 self._function_names.append(parameter) 

155 

156 def getDynamicalVariableHeaders(self) -> list[str]: 

157 """Get the headers for all dynamical variables. 

158 

159 Returns 

160 ------- 

161 list[str] 

162 The headers for all dynamical variables. 

163 """ 

164 return [f"{dv.name} ({dv.unit.get_symbol()})" for dv in self._dv_objects] 

165 

166 def getForceHeaders(self) -> list[str]: 

167 """Get the header for the force on a dynamical variable. 

168 

169 Returns 

170 ------- 

171 list[str] 

172 The headers for all forces. 

173 """ 

174 return [ 

175 f"Force on {dv.name} (kJ/(mol*{dv.unit.get_symbol()}))" 

176 for dv in self._dv_objects 

177 ] 

178 

179 def getCollectiveVariableHeaders(self) -> list[str]: 

180 """Get the headers for all collective variables. 

181 

182 Returns 

183 ------- 

184 list[str] 

185 The headers for all collective variables. 

186 """ 

187 headers = [] 

188 for meta_cv in self._meta_cvs: 

189 for cv in meta_cv.getInnerVariables(): 

190 headers.append(f"{cv.getName()} ({cv.getUnit().get_symbol()})") 

191 return headers 

192 

193 def getEffectiveMassHeaders(self) -> list[str]: 

194 """Get the headers for all effective masses. 

195 

196 Returns 

197 ------- 

198 list[str] 

199 The headers for all effective masses. 

200 """ 

201 headers = [] 

202 for meta_cv in self._meta_cvs: 

203 for cv in meta_cv.getInnerVariables(): 

204 mass_unit = mmunit.dalton * (mmunit.nanometer / cv.getUnit()) ** 2 

205 headers.append(f"emass[{cv.getName()}] ({mass_unit.get_symbol()})") 

206 return headers 

207 

208 def getHeaders(self) -> list[str]: # noqa: PLR0912 

209 headers = [] 

210 if self._kinetic: 

211 headers.append("Extension Kinetic Energy (kJ/mole)") 

212 if self._temperature: 

213 headers.append("Extension Temperature (K)") 

214 if self._dynamical_variables: 

215 headers.extend(self.getDynamicalVariableHeaders()) 

216 if self._forces: 

217 headers.extend(self.getForceHeaders()) 

218 if self._collective_variables: 

219 headers.extend(self.getCollectiveVariableHeaders()) 

220 if self._effective_masses: 

221 headers.extend(self.getEffectiveMassHeaders()) 

222 if self._coupling_functions: 

223 headers.extend(self._function_names) 

224 return headers 

225 

226 def getValues(self, simulation: ExtendedSpaceSimulation) -> list[float]: # noqa: PLR0912 

227 context = simulation.extended_space_context 

228 extension_context = context.getExtensionContext() 

229 state = extension_context.getState( 

230 energy=self._needs_energy, 

231 positions=self._needs_positions, 

232 forces=self._needs_forces, 

233 ) 

234 if self._needs_energy: 

235 kinetic_energy = mmswig.State_getKineticEnergy(state) 

236 if self._needs_positions: 

237 positions = mmswig.State__getVectorAsVec3(state, mm.State.Positions) 

238 if self._forces: 

239 forces = mmswig.State__getVectorAsVec3(state, mm.State.Forces) 

240 values = [] 

241 if self._kinetic: 

242 values.append(kinetic_energy) 

243 if self._temperature: 

244 values.append(self._temp_factor * kinetic_energy) 

245 if self._dynamical_variables: 

246 for index, dv in enumerate(self._dv_objects): 

247 value, _ = dv.bounds.wrap(positions[index].x, 0) 

248 values.append(value) 

249 if self._forces: 

250 for force in forces: 

251 values.append(force.x) 

252 if self._collective_variables: 

253 for meta_cv in self._meta_cvs: 

254 for cv in meta_cv.getInnerVariables(): 

255 values.append(extension_context.getParameter(cv.getName())) 

256 if self._effective_masses: 

257 for meta_cv in self._meta_cvs: 

258 for cv, mass in zip( 

259 meta_cv.getInnerVariables(), 

260 meta_cv.getInnerEffectiveMasses(context).values(), 

261 ): 

262 mass_unit = mmunit.dalton * (mmunit.nanometer / cv.getUnit()) ** 2 

263 values.append(mass.value_in_unit(mass_unit)) 

264 if self._coupling_functions: 

265 for name in self._function_names: 

266 values.append(context.getParameter(name)) 

267 return values