Coverage for openxps/integrator.py: 86%

138 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-13 22:08 +0000

1""" 

2.. module:: openxps.integrator 

3 :platform: Linux, MacOS, Windows 

4 :synopsis: Integrators for extended phase-space simulations with OpenMM. 

5 

6.. classauthor:: Charlles Abreu <craabreu@gmail.com> 

7 

8""" 

9 

10import typing as t 

11from abc import ABC, abstractmethod 

12from copy import deepcopy 

13 

14import numpy as np 

15import openmm as mm 

16from openmm import _openmm as mmswig 

17from openmm import unit as mmunit 

18 

19from . import integrators 

20from .couplings import Coupling 

21from .integrators.utils import IntegratorMixin 

22from .utils import STRING_SEPARATOR 

23 

24#: Tuple of OpenMM integrator classes known to evaluate forces exclusively at the 

25#: beginning of each time step (force-first or leapfrog schemes). 

26KNOWN_FORCE_FIRST_INTEGRATORS = ( 

27 mm.VerletIntegrator, 

28 mm.LangevinIntegrator, 

29 mm.LangevinMiddleIntegrator, 

30 mm.NoseHooverIntegrator, 

31) 

32 

33#: Tuple of OpenMM integrator classes known to be symmetric in the sense of operator 

34#: splitting, i.e., they can be represented as a palindromic sequence of operations. 

35KNOWN_SYMMETRIC_INTEGRATORS = ( 

36 integrators.SymmetricVerletIntegrator, 

37 integrators.SymmetricLangevinIntegrator, 

38) 

39 

40 

41class ExtendedSpaceIntegrator(mm.Integrator, ABC): 

42 """Base class for integrators that advance extended phase-space simulations. 

43 

44 An extended phase-space integrator manages two separate OpenMM integrators: one for 

45 the physical system and one for the extension system containing dynamical variables. 

46 

47 The step size of this integrator is the maximum of the step sizes of the physical 

48 and extension integrators. 

49 

50 All :OpenMM:`Integrator` methods are applied to the physical system integrator, 

51 except for ``step``, ``getStepSize``, and ``setStepSize``, which are applied to 

52 both the physical and extension system integrators. 

53 

54 Parameters 

55 ---------- 

56 physical_integrator 

57 The integrator for the physical system. 

58 extension_integrator 

59 The integrator for the extension system. 

60 

61 """ 

62 

63 def __init__( 

64 self, 

65 physical_integrator: mm.Integrator, 

66 extension_integrator: mm.Integrator, 

67 ) -> None: 

68 self._physical_integrator = physical_integrator 

69 self._extension_integrator = extension_integrator 

70 self._initialize() 

71 

72 def __copy__(self) -> "ExtendedSpaceIntegrator": 

73 """Return an unconfigured copy of the integrator.""" 

74 return self.__class__( 

75 self._physical_integrator.__copy__(), 

76 self._extension_integrator.__copy__(), 

77 ) 

78 

79 def __getstate__(self) -> str: 

80 """Get the state of the integrator as a string.""" 

81 return ( 

82 self._physical_integrator.__getstate__() 

83 + STRING_SEPARATOR 

84 + self._extension_integrator.__getstate__() 

85 ) 

86 

87 def __setstate__(self, state: str) -> None: 

88 """Set the state of the integrator from a string.""" 

89 physical_state, extension_state = state.split(STRING_SEPARATOR) 

90 self._physical_integrator = mm.XmlSerializer.deserialize(physical_state) 

91 self._extension_integrator = mm.XmlSerializer.deserialize(extension_state) 

92 self._initialize() 

93 

94 def _initialize(self) -> None: 

95 """Initialize the integrator.""" 

96 self.this = self._physical_integrator.this 

97 self._step_size = max( 

98 mmswig.Integrator_getStepSize(self._physical_integrator), 

99 mmswig.Integrator_getStepSize(self._extension_integrator), 

100 ) 

101 self._extension_context = None 

102 self._physical_context = None 

103 self._dynamical_variables = None 

104 self._coupling = None 

105 

106 def configure( 

107 self, 

108 physical_context: mm.Context, 

109 extension_context: mm.Context, 

110 coupling: Coupling, 

111 ) -> None: 

112 """Configure the integrator. 

113 

114 Store pointers to the given contexts, dynamical variables, and coupling 

115 potential in the integrator. This allows the integrator to update the contexts 

116 during the simulation. 

117 

118 Parameters 

119 ---------- 

120 physical_context 

121 The OpenMM context containing the physical system. 

122 extension_context 

123 The OpenMM context containing the extension system. 

124 coupling 

125 The potential that couples the physical and dynamical variables. 

126 """ 

127 for integrator, context, isExtension in ( 

128 (self._physical_integrator, physical_context, False), 

129 (self._extension_integrator, extension_context, True), 

130 ): 

131 if isinstance(integrator, IntegratorMixin): 

132 integrator.registerWithSystem(context.getSystem(), isExtension) 

133 self._physical_context = physical_context 

134 self._extension_context = extension_context 

135 self._coupling = coupling 

136 self._dynamical_variables = coupling.getDynamicalVariables() 

137 

138 def getPhysicalIntegrator(self) -> mm.Integrator: 

139 """Get the integrator for the physical system. 

140 

141 Returns 

142 ------- 

143 mm.Integrator 

144 The integrator for the physical system. 

145 """ 

146 return self._physical_integrator 

147 

148 def getExtensionIntegrator(self) -> mm.Integrator: 

149 """Get the integrator for the extension system. 

150 

151 Returns 

152 ------- 

153 mm.Integrator 

154 The integrator for the extension system. 

155 """ 

156 return self._extension_integrator 

157 

158 def getStepSize(self) -> mmunit.Quantity: 

159 """Get the step size for the integrator. 

160 

161 This is the maximum of the step sizes of the physical and extension integrators. 

162 

163 Returns 

164 ------- 

165 mmunit.Quantity 

166 The step size for the integrator. 

167 """ 

168 return self._step_size * mmunit.picosecond 

169 

170 def setStepSize(self, size: mmunit.Quantity | float) -> None: 

171 """Set the step size for the extended phase-space integrator. 

172 

173 This scales the step size of the physical and extension integrators by a factor 

174 defined by the ratio of the given step size to the largest current step size. 

175 

176 Parameters 

177 ---------- 

178 size 

179 The step size for the extended phase-space integrator. 

180 """ 

181 if mmunit.is_quantity(size): 

182 size = size.value_in_unit(mmunit.picosecond) 

183 factor = size / self._step_size 

184 mmswig.Integrator_setStepSize( 

185 self._physical_integrator, 

186 factor * mmswig.Integrator_getStepSize(self._physical_integrator), 

187 ) 

188 mmswig.Integrator_setStepSize( 

189 self._extension_integrator, 

190 factor * mmswig.Integrator_getStepSize(self._extension_integrator), 

191 ) 

192 self._step_size = size 

193 

194 @abstractmethod 

195 def step(self, steps: int) -> None: 

196 """Advance the extended phase-space simulation by a specified number of steps. 

197 

198 Parameters 

199 ---------- 

200 steps 

201 The number of time steps to advance the simulation. 

202 """ 

203 raise NotImplementedError("This method must be implemented by the subclass.") 

204 

205 

206class LockstepIntegrator(ExtendedSpaceIntegrator): 

207 """ 

208 An integrator that advances the physical and extension systems in lockstep. 

209 

210 This class integrates both the physical and extension systems simultaneously, then 

211 synchronizes the contexts after each step. It assumes that both integrators apply 

212 all forces at the beginning of each time step (a force-first scheme). If either 

213 integrator does not follow this scheme, the overall integration will be incorrect. 

214 

215 The step sizes of the physical and extension integrators must be equal. 

216 

217 Parameters 

218 ---------- 

219 physical_integrator 

220 The integrator for the physical system. 

221 extension_integrator 

222 The integrator for the extension system. If None, a copy of the physical 

223 integrator is used. 

224 assume_force_first 

225 If True, skip the validation that checks whether the integrators are known 

226 to follow a force-first scheme. Use this at your own risk if you know your 

227 integrators are compatible. Default is False. 

228 

229 Raises 

230 ------ 

231 ValueError 

232 If the physical and extension integrators do not follow a force-first scheme or 

233 do not have the same step size. 

234 

235 Example 

236 ------- 

237 >>> import openxps as xps 

238 >>> import openmm as mm 

239 >>> from openmm import unit 

240 

241 Using OpenMM's LangevinMiddleIntegrator integrator (force-first) 

242 

243 >>> integrator = mm.LangevinMiddleIntegrator( 

244 ... 300 * unit.kelvin, 1 / unit.picosecond, 1 * unit.femtosecond 

245 ... ) 

246 >>> xps_integrator = xps.LockstepIntegrator(integrator) 

247 >>> xps_integrator.getStepSize() 

248 0.001 ps 

249 

250 Using CSVR integrator with force-first scheme for the extension system 

251 

252 >>> csvr = xps.integrators.CSVRIntegrator( 

253 ... 300 * unit.kelvin, 10 / unit.picosecond, 1 * unit.femtosecond, 

254 ... forceFirst=True 

255 ... ) 

256 >>> xps_integrator_ff = xps.LockstepIntegrator(integrator, csvr) 

257 """ 

258 

259 def __init__( 

260 self, 

261 physical_integrator: mm.Integrator, 

262 extension_integrator: t.Optional[mm.Integrator] = None, 

263 assume_force_first: bool = False, 

264 ) -> None: 

265 if extension_integrator is None: 

266 extension_integrator = deepcopy(physical_integrator) 

267 elif not np.isclose( 

268 mmswig.Integrator_getStepSize(physical_integrator), 

269 mmswig.Integrator_getStepSize(extension_integrator), 

270 ): 

271 raise ValueError("The step sizes must be equal.") 

272 if not ( 

273 assume_force_first 

274 or ( 

275 self._is_force_first(physical_integrator) 

276 and self._is_force_first(extension_integrator) 

277 ) 

278 ): 

279 raise ValueError( 

280 "The integrators must follow a force-first scheme.\n" 

281 "If you are certain they do, set assume_force_first=True." 

282 ) 

283 super().__init__(physical_integrator, extension_integrator) 

284 

285 @staticmethod 

286 def _is_force_first(integrator: mm.Integrator) -> bool: 

287 """Check if an integrator follows a force-first scheme.""" 

288 if isinstance(integrator, KNOWN_FORCE_FIRST_INTEGRATORS): 

289 return True 

290 if isinstance(integrator, IntegratorMixin): 

291 return integrator.isForceFirst() 

292 return False 

293 

294 def step(self, steps: int) -> None: 

295 """Advance the extended phase-space simulation by a specified number of steps. 

296 

297 Parameters 

298 ---------- 

299 steps 

300 The number of time steps to advance the simulation. 

301 """ 

302 for _ in range(steps): 

303 self._physical_integrator.step(1) 

304 self._extension_integrator.step(1) 

305 self._coupling.updatePhysicalContext( 

306 self._physical_context, 

307 self._extension_context, 

308 ) 

309 self._coupling.updateExtensionContext( 

310 self._physical_context, 

311 self._extension_context, 

312 ) 

313 

314 

315class SplitIntegrator(ExtendedSpaceIntegrator): 

316 r""" 

317 An integrator that advances the physical and extension systems using an operator 

318 splitting scheme. 

319 

320 This integrator assumes that the physical and extension integrators are symmetric 

321 in terms of operator splitting, i.e., they can be represented as a palindromic 

322 sequence of operations. If either integrator does not follow this scheme, the 

323 overall integration scheme will be incorrect. 

324 

325 The step sizes of the physical and extension integrators must be related by an even 

326 integer ratio, with either the physical or extension integrator having the larger 

327 step size. 

328 

329 This class integrates the physical and extension systems in a :math:`B^n A B^n` 

330 pattern, where :math:`n` is the number of substeps, :math:`A` is the integrator with 

331 the larger step size :math:`\Delta t` (considered as the overall step size), and 

332 :math:`B` is the integrator with the smaller step size :math:`\Delta t/(2n)`. 

333 

334 Parameters 

335 ---------- 

336 physical_integrator 

337 The integrator for the physical system. 

338 extension_integrator 

339 The integrator for the extension system. If None, a copy of the physical 

340 integrator is used with half the step size of the physical integrator. 

341 assume_symmetric 

342 If True, skip the validation that checks whether the integrators are known 

343 to be symmetric in terms of operator splitting. Use this at your own risk if 

344 you know your integrators are compatible but are not in the known list. 

345 Default is False. 

346 

347 Raises 

348 ------ 

349 ValueError 

350 If the physical and extension integrators are not symmetric in terms of 

351 operator splitting, or do not have a step size ratio equal to an even integer 

352 number. 

353 

354 Example 

355 ------- 

356 >>> import openxps as xps 

357 >>> import openmm as mm 

358 >>> from openmm import unit 

359 

360 Using symmetric Verlet integrator with half step size 

361 

362 >>> integrator = xps.integrators.SymmetricVerletIntegrator(4 * unit.femtosecond) 

363 >>> xps_integrator = xps.SplitIntegrator(integrator) 

364 >>> xps_integrator.getStepSize() 

365 0.004 ps 

366 >>> xps_integrator.getNumSubsteps() 

367 1 

368 

369 Using symmetric Langevin integrator with 4:1 step size ratio 

370 

371 >>> physical = xps.integrators.SymmetricLangevinIntegrator( 

372 ... 300 * unit.kelvin, 1 / unit.picosecond, 4 * unit.femtosecond 

373 ... ) 

374 >>> extension = xps.integrators.SymmetricLangevinIntegrator( 

375 ... 300 * unit.kelvin, 1 / unit.picosecond, 1 * unit.femtosecond 

376 ... ) 

377 >>> xps_integrator = xps.SplitIntegrator(physical, extension) 

378 >>> xps_integrator.getStepSize() 

379 0.004 ps 

380 >>> xps_integrator.getNumSubsteps() 

381 2 

382 

383 Using symmetric Langevin integrator with 1:4 step size ratio 

384 

385 >>> physical = xps.integrators.SymmetricLangevinIntegrator( 

386 ... 300 * unit.kelvin, 1 / unit.picosecond, 2 * unit.femtosecond 

387 ... ) 

388 >>> extension = xps.integrators.SymmetricLangevinIntegrator( 

389 ... 300 * unit.kelvin, 1 / unit.picosecond, 8 * unit.femtosecond 

390 ... ) 

391 >>> xps_integrator = xps.SplitIntegrator(physical, extension) 

392 >>> xps_integrator.getStepSize() 

393 0.008 ps 

394 >>> xps_integrator.getNumSubsteps() 

395 2 

396 """ 

397 

398 def __init__( 

399 self, 

400 physical_integrator: mm.Integrator, 

401 extension_integrator: t.Optional[mm.Integrator] = None, 

402 assume_symmetric: bool = False, 

403 ) -> None: 

404 physical_step_size = mmswig.Integrator_getStepSize(physical_integrator) 

405 if extension_integrator is None: 

406 extension_integrator = deepcopy(physical_integrator) 

407 extension_step_size = physical_step_size / 2 

408 mmswig.Integrator_setStepSize(extension_integrator, extension_step_size) 

409 else: 

410 extension_step_size = mmswig.Integrator_getStepSize(extension_integrator) 

411 if not ( 

412 assume_symmetric 

413 or ( 

414 self._is_symmetric(physical_integrator) 

415 and self._is_symmetric(extension_integrator) 

416 ) 

417 ): 

418 raise ValueError( 

419 "The integrators must be symmetric in terms of operator splitting.\n" 

420 "If you are certain they are, set assume_symmetric=True." 

421 ) 

422 step_size = max(physical_step_size, extension_step_size) 

423 substep_size = min(physical_step_size, extension_step_size) 

424 if not self._is_even_division(step_size, substep_size): 

425 raise ValueError("The step sizes must be related by an even integer ratio.") 

426 super().__init__(physical_integrator, extension_integrator) 

427 

428 @staticmethod 

429 def _is_symmetric(integrator: mm.Integrator) -> bool: 

430 """Check if an integrator is symmetric in terms of operator splitting.""" 

431 if isinstance(integrator, KNOWN_SYMMETRIC_INTEGRATORS): 

432 return True 

433 if isinstance(integrator, IntegratorMixin): 

434 return not integrator.isForceFirst() 

435 return False 

436 

437 @staticmethod 

438 def _is_even_division(a: float, b: float) -> bool: 

439 if b == 0: 

440 raise ValueError("Division by zero is not allowed.") 

441 return np.isclose(a / b - round(a / b), 0) and int(round(a / b)) % 2 == 0 

442 

443 def _initialize(self) -> None: 

444 super()._initialize() 

445 physical_step_size = mmswig.Integrator_getStepSize(self._physical_integrator) 

446 extension_step_size = mmswig.Integrator_getStepSize(self._extension_integrator) 

447 if physical_step_size > extension_step_size: 

448 self._middle_integrator = self._physical_integrator 

449 self._end_integrator = self._extension_integrator 

450 ratio = physical_step_size / (2 * extension_step_size) 

451 else: 

452 self._middle_integrator = self._extension_integrator 

453 self._end_integrator = self._physical_integrator 

454 ratio = extension_step_size / (2 * physical_step_size) 

455 self._num_substeps = int(np.rint(ratio)) 

456 

457 def configure( 

458 self, 

459 physical_context: mm.Context, 

460 extension_context: mm.Context, 

461 coupling: Coupling, 

462 ) -> None: 

463 super().configure(physical_context, extension_context, coupling) 

464 if self._middle_integrator is self._physical_integrator: 

465 self._update_middle_context = self._coupling.updatePhysicalContext 

466 self._update_end_context = self._coupling.updateExtensionContext 

467 else: 

468 self._update_middle_context = self._coupling.updateExtensionContext 

469 self._update_end_context = self._coupling.updatePhysicalContext 

470 

471 def getNumSubsteps(self) -> int: 

472 """Get the number of substeps for the integrator. 

473 

474 Returns 

475 ------- 

476 int 

477 The number of substeps for the integrator. 

478 """ 

479 return self._num_substeps 

480 

481 def step(self, steps: int) -> None: 

482 """Advance the extended phase-space simulation by integrating the physical and 

483 extension systems in lockstep over a specified number of time steps. 

484 

485 Parameters 

486 ---------- 

487 steps : int 

488 The number of time steps to advance the simulation. 

489 """ 

490 step_count = self._physical_context.getStepCount() 

491 for _ in range(steps): 

492 self._end_integrator.step(self._num_substeps) 

493 self._update_middle_context(self._physical_context, self._extension_context) 

494 self._middle_integrator.step(1) 

495 self._update_end_context(self._physical_context, self._extension_context) 

496 self._end_integrator.step(self._num_substeps) 

497 self._update_middle_context(self._physical_context, self._extension_context) 

498 self._physical_context.setStepCount(step_count + steps)