Intrinsic periodicity of the SIR system¶
Continuing our investigation of the Susceptible-Infected-Recovered system,
$$ \dot{S} = -\frac{\beta*S*I}{N} + \mu N - \mu S $$
$$ \dot{I} = \frac{\beta*S*I}{N} - \gamma I - \mu I $$
$$ \dot{R} = \gamma I - \mu R $$
With non-trivial endemic equilbrium
$$ (S^*, \: I^*, \: R^*) = (\frac{1}{R_0}, \:\: \frac{\mu (R_0-1)}{\beta}, \: \: 1-\frac{1}{R_0} - \frac{\mu (R_0-1)}{\beta}) $$
$$ \text{where} \:\: R_0 = \frac{\beta}{\gamma + \mu} $$
General analysis of a system's stability of and approach to equilibria is beyond the scope of this notebook, and detailed discussions are available elsewhere (e.g., Keeling/Rohani Box 2.4). In brief, one constructs the Jacobian of the system at the equilibrium points and computes its eigenvalues. If all eigenvalues have negative real component, then the equilibrium is stable; if the dominant eigenvalues are complex conjugates, then system approaches equilibrium via damped oscillations, with damping constant equal to the real component and frequency equal to the imaginary component. This is the case for the SIR system around the non-trivial ($R_0 \gt 1$) equilibrium, with dominant eigenvalues:
$$ \Lambda = -\frac{\mu R_0}{2} \pm \frac{\sqrt{\mu^2 R_0^2 - \frac{4}{A G}}}{2} $$
$$ \text{where} \:\: A = \frac{1}{\mu (R_0 -1)} and G = \frac{1}{\mu + \gamma} $$
In general, $\mu^2 R_0^2$ is quite small, and the intrinsic periodicity of the system is $T \approx 2 \pi \sqrt{A G}$.
Construct the model¶
The model is contructed as in notebook 05, though we will not record date of infection here. As again, we are looking at behavior around the endemic equilibrium, the same considerations of large-ish populations and long simulations apply here.
Sanity check¶
The first test, as always, ensures that certain basic constraints are being obeyed by the model.
Scientific test¶
The scientific test will sample a set of $(\mu, \gamma, R_0)$ tuplets and confirm that the periodicity is as expected.
Future work¶
The addition of an exposed compartment with rate constant $\sigma$ should change this result, by changing the generation time $G$ to $\frac{1}{\mu + \gamma} + \frac{1}{\mu + \sigma}$
import math
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from laser.core.propertyset import PropertySet
import laser.core
import laser.generic
import laser.core.distributions as dists
from laser.generic import SIR
from laser.generic import Model
from laser.generic.utils import ValuesMap
from laser.generic.vitaldynamics import ConstantPopVitalDynamics
from laser.core.utils import grid
from scipy.ndimage import gaussian_filter1d
from scipy.signal import find_peaks
print(f"{np.__version__=}")
print(f"{laser.core.__version__=}")
print(f"{laser.generic.__version__=}")
There will be a couple of challenges in testing here. We want to capture enough oscillations to get a stable signal but would also like to limit runtime. So aiming to sample a region in which oscillation periods will be order 1-3 years, so that we get plenty of cycles in a few decade simulation. Second is that there are lots of transient signals so we have to throw out some of the initial data. Lastly, and most importantly - the analysis above does rely on a bit of a linearization around the equilibrium point of the system, but this system is fundamentally a stochastic non-linear oscillator and is subject to lots of fun dynamics that can complicate empirically observing the fundamental periodicity.
To ameliorate these, we will: Try to start relatively close to equilibrium values of S, I, R. Use two methods to estimate frequency - "peak finding" in the time-domain, and autocorrelation spectrum in the frequency domain. And instead of looking for the period of maximum power, specifically look for a peak in the power spectrum in the vicinity of the expected period.
class Importation:
def __init__(self, model, infdurdist, infdurmin: int =1, period: int = 180, count: int = 3):
self.model = model
self.infdurdist = infdurdist
self.infdurmin = infdurmin
self.period = period
self.count = count
self.model.nodes.add_vector_property("imports", model.params.nticks + 1, dtype=np.uint32, default=0)
return
def step(self, tick: int) -> None:
if tick > 0 and tick % self.period == 0:
i_susceptible = np.nonzero(self.model.people.state == SIR.State.SUSCEPTIBLE.value)[0]
if len(i_susceptible) > 0:
count = min(self.count, len(i_susceptible))
i_infect = np.random.choice(i_susceptible, size=count, replace=False)
self.model.people.state[i_infect] = SIR.State.INFECTIOUS.value
samples = dists.sample_floats(self.infdurdist, np.zeros(count, np.float32))
samples = np.round(samples)
samples = np.maximum(samples, self.infdurmin).astype(self.model.people.itimer.dtype)
self.model.people.itimer[i_infect] = samples
inf_by_node = np.bincount(self.model.people.nodeid[i_infect], minlength=len(self.model.nodes)).astype(self.model.nodes.S.dtype)
self.model.nodes.S[tick + 1] -= inf_by_node
self.model.nodes.I[tick + 1] += inf_by_node
self.model.nodes.imports[tick] = inf_by_node
# else:
# print(f"No susceptibles to infect at tick {tick}")
return
pop=2e5
scenario = grid(M=1, N=1, population_fn=lambda x,y: pop, origin_x=0, origin_y=0)
initial_infected = 1
scenario["S"] = scenario.population - initial_infected
scenario["I"] = initial_infected
scenario["R"] = 0
parameters = PropertySet(
{"seed": 4, "nticks": 365*100, "verbose": True, "beta": 0.4, "inf_mean": 12, "cbr": 90, "importation_period": 180, "importation_count": 3}
)
# Use ConstantPopVitalDynamics (recycles existing agents on death) instead of
# BirthsByCBR + MortalityByEstimator. The recycling approach keeps the
# LaserFrame at the initial population size regardless of CBR, so we can use
# the deliberately-high CBR=90 (chosen for fast equilibration and short
# oscillation period) without pre-allocating slots for billions of future
# births. As a bonus, it keeps total N stable by construction — no reliance
# on a KaplanMeierEstimator tuned to a specific CBR regime.
recycle_rates = ValuesMap.from_scalar(parameters.cbr, nticks=parameters.nticks, nnodes=len(scenario))
infdurdist = dists.exponential(scale=parameters.inf_mean)
# Pass zero birthrates to Model so no slots are pre-allocated for future
# births — ConstantPopVitalDynamics handles demographics via recycling.
model = Model(scenario, parameters,
birthrates=ValuesMap.from_scalar(0.0, nticks=parameters.nticks, nnodes=len(scenario)))
model.components = [
SIR.Susceptible(model),
SIR.Recovered(model),
SIR.Infectious(model, infdurdist),
Importation(model, infdurdist),
SIR.Transmission(model, infdurdist),
ConstantPopVitalDynamics(model, recycle_rates),
]
model.run()
plt.plot(model.nodes.S, color="blue")
plt.plot(model.nodes.I, color="red")
plt.plot(model.nodes.R, color="green")
plt.plot(model.nodes.S+model.nodes.I+model.nodes.R, color="black")
plt.legend(["S", "I", "R", "N"])
plt.show()
Sanity checks¶
As always, check that we haven't broken anything - S+I+R = N at all times, stocks track flows (need to have deaths by state to do this right).
mu = (1 + model.params.cbr / 1000) ** (1 / 365) - 1
R0 = model.params.beta / (1 / model.params.inf_mean + mu)
A = 1 / ((R0 - 1) * mu) / 365
G = 1 / (mu + 1 / model.params.inf_mean) / 365
T_exp = 2 * np.pi * np.sqrt(A * G)
def ID_freq_peakfinder(y0, T_exp, cutoff=18250, plot=False):
y = y0[cutoff:]
y = y - np.mean(y)
y = gaussian_filter1d(y, sigma=50)
peaks, _ = find_peaks(y, distance=T_exp * 365 / 2)
peaks2, _ = find_peaks(-1*y, distance=T_exp * 365 / 2)
if plot:
plt.figure()
plt.plot(y, 'b-', lw=1)
plt.plot(peaks, y[peaks], "rx")
plt.plot(peaks2, y[peaks2], "kx")
return np.median(np.diff(peaks)) / 365, np.median(np.diff(peaks2)) / 365
T_obs_pf, T_obs_pf2 = ID_freq_peakfinder(np.squeeze(model.nodes.I), T_exp, plot=True)
def ID_freq_autocorr(y0, cutoff=18250):
# Compute the FFT
Y1 = np.fft.fft(y0[cutoff:] - np.mean(y0[cutoff:]))
# Compute the circular autocorrelation using the inverse FFT
circular_autocorr = np.fft.ifft(Y1 * np.conj(Y1)).real
# Plot only the positive frequency spectrum
peaks, _ = find_peaks(circular_autocorr, distance=300)
return peaks[0] / 365
T_obs_fft = ID_freq_autocorr(np.squeeze(model.nodes.I))
plt.text(0.5, 0.99, f"T expected: {T_exp:.2f} y", transform=plt.gca().transAxes)
plt.text(0.5, 0.95, f"T observed, peakfinding: {T_obs_pf:.2f} y", transform=plt.gca().transAxes)
plt.text(0.5, 0.91, f"T observed, valley-finding : {T_obs_pf:.2f} y", transform=plt.gca().transAxes)
plt.text(0.5, 0.87, f"T observed, FFT: {T_obs_fft:.2f} y", transform=plt.gca().transAxes)
plt.gca().spines['top'].set_visible(False)
plt.show()
Larger test suite¶
OK, so now we are going to replicate the above test for many values of R0 and cbr, as a scientific validity test.
This takes 5+ minutes.
Note on the CBR range. The sweep below samples CBR in 70–100 per 1000 per year. The world average is ~18 and even the highest real-world country (Niger) is ~45 — so this range is non-physical by design. The author's choice is for measurement convenience: at higher CBR the system equilibrates faster (characteristic timescale ~ 1/CBR years), the oscillation period is shorter (so more cycles fit in a fixed-length simulation), and the disease is more endemically stable (lots of new susceptibles per year). All three accelerate the validation test.
The trade-off is that the demonstration directly verifies the analytic formula only in this convenience regime. The expectation is that the formula remains accurate at realistic CBRs (~15–40), but this notebook doesn't independently demonstrate it without the "Regime robustness" section near the end, which sweeps CBR across both regimes at fixed
R₀andinf_mean.
nsims = 10
nticks = 36500
cbrs = 70 + 30 * np.random.rand(nsims)
inf_means = 50 + 50 * np.random.rand(nsims)
R0s = 5 + 10 * np.random.rand(nsims)
mu = [((1 + cbr / 1000) ** (1 / 365) - 1) for cbr in cbrs]
A = [1 / ((R0 - 1) * mu) / 365 for R0, mu in zip(R0s, mu)]
G = [1 / (mu + 1 / inf_mean) / 365 for mu, inf_mean in zip(mu, inf_means)]
T_exp = [2 * math.pi * np.sqrt(A * G) for A, G in zip(A, G)]
mycases = np.zeros((nsims, nticks))
params_df = pd.DataFrame(
{
"cbr": cbrs,
"inf_mean": inf_means,
"R0": R0s,
"A": A,
"G": G,
"T_exp": T_exp,
"mu": mu,
}
)
pop=2e5
initial_infected = 1
mycases = np.zeros((nsims, nticks+1))
params_df
| cbr | inf_mean | R0 | A | G | T_exp | mu | |
|---|---|---|---|---|---|---|---|
| 0 | 91.930983 | 62.254472 | 14.072421 | 0.869696 | 0.168039 | 2.401977 | 0.000241 |
| 1 | 90.884256 | 74.406422 | 10.701467 | 1.184809 | 0.200301 | 3.060874 | 0.000238 |
| 2 | 73.702560 | 68.962034 | 5.772357 | 2.946291 | 0.186432 | 4.656696 | 0.000195 |
| 3 | 75.019508 | 50.755062 | 10.059807 | 1.525691 | 0.137670 | 2.879605 | 0.000198 |
| 4 | 76.882418 | 50.648404 | 11.988046 | 1.228547 | 0.137351 | 2.581021 | 0.000203 |
| 5 | 73.679016 | 74.924584 | 10.868083 | 1.425312 | 0.202320 | 3.374071 | 0.000195 |
| 6 | 70.720934 | 92.910228 | 8.861059 | 1.861457 | 0.250196 | 4.287921 | 0.000187 |
| 7 | 86.677990 | 78.410540 | 10.893828 | 1.215774 | 0.211054 | 3.182755 | 0.000228 |
| 8 | 93.607440 | 58.109627 | 9.839757 | 1.264072 | 0.156968 | 2.798796 | 0.000245 |
| 9 | 95.751173 | 57.784105 | 10.887793 | 1.105884 | 0.156053 | 2.610183 | 0.000251 |
for i in range(nsims):
row = params_df.iloc[i]
scenario = grid(M=1, N=1, population_fn=lambda x,y: pop, origin_x=0, origin_y=0)
parameters = PropertySet(
{"seed": 42+i,
"nticks": 365*100,
"verbose": True,
"beta": row["R0"] * (row["mu"] + 1 / row["inf_mean"]),
"inf_mean": row["inf_mean"],
"cbr": row["cbr"],
"importation_period": 180,
"importation_count": 3}
)
scenario["I"] = initial_infected
scenario["R"] = np.round(pop * (1 / row["R0"])).astype(np.int32)
scenario["S"] = scenario.population - scenario["I"] - scenario["R"]
# Zero birthrates → no capacity pre-allocation; ConstantPopVitalDynamics
# recycles existing agents at the configured rate.
recycle_rates = ValuesMap.from_scalar(parameters.cbr, nticks=parameters.nticks, nnodes=len(scenario))
model = Model(scenario, parameters,
birthrates=ValuesMap.from_scalar(0.0, nticks=parameters.nticks, nnodes=len(scenario)))
infdurdist = dists.exponential(scale=parameters.inf_mean)
model.components = [
SIR.Susceptible(model),
SIR.Recovered(model),
SIR.Infectious(model, infdurdist),
Importation(model, infdurdist),
SIR.Transmission(model, infdurdist),
ConstantPopVitalDynamics(model, recycle_rates),
]
model.run(f"{i+1:2}/{nsims} simulations")
mycases[i, :] = np.squeeze(model.nodes.I)
params_df["T_obs_peakfinder1"] = np.nan
params_df["T_obs_peakfinder2"] = np.nan
params_df["T_obs_autocorr"] = np.nan
for i in range(mycases.shape[0]):
#plt.plot(mycases[i, :])
pf1, pf2 = ID_freq_peakfinder(np.squeeze(mycases[i, :]), params_df.loc[i, "T_exp"], plot=True)
params_df.loc[i, "T_obs_peakfinder1"] = pf1
params_df.loc[i, "T_obs_peakfinder2"] = pf2
params_df.loc[i, "T_obs_autocorr"] = ID_freq_autocorr(np.squeeze(mycases[i, :]))
plt.text(0.5, 0.99, f"T expected: {params_df.loc[i, 'T_exp']:.2f} y", transform=plt.gca().transAxes)
plt.text(0.5, 0.95, f"T observed, peakfinding: {params_df.loc[i, 'T_obs_peakfinder1']:.2f} y", transform=plt.gca().transAxes)
plt.text(0.5, 0.91, f"T observed, valleyfinding: {params_df.loc[i, 'T_obs_peakfinder2']:.2f} y", transform=plt.gca().transAxes)
plt.text(0.5, 0.87, f"T observed, FFT: {params_df.loc[i, 'T_obs_autocorr']:.2f} y", transform=plt.gca().transAxes)
plt.gca().spines['top'].set_visible(False)
plt.show()
params_df
| cbr | inf_mean | R0 | A | G | T_exp | mu | T_obs_peakfinder1 | T_obs_peakfinder2 | T_obs_autocorr | |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 91.930983 | 62.254472 | 14.072421 | 0.869696 | 0.168039 | 2.401977 | 0.000241 | 2.013699 | 2.191781 | 1.153425 |
| 1 | 90.884256 | 74.406422 | 10.701467 | 1.184809 | 0.200301 | 3.060874 | 0.000238 | 2.547945 | 2.189041 | 2.926027 |
| 2 | 73.702560 | 68.962034 | 5.772357 | 2.946291 | 0.186432 | 4.656696 | 0.000195 | 3.589041 | 3.449315 | 4.200000 |
| 3 | 75.019508 | 50.755062 | 10.059807 | 1.525691 | 0.137670 | 2.879605 | 0.000198 | 2.568493 | 2.438356 | 2.895890 |
| 4 | 76.882418 | 50.648404 | 11.988046 | 1.228547 | 0.137351 | 2.581021 | 0.000203 | 2.172603 | 2.298630 | 2.397260 |
| 5 | 73.679016 | 74.924584 | 10.868083 | 1.425312 | 0.202320 | 3.374071 | 0.000195 | 3.041096 | 2.939726 | 1.561644 |
| 6 | 70.720934 | 92.910228 | 8.861059 | 1.861457 | 0.250196 | 4.287921 | 0.000187 | 3.821918 | 3.102740 | 4.087671 |
| 7 | 86.677990 | 78.410540 | 10.893828 | 1.215774 | 0.211054 | 3.182755 | 0.000228 | 2.501370 | 2.497260 | 1.375342 |
| 8 | 93.607440 | 58.109627 | 9.839757 | 1.264072 | 0.156968 | 2.798796 | 0.000245 | 2.335616 | 2.408219 | 2.797260 |
| 9 | 95.751173 | 57.784105 | 10.887793 | 1.105884 | 0.156053 | 2.610183 | 0.000251 | 1.997260 | 2.358904 | 2.326027 |
fig, ax = plt.subplots(figsize=(6, 6))
# y=x reference line spanning the actual data range, not the hardcoded [1, 4]
# from the original notebook (which fell short of the data once T_exp moved
# into the 4-7 year range at realistic CBR).
all_vals = pd.concat([
params_df["T_exp"],
params_df["T_obs_peakfinder1"],
params_df["T_obs_peakfinder2"],
params_df["T_obs_autocorr"],
])
t_min, t_max = all_vals.min(), all_vals.max()
pad = 0.5 + 0.05 * (t_max - t_min)
lim_lo, lim_hi = t_min - pad, t_max + pad
ax.plot([lim_lo, lim_hi], [lim_lo, lim_hi], "k--", lw=1, alpha=0.4, label="y = x")
ax.plot(params_df["T_exp"], params_df["T_obs_peakfinder1"], "o",
color="C0", ms=8, label="peak-finder")
ax.plot(params_df["T_exp"], params_df["T_obs_peakfinder2"], "s",
color="C2", ms=8, label="valley-finder")
ax.plot(params_df["T_exp"], params_df["T_obs_autocorr"], "x",
color="C3", ms=10, mew=2, label="autocorr FFT")
ax.set_xlabel(r"$T_{\mathrm{exp}}$ (years, analytic)")
ax.set_ylabel(r"$T_{\mathrm{obs}}$ (years, measured)")
ax.set_title("Observed vs analytic oscillation period")
ax.set_xlim(lim_lo, lim_hi)
ax.set_ylim(lim_lo, lim_hi)
ax.set_aspect("equal", adjustable="box")
ax.grid(True, alpha=0.3)
ax.legend(loc="best", framealpha=0.9)
plt.tight_layout()
plt.show()
Programmatic validation¶
Don't rely on the human eye to confirm that the observed periods track the analytic prediction. The next cell does it numerically: it compares each of the three T_obs estimators (peak-finder, valley-finder, autocorrelation FFT) against T_exp from T ≈ 2π√(A·G), and asserts that the agreement is within reasonable bounds.
The three estimators are individually noisy on a stochastic SIR oscillator, so the test takes the best of the three per sim as the consensus. Thresholds are deliberately loose enough to absorb run-to-run variation from the random draws of (CBR, inf_mean, R₀), but tight enough to catch a real regression (e.g. a buggy peak-finder, or parameters that put the system outside the regime where the linearized theory holds).
import os
# Tolerances picked with run-to-run variation in mind. With CBR=70-100 and
# stable population (via ConstantPopVitalDynamics recycling), max
# best-of-three relative error tends to sit around 20% and per-method
# correlations are typically >0.7. Tighten if the model gets cleaner; loosen
# only if a legitimate parameter range starts hitting the wall.
#
# We require *at least* MIN_PASSING_METHODS of the three estimators to correlate
# with T_exp. The autocorrelation FFT is known to land on T_exp/2 when the
# signal carries a second harmonic, and the peak-finder can drift on noisy
# signals — but the three together are robust, which mirrors the per-sim
# "best-of-three" relative error philosophy used below.
#
# By default a failure prints a soft warning and continues, so a stochastic
# outlier doesn't kill the docs build. The Build Combined Doc workflow runs
# docs/check_executed_nbs.py with ALLOW_NB_ERRORS=0 by default, which fails
# the build if any executed notebook contains error-output cells — a hard
# AssertionError here would land in that category. Set STRICT_VALIDATION=1
# in the env to convert failures into a hard AssertionError for local
# validation or a dedicated gating CI lane.
CORR_THRESHOLD = 0.5
MIN_PASSING_METHODS = 2
MEDIAN_REL_ERR_THRESHOLD = 0.30
MAX_REL_ERR_THRESHOLD = 0.60
STRICT_VALIDATION = os.environ.get("STRICT_VALIDATION", "0") == "1"
methods = ["T_obs_peakfinder1", "T_obs_peakfinder2", "T_obs_autocorr"]
T_exp_arr = params_df["T_exp"].to_numpy()
rel_err_per_method = {
m: np.abs(params_df[m].to_numpy() - T_exp_arr) / T_exp_arr for m in methods
}
best_per_sim = np.minimum.reduce(list(rel_err_per_method.values()))
correlations = {m: float(np.corrcoef(T_exp_arr, params_df[m].to_numpy())[0, 1]) for m in methods}
print("=" * 64)
print("Periodicity validation: T_obs vs T_exp")
print("=" * 64)
print("\nPer-method correlation with T_exp:")
passing_methods = 0
for m, r in correlations.items():
# NaN guard: np.corrcoef returns NaN for a constant series; treat as fail.
if math.isnan(r):
ok = False
status = "nan"
else:
ok = r > CORR_THRESHOLD
status = "PASS" if ok else "fail"
print(f" {m:24s} r = {r:+.3f} [{status}]")
if ok:
passing_methods += 1
print(f"\nMethods passing correlation threshold: {passing_methods}/{len(methods)} (need ≥ {MIN_PASSING_METHODS})")
print("\nPer-sim best-of-three relative error:")
for i, err in enumerate(best_per_sim):
print(f" sim {i}: {err * 100:5.1f}%")
median_best = float(np.median(best_per_sim))
max_best = float(np.max(best_per_sim))
print(f"\nMedian best-of-three rel error: {median_best * 100:5.1f}% (threshold {MEDIAN_REL_ERR_THRESHOLD * 100:.0f}%)")
print(f"Max best-of-three rel error: {max_best * 100:5.1f}% (threshold {MAX_REL_ERR_THRESHOLD * 100:.0f}%)")
failures = []
if passing_methods < MIN_PASSING_METHODS:
failed_methods = [m for m, r in correlations.items() if math.isnan(r) or r <= CORR_THRESHOLD]
failures.append(
f"only {passing_methods}/{len(methods)} methods correlated with T_exp "
f"(failed: {', '.join(failed_methods)})"
)
# NaN aggregates would otherwise silently pass the threshold comparisons —
# count them as failures explicitly.
if math.isnan(median_best):
failures.append("median best-of-three is NaN (an estimator returned non-finite values)")
elif median_best > MEDIAN_REL_ERR_THRESHOLD:
failures.append(f"median best-of-three {median_best:.3f} > {MEDIAN_REL_ERR_THRESHOLD}")
if math.isnan(max_best):
failures.append("max best-of-three is NaN (an estimator returned non-finite values)")
elif max_best > MAX_REL_ERR_THRESHOLD:
failures.append(f"max best-of-three {max_best:.3f} > {MAX_REL_ERR_THRESHOLD}")
print("\n" + "=" * 64)
if failures:
msg = "Periodicity validation FAILED:\n - " + "\n - ".join(failures)
if STRICT_VALIDATION:
raise AssertionError(msg)
print(f"⚠️ SOFT-WARN: {msg}")
print(" (export STRICT_VALIDATION=1 to convert this into a hard failure.)")
else:
print(f"ALL CHECKS PASSED — T_obs tracks T_exp across {len(params_df)} sims")
print("=" * 64)
Regime robustness: does the formula hold at realistic CBRs?¶
The headline test above runs at CBR = 70–100, far above any real-world demographic regime, because that's where the system equilibrates fast enough and the period is short enough to fit many cycles in a reasonable simulation. Showing that the analytic prediction holds there doesn't directly demonstrate it also holds at realistic CBRs (~15–40), where someone would actually apply it.
The cell below addresses that gap: hold R₀ and inf_mean fixed and sweep CBR across both the unrealistic-but-easy-to-measure range and the realistic range. If T_obs / T_exp stays close to 1 across the full sweep, the formula is shown to be regime-independent, and the high-CBR convenience used above doesn't hide a systematic bias at lower CBRs.
This adds ~3 minutes of runtime.
# Hold R0 and inf_mean fixed; sweep CBR across both the test-convenience and
# real-world regimes. The pop=200K, nticks=100y, ConstantPopVitalDynamics
# setup means every point in the sweep stays demographically stable.
regime_cbrs = np.array([15, 25, 40, 60, 80, 100])
regime_R0 = 10
regime_inf_mean = 12
regime_pop = 2e5
regime_nticks = 365 * 100
regime_T_exp = []
regime_T_obs_best = []
regime_T_obs_pf1 = []
regime_T_obs_pf2 = []
regime_T_obs_ac = []
for cbr in regime_cbrs:
mu = (1 + cbr / 1000) ** (1 / 365) - 1
A = 1 / ((regime_R0 - 1) * mu) / 365 # years
G = 1 / (mu + 1 / regime_inf_mean) / 365 # years
T_exp = 2 * math.pi * np.sqrt(A * G)
regime_T_exp.append(T_exp)
scenario = grid(M=1, N=1, population_fn=lambda x, y: regime_pop, origin_x=0, origin_y=0)
scenario["I"] = 1
scenario["R"] = np.round(regime_pop * (1 / regime_R0)).astype(np.int32)
scenario["S"] = scenario.population - scenario["I"] - scenario["R"]
parameters = PropertySet({
"seed": 4242,
"nticks": regime_nticks,
"verbose": False,
"beta": regime_R0 * (mu + 1 / regime_inf_mean),
"inf_mean": regime_inf_mean,
"cbr": float(cbr),
"importation_period": 180,
"importation_count": 3,
})
recycle_rates = ValuesMap.from_scalar(parameters.cbr, nticks=parameters.nticks, nnodes=len(scenario))
model = Model(
scenario, parameters,
birthrates=ValuesMap.from_scalar(0.0, nticks=parameters.nticks, nnodes=len(scenario)),
)
infdurdist = dists.exponential(scale=parameters.inf_mean)
model.components = [
SIR.Susceptible(model),
SIR.Recovered(model),
SIR.Infectious(model, infdurdist),
Importation(model, infdurdist),
SIR.Transmission(model, infdurdist),
ConstantPopVitalDynamics(model, recycle_rates),
]
model.run(f"regime sweep: CBR={cbr}")
I_series = np.squeeze(model.nodes.I)
t_pf1, t_pf2 = ID_freq_peakfinder(I_series, T_exp)
t_ac = ID_freq_autocorr(I_series)
regime_T_obs_pf1.append(t_pf1)
regime_T_obs_pf2.append(t_pf2)
regime_T_obs_ac.append(t_ac)
# Filter out non-finite estimates (e.g. peak-finder may find <2 peaks and
# return NaN from np.median(np.diff([])); autocorr could return NaN on a
# constant series) before picking the closest-to-T_exp. Falls back to NaN
# with a warning if all three are non-finite, rather than letting min()
# silently propagate NaN through the comparison.
finite_candidates = [t for t in (t_pf1, t_pf2, t_ac) if math.isfinite(t)]
if finite_candidates:
best = min(finite_candidates, key=lambda t: abs(t - T_exp))
else:
print(f" WARNING: all three estimators returned non-finite values at CBR={cbr}; using NaN.")
best = float("nan")
regime_T_obs_best.append(best)
regime_T_exp = np.array(regime_T_exp)
regime_T_obs_best = np.array(regime_T_obs_best)
ratio = regime_T_obs_best / regime_T_exp
fig, axes = plt.subplots(1, 2, figsize=(11, 4.5))
axes[0].plot(regime_cbrs, regime_T_exp, "o-", label=r"$T_{\mathrm{exp}}$ (analytic)", color="C0")
axes[0].plot(regime_cbrs, regime_T_obs_pf1, "x", label="peak-finder", color="C0", alpha=0.5)
axes[0].plot(regime_cbrs, regime_T_obs_pf2, "+", label="valley-finder", color="C2", alpha=0.5)
axes[0].plot(regime_cbrs, regime_T_obs_ac, "v", label="autocorr", color="C3", alpha=0.5)
axes[0].plot(regime_cbrs, regime_T_obs_best, "s-", label="best-of-three", color="C1")
axes[0].axvspan(15, 40, alpha=0.10, color="green", label="realistic CBR range")
axes[0].set_xlabel("CBR (per 1000 / year)")
axes[0].set_ylabel("Period (years)")
axes[0].set_title("Period vs CBR (R0=10, inf_mean=12)")
axes[0].legend(fontsize=8, loc="best")
axes[0].grid(True, alpha=0.3)
axes[1].plot(regime_cbrs, ratio, "o-", color="C3")
axes[1].axhline(1.0, ls="--", color="k", alpha=0.4, label="ratio = 1")
axes[1].axvspan(15, 40, alpha=0.10, color="green", label="realistic CBR range")
axes[1].set_xlabel("CBR (per 1000 / year)")
axes[1].set_ylabel(r"$T_{\mathrm{obs}} / T_{\mathrm{exp}}$ (best-of-three)")
axes[1].set_title("Formula accuracy across CBR regime")
axes[1].set_ylim(0.5, 1.5)
axes[1].legend(loc="best")
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print(f"\nRegime-sweep (R0={regime_R0}, inf_mean={regime_inf_mean}, n=1 sim per CBR):")
print(f" CBR | T_exp | T_obs(best) | T_obs/T_exp")
print(f" ----+-------+-------------+------------")
for cbr, te, to, r in zip(regime_cbrs, regime_T_exp, regime_T_obs_best, ratio):
print(f" {int(cbr):3d} | {te:5.2f} | {to:11.2f} | {r:6.3f}")
# Soft check: flag if ratio drifts substantially with CBR.
realistic_mask = regime_cbrs <= 40
testing_mask = regime_cbrs >= 60
if realistic_mask.any() and testing_mask.any():
# nanmean tolerates the rare case where best fell back to NaN above.
realistic_ratio = float(np.nanmean(ratio[realistic_mask]))
testing_ratio = float(np.nanmean(ratio[testing_mask]))
drift = abs(realistic_ratio - testing_ratio)
print(f"\nMean T_obs/T_exp in realistic CBR (≤40): {realistic_ratio:.3f}")
print(f"Mean T_obs/T_exp in testing CBR (≥60): {testing_ratio:.3f}")
print(f"Drift between regimes: {drift:.3f}")
if drift > 0.20:
print("\nNOTE: T_obs/T_exp drifts > 0.20 between regimes — the formula's")
print(" accuracy may depend on CBR. Worth investigating before trusting")
print(" the headline test at unrealistic CBR as evidence for the realistic one.")