Source code for experimental.continuous_space.continuous_space

"""A Continuous Space class."""

import warnings
from collections.abc import Iterable
from itertools import compress
from random import Random

import numpy as np
from numpy.typing import ArrayLike
from scipy.spatial.distance import cdist

from mesa.agent import Agent, AgentSet


[docs] class ContinuousSpace: """Continuous space where each agent can have an arbitrary position.""" @property def x_min(self): # noqa: D102 # compatibility with solara_viz return self.dimensions[0, 0] @property def x_max(self): # noqa: D102 # compatibility with solara_viz return self.dimensions[0, 1] @property def y_min(self): # noqa: D102 # compatibility with solara_viz return self.dimensions[1, 0] @property def y_max(self): # noqa: D102 # compatibility with solara_viz return self.dimensions[1, 1] @property def width(self): # noqa: D102 # compatibility with solara_viz return self.size[0] @property def height(self): # noqa: D102 # compatibility with solara_viz return self.size[1] def __init__( self, dimensions: ArrayLike, torus: bool = False, random: Random | None = None, n_agents: int = 100, ) -> None: """Create a new continuous space. Args: dimensions: a numpy array like object where each row specifies the minimum and maximum value of that dimension. torus: boolean for whether the space wraps around or not random: a seeded stdlib random.Random instance n_agents: the expected number of agents in the space Internally, a numpy array is used to store the positions of all agents. This is resized if needed, but you can control the initial size explicitly by passing n_agents. """ if random is None: warnings.warn( "Random number generator not specified, this can make models non-reproducible. Please pass a random number generator explicitly", UserWarning, stacklevel=2, ) random = Random() self.random = random self.dimensions: np.array = np.asanyarray(dimensions) self.ndims: int = self.dimensions.shape[0] self.size: np.array = self.dimensions[:, 1] - self.dimensions[:, 0] self.center: np.array = np.sum(self.dimensions, axis=1) / 2 self.torus: bool = torus # self._agent_positions is the array containing all agent positions # plus potential extra empty rows # agent_positions is a view into _agent_positions containing only the filled rows self._agent_positions: np.array = np.empty( (n_agents, self.dimensions.shape[0]), dtype=float ) self.agent_positions: ( np.array ) # a view on _agent_positions containing all active positions # the list of agents in the space self.active_agents = [] self._n_agents = 0 # the number of active agents in the space # a mapping from agents to index and vice versa self._index_to_agent: dict[int, Agent] = {} self._agent_to_index: dict[Agent, int | None] = {} @property def agents(self) -> AgentSet: """Return an AgentSet with the agents in the space.""" return AgentSet(self.active_agents, random=self.random) def _add_agent(self, agent: Agent) -> int: """Helper method for adding an agent to the space. This method manages the numpy array with the agent positions and ensuring it is enlarged if and when needed. It is called automatically by ContinousSpaceAgent when created. """ index = self._n_agents self._n_agents += 1 if self._agent_positions.shape[0] <= index: # we are out of space fraction = 0.2 # we add 20% Fixme n = int(round(fraction * self._n_agents)) self._agent_positions = np.vstack( [ self._agent_positions, np.empty( (n, self.dimensions.shape[0]), ), ] ) self._agent_to_index[agent] = index self._index_to_agent[index] = agent # we want to maintain a view rather than a copy on the active agents and positions # this is essential for the performance of the rest of this code self.active_agents.append(agent) self.agent_positions = self._agent_positions[0 : self._n_agents] return index def _remove_agent(self, agent: Agent) -> None: """Remove an agent from the space. This method is automatically called by ContinuousSpaceAgent.remove. """ index = self._agent_to_index[agent] self._agent_to_index.pop(agent, None) self._index_to_agent.pop(index, None) del self.active_agents[index] # we update all indices for agent in self.active_agents[index::]: old_index = self._agent_to_index[agent] self._agent_to_index[agent] = old_index - 1 self._index_to_agent[old_index - 1] = agent # we move all data below the removed agent one row up self._agent_positions[index : self._n_agents - 1] = self._agent_positions[ index + 1 : self._n_agents ] self._n_agents -= 1 self.agent_positions = self._agent_positions[0 : self._n_agents]
[docs] def calculate_difference_vector(self, point: np.ndarray, agents=None) -> np.ndarray: """Calculate the difference vector between the point and all agenents. Args: point: the point to calculate the difference vector for agents: the agents to calculate the difference vector of point with. By default, all agents are considered. """ point = np.asanyarray(point) positions = ( self.agent_positions if agents is None else self._agent_positions[[self._agent_to_index[a] for a in agents]] ) delta = positions - point if self.torus: inverse_delta = delta - np.sign(delta) * self.size # we need to use the lowest absolute value from delta and inverse delta logical = np.abs(delta) < np.abs(inverse_delta) out = np.zeros(delta.shape) out[logical] = delta[logical] out[~logical] = inverse_delta[~logical] delta = out return delta
[docs] def calculate_distances( self, point: ArrayLike, agents: Iterable[Agent] | None = None, **kwargs ) -> tuple[np.ndarray, list]: """Calculate the distance between the point and all agents. Args: point: the point to calculate the difference vector for agents: the agents to calculate the difference vector of point with. By default, all agents are considered. kwargs: any additional keyword arguments are passed to scipy's cdist, which is used only if torus is False. This allows for non-Euclidian distance measures. """ point = np.asanyarray(point) if agents is None: positions = self.agent_positions agents = self.active_agents else: positions = self._agent_positions[[self._agent_to_index[a] for a in agents]] agents = np.asarray(agents) if self.torus: delta = np.abs(point - positions) delta = np.minimum(delta, self.size - delta, out=delta) # + is much faster than np.sum or array.sum dists = delta[:, 0] ** 2 for i in range(1, self.ndims): dists += delta[:, i] ** 2 dists = np.sqrt(dists) else: dists = cdist(point[np.newaxis, :], positions, **kwargs)[0, :] return dists, agents
[docs] def get_agents_in_radius( self, point: ArrayLike, radius: float | int = 1 ) -> tuple[list, np.ndarray]: """Return the agents and their distances within a radius for the point.""" distances, agents = self.calculate_distances(point) logical = distances <= radius agents = list(compress(agents, logical)) return ( agents, distances[logical], )
[docs] def get_k_nearest_agents( self, point: ArrayLike, k: int = 1 ) -> tuple[list, np.ndarray]: """Return the k nearest agents and their distances to the point. Notes: This method returns exactly k agents, ignoring ties. In case of ties, the earlier an agent is inserted the higher it will rank. """ dists, agents = self.calculate_distances(point) indices = np.argpartition(dists, k)[:k] agents = [agents[i] for i in indices] return agents, dists[indices]
[docs] def in_bounds(self, point: ArrayLike) -> bool: """Check if point is inside the bounds of the space.""" return bool( ( (np.asanyarray(point) >= self.dimensions[:, 0]) & (point <= self.dimensions[:, 1]) ).all() )
[docs] def torus_correct(self, point: ArrayLike) -> np.ndarray: """Apply a torus correction to the point.""" return self.dimensions[:, 0] + np.mod( np.asanyarray(point) - self.dimensions[:, 0], self.size )