Build SIR models
One of the simplest and most commonly used models to describe the progression of an outbreak or epidemic is the SIR (Susceptible - Infected - Recovered) model. We can use the SIR model to explore how to use the LASER framework, staring with a basic SIR model and adding complexity.
This tutorial will:
- Demonstrate how the
LaserFrame and PropertySet libraries are used
- Structure a basic disease transmission framework
- Track and visualize results
As you progress through the sections, you will learn how to add spatial dynamics and migration into the disease model, using both synthetic and real-world data.
Simple SIR
The SIR model presented here simulates disease dynamics within a closed population in a single node using the LaserFrame framework. The population starts with a defined number of susceptible and infected individuals, progresses over time with recovery and transmission components, and tracks results for visualization. This example serves as a practical guide for modeling simple epidemic dynamics. This simple example does not include vital dynamics, age-structured populations, vaccination, or other complex interactions.
Model components
The SIRModel class is the core of the implementation. It initializes a population using LaserFrame, sets up disease state and recovery timer properties, and tracks results across timesteps.
Code example: Implementing SIRModel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58 | import numpy as np
import matplotlib.pyplot as plt
from laser.core import LaserFrame
from laser.core import PropertySet
class SIRModel:
def __init__(self, params):
# Model Parameters
self.params = params
# Initialize the population LaserFrame
self.population = LaserFrame(capacity=params.population_size,initial_count=params.population_size)
# Add disease state property (0 = Susceptible, 1 = Infected, 2 = Recovered)
self.population.add_scalar_property("disease_state", dtype=np.int32, default=0)
# Add a recovery timer property (for intrahost progression, optional for timing)
self.population.add_scalar_property("recovery_timer", dtype=np.int32, default=0)
# Results tracking
self.results = LaserFrame( capacity = 1 ) # number of nodes
self.results.add_vector_property( "S", length=params["timesteps"], dtype=np.float32 )
self.results.add_vector_property( "I", length=params["timesteps"], dtype=np.float32 )
self.results.add_vector_property( "R", length=params["timesteps"], dtype=np.float32 )
# Components
self.components = []
def add_component(self, component):
self.components.append(component)
def track_results(self, tick):
susceptible = (self.population.disease_state == 0).sum()
infected = (self.population.disease_state == 1).sum()
recovered = (self.population.disease_state == 2).sum()
total = self.population.count
self.results.S[tick] = susceptible / total
self.results.I[tick] = infected / total
self.results.R[tick] = recovered / total
def run(self):
for tick in range(self.params.timesteps):
for component in self.components:
component.step()
self.track_results(tick)
def plot_results(self):
plt.figure(figsize=(10, 6))
plt.plot(self.results.S, label="Susceptible (S)", color="blue")
plt.plot(self.results.I, label="Infected (I)", color="red")
plt.plot(self.results.R, label="Recovered (R)", color="green")
plt.title("SIR Model Dynamics with LASER Components")
plt.xlabel("Time (Timesteps)")
plt.ylabel("Fraction of Population")
plt.legend()
plt.grid()
plt.show()
plt.savefig("gpt_sir.png")
|
The IntrahostProgression class manages recovery dynamics by updating infected individuals based on a given recovery rate.
Code example: Implementing IntrahostProgression
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 | class IntrahostProgression:
def __init__(self, model):
self.population = model.population
# Seed the infection
num_initial_infected = int(0.01 * params.population_size) # e.g., 1% initially infected
infected_indices = np.random.choice(params.population_size, size=num_initial_infected, replace=False)
self.population.disease_state[infected_indices] = 1
# Initialize recovery timer for initially infected individuals
initially_infected = self.population.disease_state == 1
self.population.recovery_timer[initially_infected] = np.random.randint(5, 15, size=initially_infected.sum())
def step(self):
infected = self.population.disease_state == 1
# Decrement recovery timer
self.population.recovery_timer[infected] -= 1
# Recover individuals whose recovery_timer has reached 0
recoveries = infected & (self.population.recovery_timer <= 0)
self.population.disease_state[recoveries] = 2
|
The Transmission class manages disease spread by modeling interactions between susceptible and infected individuals.
Code example: Implementing Transmission
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 | class Transmission:
def __init__(self, model):
self.population = model.population
self.infection_rate = model.params.infection_rate
def step(self):
susceptible = self.population.disease_state == 0
infected = self.population.disease_state == 1
num_susceptible = susceptible.sum()
num_infected = infected.sum()
population_size = len(self.population)
# Fraction of infected and susceptible individuals
fraction_infected = num_infected / population_size
# Transmission logic: Probability of infection per susceptible individual
infection_probability = self.infection_rate * fraction_infected
# Apply infection probability to all susceptible individuals
new_infections = np.random.rand(num_susceptible) < infection_probability
# Set new infections and initialize their recovery_timer
susceptible_indices = np.where(susceptible)[0]
newly_infected_indices = susceptible_indices[new_infections]
self.population.disease_state[newly_infected_indices] = 1
self.population.recovery_timer[newly_infected_indices] = np.random.randint(5, 15, size=newly_infected_indices.size) # Random recovery time
|
The simulation parameters are defined using the PropertySet class.
Code example: Defining simulation parameters using PropertySet
| params = PropertySet({
"population_size": 100_000,
"infection_rate": 0.3,
"timesteps": 160
})
|
The model is initialized with the defined parameters, components are added, and the simulation is run for the specified timesteps. Results are then visualized.
Code example: Initialize, run the simulation, and plot the results
1
2
3
4
5
6
7
8
9
10
11
12 | # Initialize the model
sir_model = SIRModel(params)
# Initialize and add components
sir_model.add_component(IntrahostProgression(sir_model))
sir_model.add_component(Transmission(sir_model))
# Run the simulation
sir_model.run()
# Plot results
sir_model.plot_results()
|
Spatial SIR
Building upon the simple SIR model created above, we can add spatial complexity to the framework. Here the simple SIR model will spread the population across 20 nodes. The nodes are arranged in a 1-dimensional chain and infection spreads spatially from node 0 as agents migrate; migration is based on a migration matrix.
In this example, two migration options are available:
- Sequential migration matrix: Agents can only move to the next node in the chain.
- Gravity model migration matrix: Agents can move in a 2-dimensional spatial dynamic, where migration probabilities depend on node distances and population sizes.
For this example, the population is distributed across nodes using a rural-urban skew, and migration timers are assigned to control agent migration frequency.
Model components
As above, the model will require the use of LaserFrame, but will now also include spatial components.
Code example: Initial model importations
| import numpy as np
import matplotlib.pyplot as plt
import csv
from laser.core.laserframe import LaserFrame
from laser.core.demographics.spatialpops import distribute_population_skewed as dps
from laser.core.migration import gravity
|
Instead of using the SIRModel, we will use the MultiNodeSIRModel.
Code example: Creating a model using MultiNodeSIRModel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121 | # Define the model
class MultiNodeSIRModel:
"""
A multi-node SIR (Susceptible-Infected-Recovered) disease transmission model.
Attributes:
params (dict): Configuration parameters for the model.
nodes (int): Number of nodes in the simulation.
population (LaserFrame): Represents the population with agent-level properties.
results (np.ndarray): Stores simulation results for S, I, and R per node.
components (list): List of components (e.g., Migration, Transmission) added to the model.
"""
def __init__(self, params):
"""
Initializes the SIR model with the given parameters.
Args:
params (dict): Dictionary containing parameters such as population size,
number of nodes, timesteps, and rates for transmission/migration.
"""
self.components = []
self.params = params
self.nodes = params["nodes"]
self.population = LaserFrame(capacity=params["population_size"], initial_count=params["population_size"])
# Define properties
self.population.add_scalar_property("node_id", dtype=np.int32)
self.population.add_scalar_property("disease_state", dtype=np.int32, default=0) # 0=S, 1=I, 2=R
self.population.add_scalar_property("recovery_timer", dtype=np.int32, default=0)
self.population.add_scalar_property("migration_timer", dtype=np.int32, default=0)
# Initialize population distribution
node_pops = dps(params["population_size"], self.nodes, frac_rural=0.3)
node_ids = np.concatenate([np.full(count, i) for i, count in enumerate(node_pops)])
np.random.shuffle(node_ids)
self.population.node_id[:params["population_size"]] = node_ids
# Reporting structure: Use LaserFrame for reporting
self.results = LaserFrame( capacity=self.nodes ) # not timesteps for capacity
for state in ["S", "I", "R"]:
self.results.add_vector_property(state, length=params["timesteps"], dtype=np.int32)
# Record results: reporting could actually be a component if we wanted. Or it can be
# done in a log/report function in the relevant component (e.g., Transmission)
self.results.S[self.current_timestep, :] = [
np.sum(self.population.disease_state[self.population.node_id == i] == 0)
for i in range(self.nodes)
]
self.results.I[self.current_timestep, :] = [
np.sum(self.population.disease_state[self.population.node_id == i] == 1)
for i in range(self.nodes)
]
self.results.R[self.current_timestep, :] = [
np.sum(self.population.disease_state[self.population.node_id == i] == 2)
for i in range(self.nodes)
]
def add_component(self, component):
"""
Adds a component (e.g., Migration, Transmission, Recovery) to the model.
Args:
component: An instance of a component to be added.
"""
self.components.append(component)
def step(self):
"""
Advances the simulation by one timestep, updating all components and recording results.
"""
for component in self.components:
component.step()
# Record results
for i in range(self.nodes):
in_node = self.population.node_id == i
self.results[self.current_timestep, i, 0] = (self.population.disease_state[in_node] == 0).sum()
self.results[self.current_timestep, i, 1] = (self.population.disease_state[in_node] == 1).sum()
self.results[self.current_timestep, i, 2] = (self.population.disease_state[in_node] == 2).sum()
def run(self):
"""
Runs the simulation for the configured number of timesteps.
"""
from tqdm import tqdm
for self.current_timestep in tqdm(range(self.params["timesteps"])):
self.step()
def save_results(self, filename):
"""
Saves the simulation results to a CSV file.
Args:
filename (str): Path to the output file.
"""
with open(filename, mode='w', newline='') as file:
writer = csv.writer(file)
writer.writerow(["Timestep", "Node", "Susceptible", "Infected", "Recovered"])
for t in range(self.params["timesteps"]):
for node in range(self.nodes):
writer.writerow([t, node, *self.results[t, node]])
def plot_results(self):
"""
Plots the prevalence of infected agents over time for all nodes.
"""
plt.figure(figsize=(10, 6))
for i in range(self.nodes):
prevalence = self.results.I[:, i] / (
self.results.S[:, i] +
self.results.I[:, i] +
self.results.R[:, i]
)
plt.plot(prevalence, label=f"Node {i}")
plt.title("Prevalence Across All Nodes")
plt.xlabel("Timesteps")
plt.ylabel("Prevalence of Infected Agents")
plt.legend()
plt.show()
|
To add migration between nodes, we will need to select the type of migration model to use and import the component. Here, we will use the sequential migration matrix to move agents sequentially between nodes. The 0th node is the 'urban' node which contains the largest population and where we seed the infection. The infection will travel sequentially from node to node, but the below example breaks the connection at node 13 for demonstrative purposes.
Code example: Adding migration using the sequential migration matrix
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86 | class MigrationComponent:
"""
Handles migration behavior of agents between nodes in the model.
Attributes:
model (MultiNodeSIRModel): The simulation model instance.
migration_matrix (ndarray): A matrix representing migration probabilities between nodes.
"""
def __init__(self, model):
"""
Initializes the MigrationComponent.
Args:
model (MultiNodeSIRModel): The simulation model instance.
"""
self.model = model
# Set initial migration timers
max_timer = int(1 / model.params["migration_rate"])
model.population.migration_timer[:] = np.random.randint(1, max_timer + 1, size=model.params["population_size"])
self.migration_matrix = self.get_sequential_migration_matrix(model.nodes)
# Example customization: Disable migration from node 13 to 14
def break_matrix_node(matrix, from_node, to_node):
matrix[from_node][to_node] = 0
break_matrix_node(self.migration_matrix, 13, 14)
def get_sequential_migration_matrix(self, nodes):
"""
Creates a migration matrix where agents can only migrate to the next sequential node.
Args:
nodes (int): Number of nodes in the simulation.
Returns:
ndarray: A migration matrix where migration is allowed only to the next node.
"""
migration_matrix = np.zeros((nodes, nodes))
for i in range(nodes - 1):
migration_matrix[i, i + 1] = 1.0
return migration_matrix
def step(self):
"""
Updates the migration state of the population by determining which agents migrate
and their destinations based on the migration matrix.
"""
node_ids = self.model.population.node_id
# Decrement migration timers
self.model.population.migration_timer -= 1
# Identify agents ready to migrate
migrating_indices = np.where(self.model.population.migration_timer <= 0)[0]
if migrating_indices.size == 0:
return
# Shuffle migrants and assign destinations based on migration matrix
np.random.shuffle(migrating_indices)
destinations = np.empty(len(migrating_indices), dtype=int)
for origin in range(self.model.nodes):
origin_mask = node_ids[migrating_indices] == origin
num_origin_migrants = origin_mask.sum()
if num_origin_migrants > 0:
# Assign destinations proportionally to migration matrix
destination_counts = np.round(self.migration_matrix[origin] * num_origin_migrants).astype(int)
destination_counts = np.maximum(destination_counts, 0) # Clip negative values
if destination_counts.sum() == 0: # No valid destinations
destinations[origin_mask] = origin # Stay in the same node
continue
destination_counts[origin] += num_origin_migrants - destination_counts.sum() # Adjust rounding errors
# Create ordered destination assignments
destination_indices = np.repeat(np.arange(self.model.nodes), destination_counts)
destinations[origin_mask] = destination_indices[:num_origin_migrants]
# Update node IDs of migrants
node_ids[migrating_indices] = destinations
# Reset migration timers for migrated agents
self.model.population.migration_timer[migrating_indices] = np.random.randint(
1, int(1 / self.model.params["migration_rate"]) + 1, size=migrating_indices.size
)
|
To create more complicated and more realistic migration dynamics, instead of using sequential migration we can use the gravity model to implement 2-D migration. Migration rates are proportional to population sizes, but the example still uses synthetic distances for ease of demonstration.
Code example: Adding migration using the gravity model of migration
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95 | class MigrationComponent2D:
"""
Handles migration behavior of agents between nodes in the model.
Attributes:
model (MultiNodeSIRModel): The simulation model instance.
migration_matrix (ndarray): A matrix representing migration probabilities between nodes.
"""
def __init__(self, model):
"""
Initializes the MigrationComponent.
Args:
model (MultiNodeSIRModel): The simulation model instance.
"""
self.model = model
# Set initial migration timers
max_timer = int(1 / model.params["migration_rate"])
model.population.migration_timer[:] = np.random.randint(1, max_timer + 1, size=model.params["population_size"])
self.migration_matrix = self.get_gravity_migration_matrix(model.nodes)
def get_gravity_migration_matrix(self, nodes):
"""
Generates a gravity-based migration matrix based on population and distances between nodes.
Args:
nodes (int): Number of nodes in the simulation.
Returns:
ndarray: A migration matrix where each row represents probabilities of migration to other nodes.
"""
pops = np.array([np.sum(self.model.population.node_id == i) for i in range(nodes)])
distances = np.ones((nodes, nodes)) - np.eye(nodes)
migration_matrix = gravity(pops, distances, k=1.0, a=0.5, b=0.5, c=2.0)
migration_matrix = migration_matrix / migration_matrix.sum(axis=1, keepdims=True)
return migration_matrix
def step(self):
"""
Executes the migration step for the agent-based model.
This function selects a fraction of agents to migrate based on expired migration timers.
It then changes their node_id according to the migration matrix, ensuring that movements
follow the prescribed probability distributions.
Steps:
- Selects a subset of the population for migration.
- Determines the origin nodes of migrating agents.
- Uses a multinomial draw to assign new destinations based on the migration matrix.
- Updates the agents' node assignments accordingly.
Returns:
None
"""
# Decrement migration timers
self.model.population.migration_timer -= 1
# Identify agents ready to migrate
migrating_indices = np.where(self.model.population.migration_timer <= 0)[0]
if migrating_indices.size == 0:
return
np.random.shuffle(migrating_indices)
origins = model.population.node_id[migrating_indices]
origin_counts = np.bincount(origins, minlength=model.params["nodes"])
offset = 0
for origin in range(model.params["nodes"]):
count = origin_counts[origin]
if count == 0:
continue
origin_slice = migrating_indices[offset : offset + count]
offset += count
row = self.migration_matrix[origin]
row_sum = row.sum()
if row_sum <= 0:
continue
fraction_row = row / row_sum
destination_counts = np.random.multinomial(count, fraction_row)
destinations = np.repeat(np.arange(model.params["nodes"]), destination_counts)
model.population.node_id[origin_slice] = destinations[:count]
# Reset migration timers for migrated agents
self.model.population.migration_timer[migrating_indices] = np.random.randint(
1, int(1 / self.model.params["migration_rate"]) + 1, size=migrating_indices.size
)
|
After selecting your desired migration patterns, you will need to add in a transmission component to create disease dynamics.
Code example: Adding in disease transmission
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43 | class TransmissionComponent:
"""
Handles the disease transmission dynamics within the population.
Attributes:
model (MultiNodeSIRModel): The simulation model instance.
"""
def __init__(self, model):
"""
Initializes the TransmissionComponent and infects initial agents.
Args:
model (MultiNodeSIRModel): The simulation model instance.
"""
self.model = model
def step(self):
"""
Simulates disease transmission for each node in the current timestep.
"""
for i in range(self.model.nodes):
in_node = self.model.population.node_id == i
susceptible = in_node & (self.model.population.disease_state == 0)
infected = in_node & (self.model.population.disease_state == 1)
num_susceptible = susceptible.sum()
num_infected = infected.sum()
total_in_node = in_node.sum()
if total_in_node > 0 and num_infected > 0 and num_susceptible > 0:
infectious_fraction = num_infected / total_in_node
susceptible_fraction = num_susceptible / total_in_node
new_infections = int(
self.model.params["transmission_rate"] * infectious_fraction * susceptible_fraction * total_in_node
)
susceptible_indices = np.where(susceptible)[0]
newly_infected_indices = np.random.choice(susceptible_indices, size=new_infections, replace=False)
self.model.population.disease_state[newly_infected_indices] = 1
self.model.population.recovery_timer[newly_infected_indices] = np.random.randint(5, 15, size=new_infections)
|
Finally, we need to add recovery dynamics to the model to move agents through the disease progression.
Code example: Adding in recovery dynamics
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 | class RecoveryComponent:
"""
Handles the recovery dynamics of infected individuals in the population.
Attributes:
model (MultiNodeSIRModel): The simulation model instance.
"""
def __init__(self, model):
"""
Initializes the RecoveryComponent.
Args:
model (MultiNodeSIRModel): The simulation model instance.
"""
self.model = model
def step(self):
"""
Updates the recovery state of infected individuals, moving them to the recovered state
if their recovery timer has elapsed.
"""
infected = self.model.population.disease_state == 1
self.model.population.recovery_timer[infected] -= 1
self.model.population.disease_state[(infected) & (self.model.population.recovery_timer <= 0)] = 2
|
To run the created model and visualize your output, we will need to set our model parameters and run the simulation.
Code example: Running your spatial SIR model
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 | # Parameters
params = {
"population_size": 1_000_000,
"nodes": 20,
"timesteps": 600,
"initial_infected_fraction": 0.01,
"transmission_rate": 0.25,
"migration_rate": 0.001
}
# Run simulation
model = MultiNodeSIRModel(params)
model.add_component(MigrationComponent(model))
model.add_component(TransmissionComponent(model))
model.add_component(RecoveryComponent(model))
model.run()
model.save_results("simulation_results.csv")
model.plot_results()
|
Using real data
To utilize SIR models to understand real-world transmission dynamics, you will need to use real data. Model structure will be similar to what was presented above, but instead of using a synthetic population we will initialize the model using real population data. In this example, we will use data from Rwanda. You will want your data saved in a .csv file, with the following information:
- Region_id: node location, here each node is a city in Rwanda
- Population: the population of the node
- Centroid_lat and centroid-long: the latitude and longitude at the center of the node
- Birth_rate: the birth rate for the node
| region_id |
population |
centroid_lat |
centroid_long |
birth_rate |
| Ryansoro |
46482.7 |
-3.70727 |
29.7988 |
11.6565 |
| Ndava |
72979.3 |
-3.39156 |
29.7534 |
15.8815 |
| Buyengero |
76468.8 |
-3.84874 |
29.533 |
12.5038 |
| Bugarama |
44571.9 |
-3.69048 |
29.4004 |
11.0256 |
| Rumonge |
300248 |
-3.96221 |
29.4571 |
19.5677 |
| Burambi |
63219.7 |
-3.79864 |
29.4524 |
9.19902 |
| Kanyosha1 |
115018 |
-3.43097 |
29.4153 |
37.9514 |
| Kabezi |
71913.8 |
-3.5311 |
29.37 |
31.8319 |
| Muhuta |
88141.7 |
-3.62351 |
29.4152 |
21.5989 |
The model code will be very similar to the code used above, but the population data will be loaded from the .csv file instead of created synthetically. In the following example, numbers are rounded and scaled down (which is optional), and each node is assigned an ID.
Code example: Creating a model using data loaded from a CSV
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50 | class SpatialSIRModelRealData:
def __init__(self, params, population_data):
"""
Initialize the mode, LASER-style, using the population_data loaded from a csv file (pandas).
Create nodes and a migration_matrix based on populations and locations of each node.
"""
self.params = params
# We scale down population here from literal values but this may not be necessary.
population_data["scaled_population"] = (population_data["population"] / params["scale_factor"]).round().astype(int)
total_population = int(population_data["scaled_population"].sum())
print( f"{total_population=}" )
# Set up the properties as before
self.population = LaserFrame(capacity=total_population, initial_count=total_population)
self.population.add_scalar_property("node_id", dtype=np.int32)
self.population.add_scalar_property("disease_state", dtype=np.int32, default=0)
self.population.add_scalar_property("recovery_timer", dtype=np.int32, default=0)
self.population.add_scalar_property("migration_timer", dtype=np.int32, default=0)
node_pops = population_data["scaled_population"].values
self.params["nodes"] = len(node_pops)
node_ids = np.concatenate([np.full(count, i) for i, count in enumerate(node_pops)])
np.random.shuffle(node_ids)
self.population.node_id[:total_population] = node_ids
# seed in big node
big_node_id = np.argmax( node_pops )
available_population = population_data["scaled_population"][big_node_id]
initial_infected = int(params["initial_infected_fraction"] * available_population)
infection_indices = np.random.choice(np.where(self.population.node_id == big_node_id)[0], initial_infected, replace=False)
self.population.disease_state[infection_indices] = 1
self.population.recovery_timer[infection_indices] = np.random.uniform(params["recovery_time"] - 3, params["recovery_time"] + 3, size=initial_infected).astype(int)
pop_sizes = np.array(node_pops)
latitudes = population_data["centroid_lat"].values
longitudes = population_data["centroid_long"].values
distances = np.zeros((self.params["nodes"], self.params["nodes"]))
# Nested for loop here is optimized for readbility, not performance
for i in range(self.params["nodes"]):
for j in range(self.params["nodes"]):
if i != j:
distances[i, j] = distance(latitudes[i], longitudes[i], latitudes[j], longitudes[j])
# Set up our migration_matrix based on gravity model and input data (pops & distances)
self.distances = distances
self.migration_matrix = gravity(pop_sizes, distances, k=10.0, a=1.0, b=1.0, c=1.0)
self.migration_matrix /= self.migration_matrix.sum(axis=1, keepdims=True) # normalize
|
Alternative migration approach
In the above examples, we modeled migration by moving individual agents from node to node. An alternative approach is to model the migration of infection instead of individuals; this allows for computational efficiency while maintaining accurate disease transmission dynamics. Note that in this example, we do not use a MigrationComponent or migration_timer.
Code example: Modeling migration of infection instead of individuals
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96 | import numpy as np
from laser.core.migration import gravity
from laser.core.utils import calc_distances
class TransmissionComponent:
"""
Transmission Component
=======================
This class models the transmission of disease using "infection migration"
instead of human movement. Instead of tracking individual movement,
infection is spread probabilistically based on a gravity-inspired network.
This approach can significantly improve computational efficiency for
large-scale spatial epidemic simulations.
Attributes:
------------
model : object
The simulation model containing population and node information.
network : ndarray
A matrix representing the transmission probabilities between nodes.
locations : ndarray
Array of node latitude and longitude coordinates.
"""
def __init__(self, model):
"""
Initializes the transmission component by computing the infection migration network.
Parameters:
-----------
model : object
The simulation model containing population and node information.
"""
self.model = model
model.nodes.add_vector_property("network", length=model.nodes.count, dtype=np.float32)
self.network = model.nodes.network
# Extract node locations and populations from model.population_data
self.locations = np.column_stack((model.population_data["centroid_lat"], model.population_data["centroid_long"]))
initial_populations = np.array(model.population_data["population"])
# Initialize heterogeneous transmission factor per agent (0.5 to 2.0)
self.model.population.tx_hetero_factor = np.random.uniform(0.5, 2.0, size=model.population.capacity)
# Compute the infection migration network based on node populations.
a, b, c, k = self.model.params.a, self.model.params.b, self.model.params.c, self.model.params.k
# Compute all pairwise distances in one call (this speeds up initialization significantly)
# from laser.core.migration import gravity, row_normalizer
# from laser.core.utils import calc_distances
distances = calc_distances(self.locations[:, 0], self.locations[:, 1])
self.network = gravity(initial_populations, distances, k, a, b, c)
self.network /= np.power(initial_populations.sum(), c) # Normalize
self.network = row_normalizer(self.network, 0.01) # 0.01=max_frac
def step(self):
"""
Simulates disease transmission and infection migration across the network.
New infections are determined deterministically based on contagion levels and susceptible fraction.
"""
contagious_indices = np.where(self.model.population.disease_state == 1)[0]
values = self.model.population.tx_hetero_factor[contagious_indices] # Apply heterogeneity factor
# Compute contagion levels per node
contagion = np.bincount(
self.model.population.node_id[contagious_indices],
weights=values,
minlength=self.model.nodes.count
).astype(np.float64)
# Apply network-based infection movement
transfer = (contagion * self.network).round().astype(np.float64)
# Ensure net contagion remains positive after movement
contagion += transfer.sum(axis=1) - transfer.sum(axis=0)
contagion = np.maximum(contagion, 0) # Prevent negative contagion
# Infect susceptible individuals in each node deterministically
for i in range(self.model.nodes.count):
node_population = np.where(self.model.population.node_id == i)[0]
susceptible = node_population[self.model.population.disease_state[node_population] == 0]
if len(susceptible) > 0:
# Compute new infections deterministically based on prevalence and susceptible fraction
num_new_infections = int(min(len(susceptible), (
self.model.params.transmission_rate * contagion[i] * len(susceptible) / len(node_population)
)))
# Randomly select susceptible individuals for infection
new_infected_indices = np.random.choice(susceptible, size=num_new_infections, replace=False)
self.model.population.disease_state[new_infected_indices] = 1
# Assign recovery timers to newly infected individuals
self.model.population.recovery_timer[new_infected_indices] = np.random.randint(5, 15, size=num_new_infections)
|