"""Mesa Space Module.
Objects used to add a spatial component to a model.
* Grid: base grid, which creates a rectangular grid.
* SingleGrid: extension to Grid which strictly enforces one agent per cell.
* MultiGrid: extension to Grid where each cell can contain a set of agents.
* HexGrid: extension to Grid to handle hexagonal neighbors.
* ContinuousSpace: a two-dimensional space where each agent has an arbitrary position of `float`'s.
* NetworkGrid: a network where each node contains zero or more agents.
"""
# Mypy; for the `|` operator purpose
# Remove this __future__ import once the oldest supported Python is 3.10
from __future__ import annotations
import collections
import contextlib
import inspect
import itertools
import math
import warnings
from collections.abc import Callable, Iterable, Iterator, Sequence
from numbers import Real
from typing import Any, TypeVar, cast, overload
from warnings import warn
with contextlib.suppress(ImportError):
import networkx as nx
import numpy as np
import numpy.typing as npt
# For Mypy
from .agent import Agent
# for better performance, we calculate the tuple to use in the is_integer function
_types_integer = (int, np.integer)
Coordinate = tuple[int, int]
# used in ContinuousSpace
FloatCoordinate = tuple[float, float] | npt.NDArray[float]
NetworkCoordinate = int
Position = Coordinate | FloatCoordinate | NetworkCoordinate
GridContent = Agent | None
MultiGridContent = list[Agent]
F = TypeVar("F", bound=Callable[..., Any])
[docs]
def accept_tuple_argument(wrapped_function: F) -> F:
"""Decorator to allow grid methods that take a list of (x, y) coord tuples to also handle a single position.
Tuples are wrapped in a single-item list rather than forcing user to do it.
"""
def wrapper(grid_instance, positions) -> Any:
if len(positions) == 2 and not isinstance(positions[0], tuple):
positions = [positions]
return wrapped_function(grid_instance, positions)
return cast(F, wrapper)
[docs]
def is_integer(x: Real) -> bool:
"""Check if x is either a CPython integer or Numpy integer."""
return isinstance(x, _types_integer)
[docs]
def warn_if_agent_has_position_already(placement_func):
"""Decorator to give warning if agent has position already set."""
def wrapper(self, agent, *args, **kwargs):
if agent.pos is not None:
warnings.warn(
f"""Agent {agent.unique_id} is being placed with
place_agent() despite already having the position {agent.pos}. In most
cases, you'd want to clear the current position with remove_agent()
before placing the agent again.""",
stacklevel=2,
)
placement_func(self, agent, *args, **kwargs)
return wrapper
class _Grid:
"""Base class for a rectangular grid.
Grid cells are indexed by [x, y], where [0, 0] is assumed to be the
bottom-left and [width-1, height-1] is the top-right. If a grid is
toroidal, the top and bottom, and left and right, edges wrap to each other
Properties:
width, height: The grid's width and height.
torus: Boolean which determines whether to treat the grid as a torus.
"""
def __init__(self, width: int, height: int, torus: bool) -> None:
"""Create a new grid.
Args:
width: The grid's width.
height: The grid's height.
torus: Boolean whether the grid wraps or not.
"""
self.height = height
self.width = width
self.torus = torus
self.num_cells = height * width
# Internal list-of-lists which holds the grid cells themselves
self._grid: list[list[GridContent]]
self._grid = [
[self.default_val() for _ in range(self.height)] for _ in range(self.width)
]
# Flag to check if the empties set has been created. Better than initializing
# _empties as set() because in this case it would become impossible to discern
# if the set hasn't still being built or if it has become empty after creation.
self._empties_built = False
# Neighborhood Cache
self._neighborhood_cache: dict[Any, Sequence[Coordinate]] = {}
# Cutoff used inside self.move_to_empty. The parameters are fitted on Python
# 3.11 and it was verified that they are roughly the same for 3.10. Refer to
# the code in PR#1565 to check for their stability when a new release gets out.
self.cutoff_empties = 7.953 * self.num_cells**0.384
@staticmethod
def default_val() -> None:
"""Default value for new cell elements."""
return None
@property
def empties(self) -> set:
if not self._empties_built:
self.build_empties()
return self._empties
def build_empties(self) -> None:
self._empties = set(
filter(
self.is_cell_empty,
itertools.product(range(self.width), range(self.height)),
)
)
self._empties_built = True
@overload
def __getitem__(self, index: int | Sequence[Coordinate]) -> list[GridContent]: ...
@overload
def __getitem__(
self, index: tuple[int | slice, int | slice]
) -> GridContent | list[GridContent]: ...
def __getitem__(self, index):
"""Access contents from the grid."""
if isinstance(index, int):
# grid[x]
return self._grid[index]
elif isinstance(index[0], tuple):
# grid[(x1, y1), (x2, y2), ...]
index = cast(Sequence[Coordinate], index)
return [self._grid[x][y] for x, y in map(self.torus_adj, index)]
x, y = index
x_int, y_int = is_integer(x), is_integer(y)
if x_int and y_int:
# grid[x, y]
index = cast(Coordinate, index)
x, y = self.torus_adj(index)
return self._grid[x][y]
elif x_int:
# grid[x, :]
x, _ = self.torus_adj((x, 0))
y = cast(slice, y)
return self._grid[x][y]
elif y_int:
# grid[:, y]
_, y = self.torus_adj((0, y))
x = cast(slice, x)
return [rows[y] for rows in self._grid[x]]
else:
# grid[:, :]
x, y = (cast(slice, x), cast(slice, y))
return [cell for rows in self._grid[x] for cell in rows[y]]
def __iter__(self) -> Iterator[GridContent]:
"""Create an iterator that chains the rows of the grid together as if it is one list."""
return itertools.chain(*self._grid)
def coord_iter(self) -> Iterator[tuple[GridContent, Coordinate]]:
"""An iterator that returns positions as well as cell contents."""
for row in range(self.width):
for col in range(self.height):
yield self._grid[row][col], (row, col) # agent, position
def iter_neighborhood(
self,
pos: Coordinate,
moore: bool,
include_center: bool = False,
radius: int = 1,
) -> Iterator[Coordinate]:
"""Return an iterator over cell coordinates that are in the neighborhood of a certain point.
Args:
pos: Coordinate tuple for the neighborhood to get.
moore: If True, return Moore neighborhood
(including diagonals)
If False, return Von Neumann neighborhood
(exclude diagonals)
include_center: If True, return the (x, y) cell as well.
Otherwise, return surrounding cells only.
radius: radius, in cells, of neighborhood to get.
Returns:
An iterator of coordinate tuples representing the neighborhood. For
example with radius 1, it will return list with number of elements
equals at most 9 (8) if Moore, 5 (4) if Von Neumann (if not
including the center).
"""
yield from self.get_neighborhood(pos, moore, include_center, radius)
def get_neighborhood(
self,
pos: Coordinate,
moore: bool,
include_center: bool = False,
radius: int = 1,
) -> Sequence[Coordinate]:
"""Return a list of cells that are in the neighborhood of a certain point.
Args:
pos: Coordinate tuple for the neighborhood to get.
moore: If True, return Moore neighborhood
(including diagonals)
If False, return Von Neumann neighborhood
(exclude diagonals)
include_center: If True, return the (x, y) cell as well.
Otherwise, return surrounding cells only.
radius: radius, in cells, of neighborhood to get.
Returns:
A list of coordinate tuples representing the neighborhood;
With radius 1, at most 9 if Moore, 5 if Von Neumann (8 and 4
if not including the center).
"""
cache_key = (pos, moore, include_center, radius)
neighborhood = self._neighborhood_cache.get(cache_key, None)
if neighborhood is not None:
return neighborhood
if self.out_of_bounds(pos):
raise Exception("The `pos` tuple passed is out of bounds.")
# we use a dict to keep insertion order
neighborhood = {}
x, y = pos
# First we check if the neighborhood is inside the grid
if (
x >= radius
and self.width - x > radius
and y >= radius
and self.height - y > radius
):
# If the radius is smaller than the distance from the borders, we
# can skip boundary checks.
x_range = range(x - radius, x + radius + 1)
y_range = range(y - radius, y + radius + 1)
for new_x in x_range:
for new_y in y_range:
if not moore and abs(new_x - x) + abs(new_y - y) > radius:
continue
neighborhood[(new_x, new_y)] = True
else:
# If the radius is larger than the distance from the borders, we
# must use a slower method, that takes into account the borders
# and the torus property.
for dx in range(-radius, radius + 1):
for dy in range(-radius, radius + 1):
if not moore and abs(dx) + abs(dy) > radius:
continue
new_x = x + dx
new_y = y + dy
if self.torus:
new_x %= self.width
new_y %= self.height
if not self.out_of_bounds((new_x, new_y)):
neighborhood[(new_x, new_y)] = True
if not include_center:
neighborhood.pop(pos, None)
self._neighborhood_cache[cache_key] = tuple(neighborhood.keys())
return tuple(neighborhood.keys())
def iter_neighbors(
self,
pos: Coordinate,
moore: bool,
include_center: bool = False,
radius: int = 1,
) -> Iterator[Agent]:
"""Return an iterator over neighbors to a certain point.
Args:
pos: Coordinates for the neighborhood to get.
moore: If True, return Moore neighborhood
(including diagonals)
If False, return Von Neumann neighborhood
(exclude diagonals)
include_center: If True, return the (x, y) cell as well.
Otherwise,
return surrounding cells only.
radius: radius, in cells, of neighborhood to get.
Returns:
An iterator of non-None objects in the given neighborhood;
at most 9 if Moore, 5 if Von-Neumann
(8 and 4 if not including the center).
"""
default_val = self.default_val()
for x, y in self.get_neighborhood(pos, moore, include_center, radius):
if (cell := self._grid[x][y]) != default_val:
yield cell
def get_neighbors(
self,
pos: Coordinate,
moore: bool,
include_center: bool = False,
radius: int = 1,
) -> list[Agent]:
"""Return a list of neighbors to a certain point.
Args:
pos: Coordinate tuple for the neighborhood to get.
moore: If True, return Moore neighborhood
(including diagonals)
If False, return Von Neumann neighborhood
(exclude diagonals)
include_center: If True, return the (x, y) cell as well.
Otherwise,
return surrounding cells only.
radius: radius, in cells, of neighborhood to get.
Returns:
A list of non-None objects in the given neighborhood;
at most 9 if Moore, 5 if Von-Neumann
(8 and 4 if not including the center).
"""
return list(self.iter_neighbors(pos, moore, include_center, radius))
def torus_adj(self, pos: Coordinate) -> Coordinate:
"""Convert coordinate, handling torus looping."""
if not self.out_of_bounds(pos):
return pos
elif not self.torus:
raise Exception("Point out of bounds, and space non-toroidal.")
else:
return pos[0] % self.width, pos[1] % self.height
def out_of_bounds(self, pos: Coordinate) -> bool:
"""Determines whether position is off the grid, returns the out of bounds coordinate."""
x, y = pos
return x < 0 or x >= self.width or y < 0 or y >= self.height
@accept_tuple_argument
def iter_cell_list_contents(
self, cell_list: Iterable[Coordinate]
) -> Iterator[Agent]:
"""Returns an iterator of the agents contained in the cells identified in `cell_list`; cells with empty content are excluded.
Args:
cell_list: Array-like of (x, y) tuples, or single tuple.
Returns:
An iterator of the agents contained in the cells identified in `cell_list`.
"""
# iter_cell_list_contents returns only non-empty contents.
default_val = self.default_val()
for x, y in cell_list:
if (cell := self._grid[x][y]) != default_val:
yield cell
@accept_tuple_argument
def get_cell_list_contents(self, cell_list: Iterable[Coordinate]) -> list[Agent]:
"""Returns an iterator of the agents contained in the cells identified in `cell_list`; cells with empty content are excluded.
Args:
cell_list: Array-like of (x, y) tuples, or single tuple.
Returns:
A list of the agents contained in the cells identified in `cell_list`.
"""
return list(self.iter_cell_list_contents(cell_list))
def place_agent(self, agent: Agent, pos: Coordinate) -> None: ...
def remove_agent(self, agent: Agent) -> None: ...
def move_agent(self, agent: Agent, pos: Coordinate) -> None:
"""Move an agent from its current position to a new position.
Args:
agent: Agent object to move. Assumed to have its current location
stored in a 'pos' tuple.
pos: Tuple of new position to move the agent to.
"""
pos = self.torus_adj(pos)
self.remove_agent(agent)
self.place_agent(agent, pos)
def move_agent_to_one_of(
self,
agent: Agent,
pos: list[Coordinate],
selection: str = "random",
handle_empty: str | None = None,
) -> None:
"""Move an agent to one of the given positions.
Args:
agent: Agent object to move. Assumed to have its current location stored in a 'pos' tuple.
pos: List of possible positions.
selection: String, either "random" (default) or "closest". If "closest" is selected and multiple
cells are the same distance, one is chosen randomly.
handle_empty: String, either "warning", "error" or None (default). If "warning" or "error" is selected
and no positions are given (an empty list), a warning or error is raised respectively.
"""
# Only move agent if there are positions given (non-empty list)
if pos:
if selection == "random":
chosen_pos = agent.random.choice(pos)
elif selection == "closest":
current_pos = agent.pos
# Find the closest position without sorting all positions
# TODO: See if this method can be optimized further
closest_pos = []
min_distance = float("inf")
agent.random.shuffle(pos)
for p in pos:
distance = self._distance_squared(p, current_pos)
if distance < min_distance:
min_distance = distance
closest_pos.clear()
closest_pos.append(p)
elif distance == min_distance:
closest_pos.append(p)
chosen_pos = agent.random.choice(closest_pos)
else:
raise ValueError(
f"Invalid selection method {selection}. Choose 'random' or 'closest'."
)
# Move agent to chosen position
self.move_agent(agent, chosen_pos)
# If no positions are given, throw warning/error if selected
elif handle_empty == "warning":
warn(
f"No positions given, could not move agent {agent.unique_id}.",
RuntimeWarning,
stacklevel=2,
)
elif handle_empty == "error":
raise ValueError(
f"No positions given, could not move agent {agent.unique_id}."
)
def _distance_squared(self, pos1: Coordinate, pos2: Coordinate) -> float:
"""Calculate the squared Euclidean distance between two points for performance."""
# Use squared Euclidean distance to avoid sqrt operation
dx, dy = abs(pos1[0] - pos2[0]), abs(pos1[1] - pos2[1])
if self.torus:
dx = min(dx, self.width - dx)
dy = min(dy, self.height - dy)
return dx**2 + dy**2
def swap_pos(self, agent_a: Agent, agent_b: Agent) -> None:
"""Swap agents positions."""
agents_no_pos = []
if (pos_a := agent_a.pos) is None:
agents_no_pos.append(agent_a)
if (pos_b := agent_b.pos) is None:
agents_no_pos.append(agent_b)
if agents_no_pos:
agents_no_pos = [f"<Agent id: {a.unique_id}>" for a in agents_no_pos]
raise Exception(f"{', '.join(agents_no_pos)} - not on the grid")
if pos_a == pos_b:
return
self.remove_agent(agent_a)
self.remove_agent(agent_b)
self.place_agent(agent_a, pos_b)
self.place_agent(agent_b, pos_a)
def is_cell_empty(self, pos: Coordinate) -> bool:
"""Returns a bool of the contents of a cell."""
x, y = pos
return self._grid[x][y] == self.default_val()
def move_to_empty(self, agent: Agent) -> None:
"""Moves agent to a random empty cell, vacating agent's old cell."""
num_empty_cells = len(self.empties)
if num_empty_cells == 0:
raise Exception("ERROR: No empty cells")
# This method is based on Agents.jl's random_empty() implementation. See
# https://github.com/JuliaDynamics/Agents.jl/pull/541. For the discussion, see
# https://github.com/projectmesa/mesa/issues/1052 and
# https://github.com/projectmesa/mesa/pull/1565. The cutoff value provided
# is the break-even comparison with the time taken in the else branching point.
if num_empty_cells > self.cutoff_empties:
while True:
new_pos = (
agent.random.randrange(self.width),
agent.random.randrange(self.height),
)
if self.is_cell_empty(new_pos):
break
else:
new_pos = agent.random.choice(sorted(self.empties))
self.remove_agent(agent)
self.place_agent(agent, new_pos)
def exists_empty_cells(self) -> bool:
"""Return True if any cells empty else False."""
return len(self.empties) > 0
[docs]
def is_single_argument_function(function):
"""Check if a function is a single argument function."""
return (
inspect.isfunction(function)
and len(inspect.signature(function).parameters) == 1
)
[docs]
class PropertyLayer:
"""A class representing a layer of properties in a two-dimensional grid.
Each cell in the grid can store a value of a specified data type.
Attributes:
name (str): The name of the property layer.
width (int): The width of the grid (number of columns).
height (int): The height of the grid (number of rows).
data (numpy.ndarray): A NumPy array representing the grid data.
"""
propertylayer_experimental_warning_given = False
def __init__(
self, name: str, width: int, height: int, default_value, dtype=np.float64
):
"""Initializes a new PropertyLayer instance.
Args:
name (str): The name of the property layer.
width (int): The width of the grid (number of columns). Must be a positive integer.
height (int): The height of the grid (number of rows). Must be a positive integer.
default_value: The default value to initialize each cell in the grid. Should ideally
be of the same type as specified by the dtype parameter.
dtype (data-type, optional): The desired data-type for the grid's elements. Default is np.float64.
Raises:
ValueError: If width or height is not a positive integer.
Notes:
A UserWarning is raised if the default_value is not of a type compatible with dtype.
The dtype parameter can accept both Python data types (like bool, int or float) and NumPy data types
(like np.int64 or np.float64). Using NumPy data types is recommended (except for bool) for better control
over the precision and efficiency of data storage and computations, especially in cases of large data
volumes or specialized numerical operations.
"""
self.name = name
self.width = width
self.height = height
# Check that width and height are positive integers
if (not isinstance(width, int) or width < 1) or (
not isinstance(height, int) or height < 1
):
raise ValueError(
f"Width and height must be positive integers, got {width} and {height}."
)
# Check if the dtype is suitable for the data
if not isinstance(default_value, dtype):
warn(
f"Default value {default_value} ({type(default_value).__name__}) might not be best suitable with dtype={dtype.__name__}.",
UserWarning,
stacklevel=2,
)
self.data = np.full((width, height), default_value, dtype=dtype)
if not self.__class__.propertylayer_experimental_warning_given:
warnings.warn(
"The new PropertyLayer and _PropertyGrid classes experimental. It may be changed or removed in any and all future releases, including patch releases.\n"
"We would love to hear what you think about this new feature. If you have any thoughts, share them with us here: https://github.com/projectmesa/mesa/discussions/1932",
FutureWarning,
stacklevel=2,
)
self.__class__.propertylayer_experimental_warning_given = True
[docs]
def set_cell(self, position: Coordinate, value):
"""Update a single cell's value in-place."""
self.data[position] = value
[docs]
def set_cells(self, value, condition=None):
"""Perform a batch update either on the entire grid or conditionally, in-place.
Args:
value: The value to be used for the update.
condition: (Optional) A callable (like a lambda function or a NumPy ufunc)
that returns a boolean array when applied to the data.
"""
if condition is None:
np.copyto(self.data, value) # In-place update
else:
if isinstance(condition, np.ufunc):
# Directly apply NumPy ufunc
condition_result = condition(self.data)
else:
# Vectorize non-ufunc conditions
vectorized_condition = np.vectorize(condition)
condition_result = vectorized_condition(self.data)
if (
not isinstance(condition_result, np.ndarray)
or condition_result.shape != self.data.shape
):
raise ValueError(
"Result of condition must be a NumPy array with the same shape as the grid."
)
np.copyto(self.data, value, where=condition_result)
[docs]
def modify_cell(self, position: Coordinate, operation, value=None):
"""Modify a single cell using an operation, which can be a lambda function or a NumPy ufunc.
If a NumPy ufunc is used, an additional value should be provided.
Args:
position: The grid coordinates of the cell to modify.
operation: A function to apply. Can be a lambda function or a NumPy ufunc.
value: The value to be used if the operation is a NumPy ufunc. Ignored for lambda functions.
"""
current_value = self.data[position]
# Determine if the operation is a lambda function or a NumPy ufunc
if is_single_argument_function(operation):
# Lambda function case
self.data[position] = operation(current_value)
elif value is not None:
# NumPy ufunc case
self.data[position] = operation(current_value, value)
else:
raise ValueError("Invalid operation or missing value for NumPy ufunc.")
[docs]
def modify_cells(self, operation, value=None, condition_function=None):
"""Modify cells using an operation, which can be a lambda function or a NumPy ufunc.
If a NumPy ufunc is used, an additional value should be provided.
Args:
operation: A function to apply. Can be a lambda function or a NumPy ufunc.
value: The value to be used if the operation is a NumPy ufunc. Ignored for lambda functions.
condition_function: (Optional) A callable that returns a boolean array when applied to the data.
"""
condition_array = np.ones_like(
self.data, dtype=bool
) # Default condition (all cells)
if condition_function is not None:
if isinstance(condition_function, np.ufunc):
condition_array = condition_function(self.data)
else:
vectorized_condition = np.vectorize(condition_function)
condition_array = vectorized_condition(self.data)
# Check if the operation is a lambda function or a NumPy ufunc
if isinstance(operation, np.ufunc):
if ufunc_requires_additional_input(operation):
if value is None:
raise ValueError("This ufunc requires an additional input value.")
modified_data = operation(self.data, value)
else:
modified_data = operation(self.data)
else:
# Vectorize non-ufunc operations
vectorized_operation = np.vectorize(operation)
modified_data = vectorized_operation(self.data)
self.data = np.where(condition_array, modified_data, self.data)
[docs]
def select_cells(self, condition, return_list=True):
"""Find cells that meet a specified condition using NumPy's boolean indexing, in-place.
Args:
condition: A callable that returns a boolean array when applied to the data.
return_list: (Optional) If True, return a list of (x, y) tuples. Otherwise, return a boolean array.
Returns:
A list of (x, y) tuples or a boolean array.
"""
condition_array = condition(self.data)
if return_list:
return list(zip(*np.where(condition_array)))
else:
return condition_array
[docs]
def aggregate_property(self, operation):
"""Perform an aggregate operation (e.g., sum, mean) on a property across all cells.
Args:
operation: A function to apply. Can be a lambda function or a NumPy ufunc.
"""
return operation(self.data)
class _PropertyGrid(_Grid):
"""A private subclass of _Grid that supports the addition of property layers.
This enables the representation and manipulation of additional data layers on the grid. This class is
intended for internal use within the Mesa framework and is currently utilized by SingleGrid
and MultiGrid classes to provide enhanced grid functionality.
The `_PropertyGrid` extends the capabilities of a basic grid by allowing each cell
to have multiple properties, each represented by a separate PropertyLayer.
These properties can be used to model complex environments where each cell
has multiple attributes or states.
Attributes:
properties (dict): A dictionary mapping property layer names to PropertyLayer instances.
empty_mask (np.ndarray): A boolean array indicating empty cells on the grid.
Several methods in this class accept a mask as an input, which is a NumPy ndarray of boolean values. This mask
specifies the cells to be considered (True) or ignored (False) in operations. Users can create custom masks,
including neighborhood masks, to apply specific conditions or constraints. Additionally, methods that deal with
cell selection or agent movement can return either a list of cell coordinates or a mask, based on the 'return_list'
parameter. This flexibility allows for more nuanced control and customization of grid operations, catering to a wide
range of modeling requirements and scenarios.
Notes:
This class is not intended for direct use in user models but is currently used by the SingleGrid and MultiGrid.
"""
def __init__(
self,
width: int,
height: int,
torus: bool,
property_layers: None | PropertyLayer | list[PropertyLayer] = None,
):
"""Initializes a new _PropertyGrid instance with specified dimensions and optional property layers.
Args:
width (int): The width of the grid (number of columns).
height (int): The height of the grid (number of rows).
torus (bool): A boolean indicating if the grid should behave like a torus.
property_layers (None | PropertyLayer | list[PropertyLayer], optional): A single PropertyLayer instance,
a list of PropertyLayer instances, or None to initialize without any property layers.
Raises:
ValueError: If a property layer's dimensions do not match the grid dimensions.
"""
super().__init__(width, height, torus)
self.properties = {}
# Initialize an empty mask as a boolean NumPy array
self._empty_mask = np.ones((self.width, self.height), dtype=bool)
# Handle both single PropertyLayer instance and list of PropertyLayer instances
if property_layers:
# If a single PropertyLayer is passed, convert it to a list
if isinstance(property_layers, PropertyLayer):
property_layers = [property_layers]
for layer in property_layers:
self.add_property_layer(layer)
@property
def empty_mask(self) -> np.ndarray:
"""Returns a boolean mask indicating empty cells on the grid."""
return self._empty_mask
# Add and remove properties to the grid
def add_property_layer(self, property_layer: PropertyLayer):
"""Adds a new property layer to the grid.
Args:
property_layer (PropertyLayer): The PropertyLayer instance to be added to the grid.
Raises:
ValueError: If a property layer with the same name already exists in the grid.
ValueError: If the dimensions of the property layer do not match the grid's dimensions.
"""
if property_layer.name in self.properties:
raise ValueError(f"Property layer {property_layer.name} already exists.")
if property_layer.width != self.width or property_layer.height != self.height:
raise ValueError(
f"Property layer dimensions {property_layer.width}x{property_layer.height} do not match grid dimensions {self.width}x{self.height}."
)
self.properties[property_layer.name] = property_layer
def remove_property_layer(self, property_name: str):
"""Removes a property layer from the grid by its name.
Args:
property_name (str): The name of the property layer to be removed.
Raises:
ValueError: If a property layer with the given name does not exist in the grid.
"""
if property_name not in self.properties:
raise ValueError(f"Property layer {property_name} does not exist.")
del self.properties[property_name]
def get_neighborhood_mask(
self, pos: Coordinate, moore: bool, include_center: bool, radius: int
) -> np.ndarray:
"""Generate a boolean mask representing the neighborhood.
Helper method for select_cells_multi_properties() and move_agent_to_random_cell()
Args:
pos (Coordinate): Center of the neighborhood.
moore (bool): True for Moore neighborhood, False for Von Neumann.
include_center (bool): Include the central cell in the neighborhood.
radius (int): The radius of the neighborhood.
Returns:
np.ndarray: A boolean mask representing the neighborhood.
"""
neighborhood = self.get_neighborhood(pos, moore, include_center, radius)
mask = np.zeros((self.width, self.height), dtype=bool)
# Convert the neighborhood list to a NumPy array and use advanced indexing
coords = np.array(neighborhood)
mask[coords[:, 0], coords[:, 1]] = True
return mask
def select_cells(
self,
conditions: dict | None = None,
extreme_values: dict | None = None,
masks: np.ndarray | list[np.ndarray] = None,
only_empty: bool = False,
return_list: bool = True,
) -> list[Coordinate] | np.ndarray:
"""Select cells based on property conditions, extreme values, and/or masks, with an option to only select empty cells.
Args:
conditions (dict): A dictionary where keys are property names and values are callables that return a boolean when applied.
extreme_values (dict): A dictionary where keys are property names and values are either 'highest' or 'lowest'.
masks (np.ndarray | list[np.ndarray], optional): A mask or list of masks to restrict the selection.
only_empty (bool, optional): If True, only select cells that are empty. Default is False.
return_list (bool, optional): If True, return a list of coordinates, otherwise return a mask.
Returns:
Union[list[Coordinate], np.ndarray]: Coordinates where conditions are satisfied or the combined mask.
"""
# Initialize the combined mask
combined_mask = np.ones((self.width, self.height), dtype=bool)
# Apply the masks
if masks is not None:
if isinstance(masks, list):
for mask in masks:
combined_mask = np.logical_and(combined_mask, mask)
else:
combined_mask = np.logical_and(combined_mask, masks)
# Apply the empty mask if only_empty is True
if only_empty:
combined_mask = np.logical_and(combined_mask, self.empty_mask)
# Apply conditions
if conditions:
for prop_name, condition in conditions.items():
prop_layer = self.properties[prop_name].data
prop_mask = condition(prop_layer)
combined_mask = np.logical_and(combined_mask, prop_mask)
# Apply extreme values
if extreme_values:
for property_name, mode in extreme_values.items():
prop_values = self.properties[property_name].data
# Create a masked array using the combined_mask
masked_values = np.ma.masked_array(prop_values, mask=~combined_mask)
if mode == "highest":
target_value = masked_values.max()
elif mode == "lowest":
target_value = masked_values.min()
else:
raise ValueError(
f"Invalid mode {mode}. Choose from 'highest' or 'lowest'."
)
extreme_value_mask = prop_values == target_value
combined_mask = np.logical_and(combined_mask, extreme_value_mask)
# Generate output
if return_list:
selected_cells = list(zip(*np.where(combined_mask)))
return selected_cells
else:
return combined_mask
[docs]
class SingleGrid(_PropertyGrid):
"""Rectangular grid where each cell contains exactly at most one agent.
Grid cells are indexed by [x, y], where [0, 0] is assumed to be the
bottom-left and [width-1, height-1] is the top-right. If a grid is
toroidal, the top and bottom, and left and right, edges wrap to each other.
This class provides a property `empties` that returns a set of coordinates
for all empty cells in the grid. It is automatically updated whenever
agents are added or removed from the grid. The `empties` property should be
used for efficient access to current empty cells rather than manually
iterating over the grid to check for emptiness.
Properties:
width, height: The grid's width and height.
torus: Boolean which determines whether to treat the grid as a torus.
empties: Returns a set of (x, y) tuples for all empty cells. This set is
maintained internally and provides a performant way to query
the grid for empty spaces.
"""
[docs]
@warn_if_agent_has_position_already
def place_agent(self, agent: Agent, pos: Coordinate) -> None:
"""Place the agent at the specified location, and set its pos variable."""
if self.is_cell_empty(pos):
x, y = pos
self._grid[x][y] = agent
if self._empties_built:
self._empties.discard(pos)
self._empty_mask[pos] = False
agent.pos = pos
else:
raise Exception("Cell not empty")
[docs]
def remove_agent(self, agent: Agent) -> None:
"""Remove the agent from the grid and set its pos attribute to None."""
if (pos := agent.pos) is None:
return
x, y = pos
self._grid[x][y] = self.default_val()
if self._empties_built:
self._empties.add(pos)
self._empty_mask[agent.pos] = True
agent.pos = None
[docs]
class MultiGrid(_PropertyGrid):
"""Rectangular grid where each cell can contain more than one agent.
Grid cells are indexed by [x, y], where [0, 0] is assumed to be at
bottom-left and [width-1, height-1] is the top-right. If a grid is
toroidal, the top and bottom, and left and right, edges wrap to each other.
This class maintains an `empties` property, which is a set of coordinates
for all cells that currently contain no agents. This property is updated
automatically as agents are added to or removed from the grid.
Properties:
width, height: The grid's width and height.
torus: Boolean which determines whether to treat the grid as a torus.
empties: Returns a set of (x, y) tuples for all empty cells.
"""
grid: list[list[MultiGridContent]]
[docs]
@staticmethod
def default_val() -> MultiGridContent:
"""Default value for new cell elements."""
return []
[docs]
@warn_if_agent_has_position_already
def place_agent(self, agent: Agent, pos: Coordinate) -> None:
"""Place the agent at the specified location, and set its pos variable."""
x, y = pos
if agent.pos is None or agent not in self._grid[x][y]:
self._grid[x][y].append(agent)
agent.pos = pos
if self._empties_built:
self._empties.discard(pos)
self._empty_mask[agent.pos] = True
[docs]
def remove_agent(self, agent: Agent) -> None:
"""Remove the agent from the given location and set its pos attribute to None."""
pos = agent.pos
x, y = pos
self._grid[x][y].remove(agent)
if self._empties_built and self.is_cell_empty(pos):
self._empties.add(pos)
self._empty_mask[agent.pos] = False
agent.pos = None
[docs]
def iter_neighbors( # noqa: D102
self,
pos: Coordinate,
moore: bool,
include_center: bool = False,
radius: int = 1,
) -> Iterator[Agent]:
return itertools.chain.from_iterable(
super().iter_neighbors(pos, moore, include_center, radius)
)
[docs]
@accept_tuple_argument
def iter_cell_list_contents(
self, cell_list: Iterable[Coordinate]
) -> Iterator[Agent]:
"""Returns an iterator of the agents contained in the cells identified in `cell_list`.
Cells with empty content are excluded.
Args:
cell_list: Array-like of (x, y) tuples, or single tuple.
Returns:
An iterator of the agents contained in the cells identified in `cell_list`.
"""
default_val = self.default_val()
return itertools.chain.from_iterable(
cell for x, y in cell_list if (cell := self._grid[x][y]) != default_val
)
class _HexGrid:
"""Hexagonal Grid which handles hexagonal neighbors.
Functions according to odd-q rules.
See http://www.redblobgames.com/grids/hexagons/#coordinates for more.
Properties:
width, height: The grid's width and height.
torus: Boolean which determines whether to treat the grid as a torus.
Methods:
get_neighbors: Returns the objects surrounding a given cell.
get_neighborhood: Returns the cells surrounding a given cell.
iter_neighbors: Iterates over position neighbors.
iter_neighborhood: Returns an iterator over cell coordinates that are
in the neighborhood of a certain point.
"""
def torus_adj_2d(self, pos: Coordinate) -> Coordinate:
return pos[0] % self.width, pos[1] % self.height
def get_neighborhood(
self, pos: Coordinate, include_center: bool = False, radius: int = 1
) -> list[Coordinate]:
"""Return a list of coordinates that are in the neighborhood of a certain point.
To calculate the neighborhood for a HexGrid the parity of the x coordinate of the point is
important, the neighborhood can be sketched as:
Always: (0,-), (0,+)
When x is even: (-,+), (-,0), (+,+), (+,0)
When x is odd: (-,0), (-,-), (+,0), (+,-)
Args:
pos: Coordinate tuple for the neighborhood to get.
include_center: If True, return the (x, y) cell as well.
Otherwise, return surrounding cells only.
radius: radius, in cells, of neighborhood to get.
Returns:
A list of coordinate tuples representing the neighborhood. For
example with radius 1, it will return list with number of elements
equals at most 9 (8) if Moore, 5 (4) if Von Neumann (if not
including the center).
"""
cache_key = (pos, include_center, radius)
neighborhood = self._neighborhood_cache.get(cache_key, None)
if neighborhood is not None:
return neighborhood
queue = collections.deque()
queue.append(pos)
coordinates = set()
while radius > 0:
level_size = len(queue)
radius -= 1
for _i in range(level_size):
x, y = queue.pop()
if x % 2 == 0:
adjacent = [
(x, y - 1),
(x, y + 1),
(x - 1, y + 1),
(x - 1, y),
(x + 1, y + 1),
(x + 1, y),
]
else:
adjacent = [
(x, y - 1),
(x, y + 1),
(x - 1, y),
(x - 1, y - 1),
(x + 1, y),
(x + 1, y - 1),
]
if self.torus:
adjacent = [
coord
for coord in map(self.torus_adj_2d, adjacent)
if coord not in coordinates
]
else:
adjacent = [
coord
for coord in adjacent
if not self.out_of_bounds(coord) and coord not in coordinates
]
coordinates.update(adjacent)
if radius > 0:
queue.extendleft(adjacent)
if include_center:
coordinates.add(pos)
else:
coordinates.discard(pos)
neighborhood = tuple(sorted(coordinates))
self._neighborhood_cache[cache_key] = neighborhood
return neighborhood
def iter_neighborhood(
self, pos: Coordinate, include_center: bool = False, radius: int = 1
) -> Iterator[Coordinate]:
"""Return an iterator over cell coordinates that are in the neighborhood of a certain point.
Args:
pos: Coordinate tuple for the neighborhood to get.
include_center: If True, return the (x, y) cell as well.
Otherwise, return surrounding cells only.
radius: radius, in cells, of neighborhood to get.
Returns:
An iterator of coordinate tuples representing the neighborhood.
"""
yield from self.get_neighborhood(pos, include_center, radius)
def iter_neighbors(
self, pos: Coordinate, include_center: bool = False, radius: int = 1
) -> Iterator[Agent]:
"""Return an iterator over neighbors to a certain point.
Args:
pos: Coordinates for the neighborhood to get.
include_center: If True, return the (x, y) cell as well.
Otherwise,
return surrounding cells only.
radius: radius, in cells, of neighborhood to get.
Returns:
An iterator of non-None objects in the given neighborhood
"""
neighborhood = self.get_neighborhood(pos, include_center, radius)
return self.iter_cell_list_contents(neighborhood)
def get_neighbors(
self, pos: Coordinate, include_center: bool = False, radius: int = 1
) -> list[Agent]:
"""Return a list of neighbors to a certain point.
Args:
pos: Coordinate tuple for the neighborhood to get.
include_center: If True, return the (x, y) cell as well.
Otherwise,
return surrounding cells only.
radius: radius, in cells, of neighborhood to get.
Returns:
A list of non-None objects in the given neighborhood
"""
return list(self.iter_neighbors(pos, include_center, radius))
[docs]
class HexSingleGrid(_HexGrid, SingleGrid):
"""Hexagonal SingleGrid: a SingleGrid where neighbors are computed according to a hexagonal tiling of the grid.
Functions according to odd-q rules.
See http://www.redblobgames.com/grids/hexagons/#coordinates for more.
This class also maintains an `empties` property, similar to SingleGrid,
which provides a set of coordinates for all empty hexagonal cells.
Properties:
width, height: The grid's width and height.
torus: Boolean which determines whether to treat the grid as a torus.
empties: Returns a set of hexagonal coordinates for all empty cells.
"""
[docs]
class HexMultiGrid(_HexGrid, MultiGrid):
"""Hexagonal MultiGrid: a MultiGrid where neighbors are computed according to a hexagonal tiling of the grid.
Functions according to odd-q rules.
See http://www.redblobgames.com/grids/hexagons/#coordinates for more.
Similar to the standard MultiGrid, this class maintains an `empties` property,
which is a set of coordinates for all hexagonal cells that currently contain
no agents. This property is updated automatically as agents are added to or
removed from the grid.
Properties:
width, height: The grid's width and height.
torus: Boolean which determines whether to treat the grid as a torus.
empties: Returns a set of hexagonal coordinates for all empty cells.
"""
[docs]
class HexGrid(HexSingleGrid):
"""Hexagonal Grid: a Grid where neighbors are computed according to a hexagonal tiling of the grid.
Functions according to odd-q rules.
See http://www.redblobgames.com/grids/hexagons/#coordinates for more.
Properties:
width, height: The grid's width and height.
torus: Boolean which determines whether to treat the grid as a torus.
"""
def __init__(self, width: int, height: int, torus: bool) -> None:
"""Initializes a HexGrid, deprecated.
Args:
width: the width of the grid
height: the height of the grid
torus: whether the grid wraps
"""
super().__init__(width, height, torus)
warn(
(
"HexGrid is being deprecated; use instead HexSingleGrid or HexMultiGrid "
"depending on your use case."
),
DeprecationWarning,
stacklevel=2,
)
[docs]
class ContinuousSpace:
"""Continuous space where each agent can have an arbitrary position.
Assumes that all agents have a pos property storing their position as
an (x, y) tuple.
This class uses a numpy array internally to store agents in order to speed
up neighborhood lookups. This array is calculated on the first neighborhood
lookup, and is updated if agents are added or removed.
The concept of 'empty cells' is not directly applicable in continuous space,
as positions are not discretized.
"""
def __init__(
self,
x_max: float,
y_max: float,
torus: bool,
x_min: float = 0,
y_min: float = 0,
) -> None:
"""Create a new continuous space.
Args:
x_max: the maximum x-coordinate
y_max: the maximum y-coordinate.
torus: Boolean for whether the edges loop around.
x_min: (default 0) If provided, set the minimum x -coordinate for the space. Below them, values loop to
the other edge (if torus=True) or raise an exception.
y_min: (default 0) If provided, set the minimum y -coordinate for the space. Below them, values loop to
the other edge (if torus=True) or raise an exception.
"""
self.x_min = x_min
self.x_max = x_max
self.width = x_max - x_min
self.y_min = y_min
self.y_max = y_max
self.height = y_max - y_min
self.center = np.array(((x_max + x_min) / 2, (y_max + y_min) / 2))
self.size = np.array((self.width, self.height))
self.torus = torus
self._agent_points: npt.NDArray[FloatCoordinate] | None = None
self._index_to_agent: dict[int, Agent] = {}
self._agent_to_index: dict[Agent, int | None] = {}
def _build_agent_cache(self):
"""Cache agents positions to speed up neighbors calculations."""
self._index_to_agent = {}
for idx, agent in enumerate(self._agent_to_index):
self._agent_to_index[agent] = idx
self._index_to_agent[idx] = agent
# Since dicts are ordered by insertion, we can iterate through agents keys
self._agent_points = np.array([agent.pos for agent in self._agent_to_index])
def _invalidate_agent_cache(self):
"""Clear cached data of agents and positions in the space."""
self._agent_points = None
self._index_to_agent = {}
[docs]
@warn_if_agent_has_position_already
def place_agent(self, agent: Agent, pos: FloatCoordinate) -> None:
"""Place a new agent in the space.
Args:
agent: Agent object to place.
pos: Coordinate tuple for where to place the agent.
"""
self._invalidate_agent_cache()
self._agent_to_index[agent] = None
pos = self.torus_adj(pos)
agent.pos = pos
[docs]
def move_agent(self, agent: Agent, pos: FloatCoordinate) -> None:
"""Move an agent from its current position to a new position.
Args:
agent: The agent object to move.
pos: Coordinate tuple to move the agent to.
"""
pos = self.torus_adj(pos)
agent.pos = pos
if self._agent_points is not None:
# instead of invalidating the full cache,
# apply the move to the cached values
idx = self._agent_to_index[agent]
self._agent_points[idx] = pos
[docs]
def remove_agent(self, agent: Agent) -> None:
"""Remove an agent from the space.
Args:
agent: The agent object to remove
"""
if agent not in self._agent_to_index:
raise Exception("Agent does not exist in the space")
del self._agent_to_index[agent]
self._invalidate_agent_cache()
agent.pos = None
[docs]
def get_neighbors(
self, pos: FloatCoordinate, radius: float, include_center: bool = True
) -> list[Agent]:
"""Get all agents within a certain radius.
Args:
pos: (x,y) coordinate tuple to center the search at.
radius: Get all the objects within this distance of the center.
include_center: If True, include an object at the *exact* provided
coordinates. i.e. if you are searching for the
neighbors of a given agent, True will include that
agent in the results.
"""
if self._agent_points is None:
self._build_agent_cache()
deltas = np.abs(self._agent_points - np.array(pos))
if self.torus:
deltas = np.minimum(deltas, self.size - deltas)
dists = deltas[:, 0] ** 2 + deltas[:, 1] ** 2
(idxs,) = np.where(dists <= radius**2)
neighbors = [
self._index_to_agent[x] for x in idxs if include_center or dists[x] > 0
]
return neighbors
[docs]
def get_heading(
self, pos_1: FloatCoordinate, pos_2: FloatCoordinate
) -> FloatCoordinate:
"""Get the heading vector between two points, accounting for toroidal space.
It is possible to calculate the heading angle by applying the atan2 function to the
result.
Args:
pos_1: Coordinate tuples for both points.
pos_2: Coordinate tuples for both points.
"""
one = np.array(pos_1)
two = np.array(pos_2)
heading = two - one
if self.torus:
inverse_heading = heading - np.sign(heading) * self.size
def get_min_abs(x, y):
return x if abs(x) < abs(y) else y
# Choose the smaller heading based on their absolute value for
# each dimension independently.
heading = tuple(
get_min_abs(heading[i], inverse_heading[i]) for i in range(2)
)
if isinstance(pos_1, np.ndarray):
heading = np.asarray(heading)
else:
heading = tuple(heading)
return heading
[docs]
def get_distance(self, pos_1: FloatCoordinate, pos_2: FloatCoordinate) -> float:
"""Get the distance between two point, accounting for toroidal space.
Args:
pos_1: Coordinate tuples for point1.
pos_2: Coordinate tuples for point2.
"""
x1, y1 = pos_1
x2, y2 = pos_2
dx = abs(x1 - x2)
dy = abs(y1 - y2)
if self.torus:
dx = min(dx, self.width - dx)
dy = min(dy, self.height - dy)
return math.sqrt(dx * dx + dy * dy)
[docs]
def torus_adj(self, pos: FloatCoordinate) -> FloatCoordinate:
"""Adjust coordinates to handle torus looping.
If the coordinate is out-of-bounds and the space is toroidal, return
the corresponding point within the space. If the space is not toroidal,
raise an exception.
Args:
pos: Coordinate tuple to convert.
"""
if not self.out_of_bounds(pos):
return pos
elif not self.torus:
raise Exception("Point out of bounds, and space non-toroidal.")
else:
x = self.x_min + ((pos[0] - self.x_min) % self.width)
y = self.y_min + ((pos[1] - self.y_min) % self.height)
if isinstance(pos, tuple):
return (x, y)
else:
return np.array((x, y))
[docs]
def out_of_bounds(self, pos: FloatCoordinate) -> bool:
"""Check if a point is out of bounds."""
x, y = pos
return x < self.x_min or x >= self.x_max or y < self.y_min or y >= self.y_max
[docs]
class NetworkGrid:
"""Network Grid where each node contains zero or more agents."""
def __init__(self, g: Any) -> None:
"""Create a new network.
Args:
g: a NetworkX graph instance.
"""
self.G = g
for node_id in self.G.nodes:
g.nodes[node_id]["agent"] = self.default_val()
[docs]
@staticmethod
def default_val() -> list:
"""Default value for a new node."""
return []
[docs]
@warn_if_agent_has_position_already
def place_agent(self, agent: Agent, node_id: int) -> None:
"""Place an agent in a node."""
self.G.nodes[node_id]["agent"].append(agent)
agent.pos = node_id
[docs]
def get_neighborhood(
self, node_id: int, include_center: bool = False, radius: int = 1
) -> list[int]:
"""Get all adjacent nodes within a certain radius.
Args:
node_id: node id for which to get neighborhood
include_center: boolean to include node itself or not
radius: size of neighborhood
Returns:
a list
"""
if radius == 1:
neighborhood = list(self.G.neighbors(node_id))
if include_center:
neighborhood.append(node_id)
else:
neighbors_with_distance = nx.single_source_shortest_path_length(
self.G, node_id, radius
)
if not include_center:
del neighbors_with_distance[node_id]
neighborhood = sorted(neighbors_with_distance.keys())
return neighborhood
[docs]
def get_neighbors(
self, node_id: int, include_center: bool = False, radius: int = 1
) -> list[Agent]:
"""Get all agents in adjacent nodes (within a certain radius).
Args:
node_id: node id for which to get neighbors
include_center: whether to include node itself or not
radius: size of neighborhood in which to find neighbors
Returns:
list of agents in neighborhood.
"""
neighborhood = self.get_neighborhood(node_id, include_center, radius)
return self.get_cell_list_contents(neighborhood)
[docs]
def move_agent(self, agent: Agent, node_id: int) -> None:
"""Move an agent from its current node to a new node.
Args:
agent: agent instance
node_id: id of node
"""
self.remove_agent(agent)
self.place_agent(agent, node_id)
[docs]
def remove_agent(self, agent: Agent) -> None:
"""Remove the agent from the network and set its pos attribute to None.
Args:
agent: agent instance
"""
node_id = agent.pos
self.G.nodes[node_id]["agent"].remove(agent)
agent.pos = None
[docs]
def is_cell_empty(self, node_id: int) -> bool:
"""Returns a bool of the contents of a cell.
Args:
node_id: id of node
"""
return self.G.nodes[node_id]["agent"] == self.default_val()
[docs]
def get_cell_list_contents(self, cell_list: list[int]) -> list[Agent]:
"""Returns a list of the agents contained in the nodes identified in `cell_list`.
Nodes with empty content are excluded.
Args:
cell_list: list of cell ids.
Returns:
list of the agents contained in the nodes identified in `cell_list`.
"""
return list(self.iter_cell_list_contents(cell_list))
[docs]
def get_all_cell_contents(self) -> list[Agent]:
"""Returns a list of all the agents in the network."""
return self.get_cell_list_contents(self.G)
[docs]
def iter_cell_list_contents(self, cell_list: list[int]) -> Iterator[Agent]:
"""Returns an iterator of the agents contained in the nodes identified in `cell_list`.
Nodes with empty content are excluded.
Args:
cell_list: list of cell ids.
Returns:
iterator of the agents contained in the nodes identified in `cell_list`.
"""
return itertools.chain.from_iterable(
self.G.nodes[node_id]["agent"]
for node_id in itertools.filterfalse(self.is_cell_empty, cell_list)
)