Coverage for ufedmm/io.py: 52%
143 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-02 04:20 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-02 04:20 +0000
1"""
2.. module:: io
3 :platform: Unix, Windows
4 :synopsis: Unified Free Energy Dynamics Outputs
6.. moduleauthor:: Charlles Abreu <abreu@eq.ufrj.br>
7"""
9import sys
11import openmm
12import yaml
13from openmm import app, unit
15import ufedmm
16from ufedmm.ufedmm import _Metadynamics, _standardized
19class Tee:
20 """Allows the use of multiple outputs in an OpenMM Reporter.
22 Parameters
23 ----------
24 A list of valid outputs (file names and/or output streams).
26 Example
27 -------
28 >>> import ufedmm
29 >>> import tempfile
30 >>> from sys import stdout
31 >>> file = tempfile.TemporaryFile(mode='w+t')
32 >>> print('test', file=ufedmm.Tee(stdout, file))
33 test
34 """
36 def __init__(self, *files):
37 self._files = list()
38 for output in files:
39 self._files.append(open(output, "w") if isinstance(output, str) else output)
41 def __del__(self):
42 for output in self._files:
43 if output != sys.stdout and output != sys.stderr:
44 output.close()
46 def write(self, message):
47 for output in self._files:
48 output.write(message)
50 def flush(self):
51 for output in self._files:
52 output.flush()
55class StateDataReporter(app.StateDataReporter):
56 """An extension of OpenMM's :OpenMMApp:`statedatareporter.StateDataReporter` class,
57 which outputs information about a simulation, such as energy, temperature, etc.
59 All original functionalities are preserved.
61 Besides, if it is added to an :class:`~ufedmm.ufedmm.ExtendedSpaceSimulation`
62 object, e.g. one created through the
63 :func:`~ufedmm.ufedmm.UnifiedFreeEnergyDynamics.simulation` method, then a
64 new set of keywords are available.
66 Parameters
67 ----------
68 file : str or stream or afed.temperature
69 The file to write to, specified as a file name, file object, or
70 :class:`~ufedmm.io.Tee` object.
71 report_interval : int
72 The interval (in time steps) at which to report state data.
74 Keyword Args
75 ------------
76 variables : bool, default=False
77 Whether to report all dynamical variables related to the extended-space
78 simulation and their associated collective variables.
79 multipleTemperatures : bool, default=False
80 Whether to separately report the temperature estimates for the atoms and for
81 the extended-space dynamical variables.
82 hillHeights : bool, default=False
83 Whether to report the height of the latest deposited metadynamics hill.
84 collectiveVariables : bool, default=False
85 Whether to report the collective variables in all :OpenMM:`CustomCVForce`
86 objects in the system.
87 globalParameterStates : pandas.DataFrame, default=None
88 A DataFrame containing context global parameters (column names) and sets of
89 values thereof. If it is provided, then the potential energy will be
90 reported for every state defined by these values.
92 Example
93 -------
94 >>> import openmm
95 >>> import ufedmm
96 >>> from openmm import unit
97 >>> from sys import stdout
98 >>> model = ufedmm.AlanineDipeptideModel(water='tip3p')
99 >>> mass = 50*unit.dalton*(unit.nanometer/unit.radians)**2
100 >>> Ks = 1000*unit.kilojoules_per_mole/unit.radians**2
101 >>> T = 300*unit.kelvin
102 >>> Ts = 1500*unit.kelvin
103 >>> dt = 2*unit.femtoseconds
104 >>> gamma = 10/unit.picoseconds
105 >>> limit = 180*unit.degrees
106 >>> s_phi = ufedmm.DynamicalVariable(
107 ... 's_phi', -limit, limit, mass, Ts, model.phi, Ks
108 ... )
109 >>> s_psi = ufedmm.DynamicalVariable(
110 ... 's_psi', -limit, limit, mass, Ts, model.psi, Ks
111 ... )
112 >>> ufed = ufedmm.UnifiedFreeEnergyDynamics([s_phi, s_psi], T)
113 >>> integrator = ufedmm.GeodesicLangevinIntegrator(T, gamma, dt)
114 >>> platform = openmm.Platform.getPlatformByName('Reference')
115 >>> simulation = ufed.simulation(
116 ... model.topology, model.system, integrator, platform
117 ... )
118 >>> simulation.context.setPositions(model.positions)
119 >>> simulation.context.setVelocitiesToTemperature(300*unit.kelvin, 1234)
120 >>> reporter = ufedmm.StateDataReporter(stdout, 1, variables=True)
121 >>> reporter.report(simulation, simulation.context.getState())
122 #"s_phi","phi","s_psi","psi"
123 -3.141592653589793,3.141592653589793,-3.141592653589793,3.141592653589793
124 """
126 def __init__(self, file, report_interval, **kwargs):
127 self._variables = kwargs.pop("variables", False)
128 self._multitemps = kwargs.pop("multipleTemperatures", False)
129 self._hill_heights = kwargs.pop("hillHeights", False)
130 self._collective_variables = kwargs.pop("collectiveVariables", False)
131 self._global_parameter_states = kwargs.pop("globalParameterStates", None)
132 super().__init__(file, report_interval, **kwargs)
133 items = [
134 self._volume,
135 self._density,
136 self._speed,
137 self._elapsedTime,
138 self._remainingTime,
139 ]
140 self._backSteps = -sum(items)
141 if self._multitemps:
142 self._needsVelocities = self._needEnergy = True
144 def _add_item(self, lst, item):
145 if self._backSteps == 0:
146 lst.append(item)
147 else:
148 lst.insert(self._backSteps, item)
150 def _initializeConstants(self, simulation):
151 if self._multitemps and not self._temperature:
152 self._temperature = True
153 super()._initializeConstants(simulation)
154 self._temperature = False
155 else:
156 super()._initializeConstants(simulation)
158 self._extended_space = isinstance(simulation, ufedmm.ExtendedSpaceSimulation)
159 if self._extended_space:
160 self._cv_names = [
161 force.getCollectiveVariableName(i)
162 for force in simulation.context.driving_forces
163 for i in range(force.getNumCollectiveVariables())
164 ]
165 self._var_names = [v.id for v in simulation.context.variables]
166 if self._hill_heights:
167 metadynamics_tasks = filter(
168 lambda x: isinstance(x, _Metadynamics), simulation._periodic_tasks
169 )
170 try:
171 self._metadynamics = next(metadynamics_tasks)
172 except StopIteration:
173 raise Exception(
174 "hillHeights keyword involked for simulation without "
175 "metadynamics bias"
176 )
177 bias_factor = self._metadynamics.bias_factor
178 self._height_scaling = (
179 1 if bias_factor is None else bias_factor / (bias_factor - 1)
180 )
182 if self._multitemps:
183 system = simulation.context.getSystem()
184 integrator = simulation.context.getIntegrator()
185 Nt = system.getNumParticles()
186 Nv = len(simulation.context.variables)
187 ke_system = openmm.System()
188 for i in range(Nt - Nv, Nt):
189 ke_system.addParticle(system.getParticleMass(i))
190 ke_integrator = openmm.CustomIntegrator(0)
191 ke_integrator.addPerDofVariable("kT", 0)
192 ke_integrator.addComputePerDof(
193 "v", integrator.getKineticEnergyExpression()
194 )
195 self._ke_context = openmm.Context(ke_system, ke_integrator)
196 if integrator._up_to_date:
197 kT = integrator.getPerDofVariableByName("kT")
198 ke_integrator.setPerDofVariableByName("kT", kT[Nt - Nv : Nt])
199 else:
200 raise ValueError("Integrator temperatures were not set")
202 if self._collective_variables:
203 forces = simulation.context.getSystem().getForces()
204 self._cv_forces = list(
205 filter(lambda f: isinstance(f, openmm.CustomCVForce), forces)
206 )
208 def _constructHeaders(self):
209 headers = super()._constructHeaders()
210 if self._extended_space:
211 if self._multitemps:
212 self._add_item(headers, "T[atoms] (K)")
213 for var in self._var_names:
214 self._add_item(headers, f"T[{var}] (K)")
215 if self._variables:
216 for cv in self._cv_names:
217 self._add_item(headers, cv)
218 if self._hill_heights:
219 self._add_item(headers, "Height (kJ/mole)")
220 if self._collective_variables:
221 for force in self._cv_forces:
222 for index in range(force.getNumCollectiveVariables()):
223 self._add_item(headers, force.getCollectiveVariableName(index))
224 if self._global_parameter_states is not None:
225 for index in self._global_parameter_states.index:
226 self._add_item(headers, f"Energy[{index}] (kJ/mole)")
227 return headers
229 def _constructReportValues(self, simulation, state):
230 values = super()._constructReportValues(simulation, state)
231 if self._extended_space:
232 if self._multitemps:
233 system = simulation.context.getSystem()
234 ke_context = self._ke_context
235 Nt = system.getNumParticles()
236 Nv = ke_context.getSystem().getNumParticles()
237 ke_context.setVelocities(
238 state.getVelocities(extended=True)[Nt - Nv : Nt]
239 )
240 ke_context.getIntegrator().step(1)
241 ke_values = ke_context.getState(getVelocities=True).getVelocities()
242 xvar_ke = sum(ke.x + ke.y + ke.z for ke in ke_values)
243 total_ke = _standardized(state.getKineticEnergy())
244 kB = _standardized(unit.MOLAR_GAS_CONSTANT_R)
245 self._add_item(
246 values, 2 * (total_ke - xvar_ke) / ((self._dof - 3 * Nv) * kB)
247 )
248 for ke in ke_values:
249 self._add_item(values, 2 * ke.x / kB)
250 if self._variables:
251 for force in simulation.context.driving_forces:
252 for cv in force.getCollectiveVariableValues(simulation.context):
253 self._add_item(values, cv)
254 if self._hill_heights:
255 self._add_item(values, self._metadynamics.height * self._height_scaling)
256 if self._collective_variables:
257 for force in self._cv_forces:
258 for cv in force.getCollectiveVariableValues(simulation.context):
259 self._add_item(values, cv)
260 if self._global_parameter_states is not None:
261 originals = list(
262 map(
263 simulation.context.getParameter,
264 self._global_parameter_states.columns,
265 )
266 )
267 for index, row in self._global_parameter_states.iterrows():
268 for name, value in row.items():
269 simulation.context.setParameter(name, value)
270 energy = simulation.context.getState(
271 getEnergy=True
272 ).getPotentialEnergy()
273 self._add_item(values, energy.value_in_unit(unit.kilojoules_per_mole))
274 for name, value in zip(self._global_parameter_states.columns, originals):
275 simulation.context.setParameter(name, value)
276 return values
279def serialize(object, file):
280 """Serializes a ufedmm object."""
281 dump = yaml.dump(object, Dumper=yaml.CDumper)
282 if isinstance(file, str):
283 with open(file, "w") as f:
284 f.write(dump)
285 else:
286 file.write(dump)
289def deserialize(file):
290 """Deserializes a ufedmm object."""
291 if isinstance(file, str):
292 with open(file, "r") as f:
293 object = yaml.load(f.read(), Loader=yaml.CLoader)
294 else:
295 object = yaml.load(file.read(), Loader=yaml.CLoader)
296 return object