Coverage for openxps/integrator.py: 86%
138 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-13 22:08 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-13 22:08 +0000
1"""
2.. module:: openxps.integrator
3 :platform: Linux, MacOS, Windows
4 :synopsis: Integrators for extended phase-space simulations with OpenMM.
6.. classauthor:: Charlles Abreu <craabreu@gmail.com>
8"""
10import typing as t
11from abc import ABC, abstractmethod
12from copy import deepcopy
14import numpy as np
15import openmm as mm
16from openmm import _openmm as mmswig
17from openmm import unit as mmunit
19from . import integrators
20from .couplings import Coupling
21from .integrators.utils import IntegratorMixin
22from .utils import STRING_SEPARATOR
24#: Tuple of OpenMM integrator classes known to evaluate forces exclusively at the
25#: beginning of each time step (force-first or leapfrog schemes).
26KNOWN_FORCE_FIRST_INTEGRATORS = (
27 mm.VerletIntegrator,
28 mm.LangevinIntegrator,
29 mm.LangevinMiddleIntegrator,
30 mm.NoseHooverIntegrator,
31)
33#: Tuple of OpenMM integrator classes known to be symmetric in the sense of operator
34#: splitting, i.e., they can be represented as a palindromic sequence of operations.
35KNOWN_SYMMETRIC_INTEGRATORS = (
36 integrators.SymmetricVerletIntegrator,
37 integrators.SymmetricLangevinIntegrator,
38)
41class ExtendedSpaceIntegrator(mm.Integrator, ABC):
42 """Base class for integrators that advance extended phase-space simulations.
44 An extended phase-space integrator manages two separate OpenMM integrators: one for
45 the physical system and one for the extension system containing dynamical variables.
47 The step size of this integrator is the maximum of the step sizes of the physical
48 and extension integrators.
50 All :OpenMM:`Integrator` methods are applied to the physical system integrator,
51 except for ``step``, ``getStepSize``, and ``setStepSize``, which are applied to
52 both the physical and extension system integrators.
54 Parameters
55 ----------
56 physical_integrator
57 The integrator for the physical system.
58 extension_integrator
59 The integrator for the extension system.
61 """
63 def __init__(
64 self,
65 physical_integrator: mm.Integrator,
66 extension_integrator: mm.Integrator,
67 ) -> None:
68 self._physical_integrator = physical_integrator
69 self._extension_integrator = extension_integrator
70 self._initialize()
72 def __copy__(self) -> "ExtendedSpaceIntegrator":
73 """Return an unconfigured copy of the integrator."""
74 return self.__class__(
75 self._physical_integrator.__copy__(),
76 self._extension_integrator.__copy__(),
77 )
79 def __getstate__(self) -> str:
80 """Get the state of the integrator as a string."""
81 return (
82 self._physical_integrator.__getstate__()
83 + STRING_SEPARATOR
84 + self._extension_integrator.__getstate__()
85 )
87 def __setstate__(self, state: str) -> None:
88 """Set the state of the integrator from a string."""
89 physical_state, extension_state = state.split(STRING_SEPARATOR)
90 self._physical_integrator = mm.XmlSerializer.deserialize(physical_state)
91 self._extension_integrator = mm.XmlSerializer.deserialize(extension_state)
92 self._initialize()
94 def _initialize(self) -> None:
95 """Initialize the integrator."""
96 self.this = self._physical_integrator.this
97 self._step_size = max(
98 mmswig.Integrator_getStepSize(self._physical_integrator),
99 mmswig.Integrator_getStepSize(self._extension_integrator),
100 )
101 self._extension_context = None
102 self._physical_context = None
103 self._dynamical_variables = None
104 self._coupling = None
106 def configure(
107 self,
108 physical_context: mm.Context,
109 extension_context: mm.Context,
110 coupling: Coupling,
111 ) -> None:
112 """Configure the integrator.
114 Store pointers to the given contexts, dynamical variables, and coupling
115 potential in the integrator. This allows the integrator to update the contexts
116 during the simulation.
118 Parameters
119 ----------
120 physical_context
121 The OpenMM context containing the physical system.
122 extension_context
123 The OpenMM context containing the extension system.
124 coupling
125 The potential that couples the physical and dynamical variables.
126 """
127 for integrator, context, isExtension in (
128 (self._physical_integrator, physical_context, False),
129 (self._extension_integrator, extension_context, True),
130 ):
131 if isinstance(integrator, IntegratorMixin):
132 integrator.registerWithSystem(context.getSystem(), isExtension)
133 self._physical_context = physical_context
134 self._extension_context = extension_context
135 self._coupling = coupling
136 self._dynamical_variables = coupling.getDynamicalVariables()
138 def getPhysicalIntegrator(self) -> mm.Integrator:
139 """Get the integrator for the physical system.
141 Returns
142 -------
143 mm.Integrator
144 The integrator for the physical system.
145 """
146 return self._physical_integrator
148 def getExtensionIntegrator(self) -> mm.Integrator:
149 """Get the integrator for the extension system.
151 Returns
152 -------
153 mm.Integrator
154 The integrator for the extension system.
155 """
156 return self._extension_integrator
158 def getStepSize(self) -> mmunit.Quantity:
159 """Get the step size for the integrator.
161 This is the maximum of the step sizes of the physical and extension integrators.
163 Returns
164 -------
165 mmunit.Quantity
166 The step size for the integrator.
167 """
168 return self._step_size * mmunit.picosecond
170 def setStepSize(self, size: mmunit.Quantity | float) -> None:
171 """Set the step size for the extended phase-space integrator.
173 This scales the step size of the physical and extension integrators by a factor
174 defined by the ratio of the given step size to the largest current step size.
176 Parameters
177 ----------
178 size
179 The step size for the extended phase-space integrator.
180 """
181 if mmunit.is_quantity(size):
182 size = size.value_in_unit(mmunit.picosecond)
183 factor = size / self._step_size
184 mmswig.Integrator_setStepSize(
185 self._physical_integrator,
186 factor * mmswig.Integrator_getStepSize(self._physical_integrator),
187 )
188 mmswig.Integrator_setStepSize(
189 self._extension_integrator,
190 factor * mmswig.Integrator_getStepSize(self._extension_integrator),
191 )
192 self._step_size = size
194 @abstractmethod
195 def step(self, steps: int) -> None:
196 """Advance the extended phase-space simulation by a specified number of steps.
198 Parameters
199 ----------
200 steps
201 The number of time steps to advance the simulation.
202 """
203 raise NotImplementedError("This method must be implemented by the subclass.")
206class LockstepIntegrator(ExtendedSpaceIntegrator):
207 """
208 An integrator that advances the physical and extension systems in lockstep.
210 This class integrates both the physical and extension systems simultaneously, then
211 synchronizes the contexts after each step. It assumes that both integrators apply
212 all forces at the beginning of each time step (a force-first scheme). If either
213 integrator does not follow this scheme, the overall integration will be incorrect.
215 The step sizes of the physical and extension integrators must be equal.
217 Parameters
218 ----------
219 physical_integrator
220 The integrator for the physical system.
221 extension_integrator
222 The integrator for the extension system. If None, a copy of the physical
223 integrator is used.
224 assume_force_first
225 If True, skip the validation that checks whether the integrators are known
226 to follow a force-first scheme. Use this at your own risk if you know your
227 integrators are compatible. Default is False.
229 Raises
230 ------
231 ValueError
232 If the physical and extension integrators do not follow a force-first scheme or
233 do not have the same step size.
235 Example
236 -------
237 >>> import openxps as xps
238 >>> import openmm as mm
239 >>> from openmm import unit
241 Using OpenMM's LangevinMiddleIntegrator integrator (force-first)
243 >>> integrator = mm.LangevinMiddleIntegrator(
244 ... 300 * unit.kelvin, 1 / unit.picosecond, 1 * unit.femtosecond
245 ... )
246 >>> xps_integrator = xps.LockstepIntegrator(integrator)
247 >>> xps_integrator.getStepSize()
248 0.001 ps
250 Using CSVR integrator with force-first scheme for the extension system
252 >>> csvr = xps.integrators.CSVRIntegrator(
253 ... 300 * unit.kelvin, 10 / unit.picosecond, 1 * unit.femtosecond,
254 ... forceFirst=True
255 ... )
256 >>> xps_integrator_ff = xps.LockstepIntegrator(integrator, csvr)
257 """
259 def __init__(
260 self,
261 physical_integrator: mm.Integrator,
262 extension_integrator: t.Optional[mm.Integrator] = None,
263 assume_force_first: bool = False,
264 ) -> None:
265 if extension_integrator is None:
266 extension_integrator = deepcopy(physical_integrator)
267 elif not np.isclose(
268 mmswig.Integrator_getStepSize(physical_integrator),
269 mmswig.Integrator_getStepSize(extension_integrator),
270 ):
271 raise ValueError("The step sizes must be equal.")
272 if not (
273 assume_force_first
274 or (
275 self._is_force_first(physical_integrator)
276 and self._is_force_first(extension_integrator)
277 )
278 ):
279 raise ValueError(
280 "The integrators must follow a force-first scheme.\n"
281 "If you are certain they do, set assume_force_first=True."
282 )
283 super().__init__(physical_integrator, extension_integrator)
285 @staticmethod
286 def _is_force_first(integrator: mm.Integrator) -> bool:
287 """Check if an integrator follows a force-first scheme."""
288 if isinstance(integrator, KNOWN_FORCE_FIRST_INTEGRATORS):
289 return True
290 if isinstance(integrator, IntegratorMixin):
291 return integrator.isForceFirst()
292 return False
294 def step(self, steps: int) -> None:
295 """Advance the extended phase-space simulation by a specified number of steps.
297 Parameters
298 ----------
299 steps
300 The number of time steps to advance the simulation.
301 """
302 for _ in range(steps):
303 self._physical_integrator.step(1)
304 self._extension_integrator.step(1)
305 self._coupling.updatePhysicalContext(
306 self._physical_context,
307 self._extension_context,
308 )
309 self._coupling.updateExtensionContext(
310 self._physical_context,
311 self._extension_context,
312 )
315class SplitIntegrator(ExtendedSpaceIntegrator):
316 r"""
317 An integrator that advances the physical and extension systems using an operator
318 splitting scheme.
320 This integrator assumes that the physical and extension integrators are symmetric
321 in terms of operator splitting, i.e., they can be represented as a palindromic
322 sequence of operations. If either integrator does not follow this scheme, the
323 overall integration scheme will be incorrect.
325 The step sizes of the physical and extension integrators must be related by an even
326 integer ratio, with either the physical or extension integrator having the larger
327 step size.
329 This class integrates the physical and extension systems in a :math:`B^n A B^n`
330 pattern, where :math:`n` is the number of substeps, :math:`A` is the integrator with
331 the larger step size :math:`\Delta t` (considered as the overall step size), and
332 :math:`B` is the integrator with the smaller step size :math:`\Delta t/(2n)`.
334 Parameters
335 ----------
336 physical_integrator
337 The integrator for the physical system.
338 extension_integrator
339 The integrator for the extension system. If None, a copy of the physical
340 integrator is used with half the step size of the physical integrator.
341 assume_symmetric
342 If True, skip the validation that checks whether the integrators are known
343 to be symmetric in terms of operator splitting. Use this at your own risk if
344 you know your integrators are compatible but are not in the known list.
345 Default is False.
347 Raises
348 ------
349 ValueError
350 If the physical and extension integrators are not symmetric in terms of
351 operator splitting, or do not have a step size ratio equal to an even integer
352 number.
354 Example
355 -------
356 >>> import openxps as xps
357 >>> import openmm as mm
358 >>> from openmm import unit
360 Using symmetric Verlet integrator with half step size
362 >>> integrator = xps.integrators.SymmetricVerletIntegrator(4 * unit.femtosecond)
363 >>> xps_integrator = xps.SplitIntegrator(integrator)
364 >>> xps_integrator.getStepSize()
365 0.004 ps
366 >>> xps_integrator.getNumSubsteps()
367 1
369 Using symmetric Langevin integrator with 4:1 step size ratio
371 >>> physical = xps.integrators.SymmetricLangevinIntegrator(
372 ... 300 * unit.kelvin, 1 / unit.picosecond, 4 * unit.femtosecond
373 ... )
374 >>> extension = xps.integrators.SymmetricLangevinIntegrator(
375 ... 300 * unit.kelvin, 1 / unit.picosecond, 1 * unit.femtosecond
376 ... )
377 >>> xps_integrator = xps.SplitIntegrator(physical, extension)
378 >>> xps_integrator.getStepSize()
379 0.004 ps
380 >>> xps_integrator.getNumSubsteps()
381 2
383 Using symmetric Langevin integrator with 1:4 step size ratio
385 >>> physical = xps.integrators.SymmetricLangevinIntegrator(
386 ... 300 * unit.kelvin, 1 / unit.picosecond, 2 * unit.femtosecond
387 ... )
388 >>> extension = xps.integrators.SymmetricLangevinIntegrator(
389 ... 300 * unit.kelvin, 1 / unit.picosecond, 8 * unit.femtosecond
390 ... )
391 >>> xps_integrator = xps.SplitIntegrator(physical, extension)
392 >>> xps_integrator.getStepSize()
393 0.008 ps
394 >>> xps_integrator.getNumSubsteps()
395 2
396 """
398 def __init__(
399 self,
400 physical_integrator: mm.Integrator,
401 extension_integrator: t.Optional[mm.Integrator] = None,
402 assume_symmetric: bool = False,
403 ) -> None:
404 physical_step_size = mmswig.Integrator_getStepSize(physical_integrator)
405 if extension_integrator is None:
406 extension_integrator = deepcopy(physical_integrator)
407 extension_step_size = physical_step_size / 2
408 mmswig.Integrator_setStepSize(extension_integrator, extension_step_size)
409 else:
410 extension_step_size = mmswig.Integrator_getStepSize(extension_integrator)
411 if not (
412 assume_symmetric
413 or (
414 self._is_symmetric(physical_integrator)
415 and self._is_symmetric(extension_integrator)
416 )
417 ):
418 raise ValueError(
419 "The integrators must be symmetric in terms of operator splitting.\n"
420 "If you are certain they are, set assume_symmetric=True."
421 )
422 step_size = max(physical_step_size, extension_step_size)
423 substep_size = min(physical_step_size, extension_step_size)
424 if not self._is_even_division(step_size, substep_size):
425 raise ValueError("The step sizes must be related by an even integer ratio.")
426 super().__init__(physical_integrator, extension_integrator)
428 @staticmethod
429 def _is_symmetric(integrator: mm.Integrator) -> bool:
430 """Check if an integrator is symmetric in terms of operator splitting."""
431 if isinstance(integrator, KNOWN_SYMMETRIC_INTEGRATORS):
432 return True
433 if isinstance(integrator, IntegratorMixin):
434 return not integrator.isForceFirst()
435 return False
437 @staticmethod
438 def _is_even_division(a: float, b: float) -> bool:
439 if b == 0:
440 raise ValueError("Division by zero is not allowed.")
441 return np.isclose(a / b - round(a / b), 0) and int(round(a / b)) % 2 == 0
443 def _initialize(self) -> None:
444 super()._initialize()
445 physical_step_size = mmswig.Integrator_getStepSize(self._physical_integrator)
446 extension_step_size = mmswig.Integrator_getStepSize(self._extension_integrator)
447 if physical_step_size > extension_step_size:
448 self._middle_integrator = self._physical_integrator
449 self._end_integrator = self._extension_integrator
450 ratio = physical_step_size / (2 * extension_step_size)
451 else:
452 self._middle_integrator = self._extension_integrator
453 self._end_integrator = self._physical_integrator
454 ratio = extension_step_size / (2 * physical_step_size)
455 self._num_substeps = int(np.rint(ratio))
457 def configure(
458 self,
459 physical_context: mm.Context,
460 extension_context: mm.Context,
461 coupling: Coupling,
462 ) -> None:
463 super().configure(physical_context, extension_context, coupling)
464 if self._middle_integrator is self._physical_integrator:
465 self._update_middle_context = self._coupling.updatePhysicalContext
466 self._update_end_context = self._coupling.updateExtensionContext
467 else:
468 self._update_middle_context = self._coupling.updateExtensionContext
469 self._update_end_context = self._coupling.updatePhysicalContext
471 def getNumSubsteps(self) -> int:
472 """Get the number of substeps for the integrator.
474 Returns
475 -------
476 int
477 The number of substeps for the integrator.
478 """
479 return self._num_substeps
481 def step(self, steps: int) -> None:
482 """Advance the extended phase-space simulation by integrating the physical and
483 extension systems in lockstep over a specified number of time steps.
485 Parameters
486 ----------
487 steps : int
488 The number of time steps to advance the simulation.
489 """
490 step_count = self._physical_context.getStepCount()
491 for _ in range(steps):
492 self._end_integrator.step(self._num_substeps)
493 self._update_middle_context(self._physical_context, self._extension_context)
494 self._middle_integrator.step(1)
495 self._update_end_context(self._physical_context, self._extension_context)
496 self._end_integrator.step(self._num_substeps)
497 self._update_middle_context(self._physical_context, self._extension_context)
498 self._physical_context.setStepCount(step_count + steps)