"""
The agent class for Mesa framework.
Core Objects: Agent
"""
# Mypy; for the `|` operator purpose
# Remove this __future__ import once the oldest supported Python is 3.10
from __future__ import annotations
import contextlib
import copy
import operator
import warnings
import weakref
from collections import defaultdict
from collections.abc import Iterable, Iterator, MutableSet, Sequence
from random import Random
# mypy
from typing import TYPE_CHECKING, Any, Callable
if TYPE_CHECKING:
# We ensure that these are not imported during runtime to prevent cyclic
# dependency.
from mesa.model import Model
from mesa.space import Position
[docs]
class Agent:
"""
Base class for a model agent in Mesa.
Attributes:
unique_id (int): A unique identifier for this agent.
model (Model): A reference to the model instance.
self.pos: Position | None = None
"""
def __init__(self, unique_id: int, model: Model) -> None:
"""
Create a new agent.
Args:
unique_id (int): A unique identifier for this agent.
model (Model): The model instance in which the agent exists.
"""
self.unique_id = unique_id
self.model = model
self.pos: Position | None = None
# register agent
try:
self.model.agents_[type(self)][self] = None
except AttributeError:
# model super has not been called
self.model.agents_ = defaultdict(dict)
self.model.agents_[type(self)][self] = None
self.model.agentset_experimental_warning_given = False
warnings.warn(
"The Mesa Model class was not initialized. In the future, you need to explicitly initialize the Model by calling super().__init__() on initialization.",
FutureWarning,
stacklevel=2,
)
[docs]
def remove(self) -> None:
"""Remove and delete the agent from the model."""
with contextlib.suppress(KeyError):
self.model.agents_[type(self)].pop(self)
[docs]
def step(self) -> None:
"""A single step of the agent."""
[docs]
def advance(self) -> None:
pass
@property
def random(self) -> Random:
return self.model.random
[docs]
class AgentSet(MutableSet, Sequence):
"""
A collection class that represents an ordered set of agents within an agent-based model (ABM). This class
extends both MutableSet and Sequence, providing set-like functionality with order preservation and
sequence operations.
Attributes:
model (Model): The ABM model instance to which this AgentSet belongs.
Methods:
__len__, __iter__, __contains__, select, shuffle, sort, _update, do, get, __getitem__,
add, discard, remove, __getstate__, __setstate__, random
Note:
The AgentSet maintains weak references to agents, allowing for efficient management of agent lifecycles
without preventing garbage collection. It is associated with a specific model instance, enabling
interactions with the model's environment and other agents.The implementation uses a WeakKeyDictionary to store agents,
which means that agents not referenced elsewhere in the program may be automatically removed from the AgentSet.
"""
agentset_experimental_warning_given = False
def __init__(self, agents: Iterable[Agent], model: Model):
"""
Initializes the AgentSet with a collection of agents and a reference to the model.
Args:
agents (Iterable[Agent]): An iterable of Agent objects to be included in the set.
model (Model): The ABM model instance to which this AgentSet belongs.
"""
self.model = model
self._agents = weakref.WeakKeyDictionary({agent: None for agent in agents})
def __len__(self) -> int:
"""Return the number of agents in the AgentSet."""
return len(self._agents)
def __iter__(self) -> Iterator[Agent]:
"""Provide an iterator over the agents in the AgentSet."""
return self._agents.keys()
def __contains__(self, agent: Agent) -> bool:
"""Check if an agent is in the AgentSet. Can be used like `agent in agentset`."""
return agent in self._agents
[docs]
def select(
self,
filter_func: Callable[[Agent], bool] | None = None,
n: int = 0,
inplace: bool = False,
agent_type: type[Agent] | None = None,
) -> AgentSet:
"""
Select a subset of agents from the AgentSet based on a filter function and/or quantity limit.
Args:
filter_func (Callable[[Agent], bool], optional): A function that takes an Agent and returns True if the
agent should be included in the result. Defaults to None, meaning no filtering is applied.
n (int, optional): The number of agents to select. If 0, all matching agents are selected. Defaults to 0.
inplace (bool, optional): If True, modifies the current AgentSet; otherwise, returns a new AgentSet. Defaults to False.
agent_type (type[Agent], optional): The class type of the agents to select. Defaults to None, meaning no type filtering is applied.
Returns:
AgentSet: A new AgentSet containing the selected agents, unless inplace is True, in which case the current AgentSet is updated.
"""
if filter_func is None and agent_type is None and n == 0:
return self if inplace else copy.copy(self)
def agent_generator(filter_func=None, agent_type=None, n=0):
count = 0
for agent in self:
if (not filter_func or filter_func(agent)) and (
not agent_type or isinstance(agent, agent_type)
):
yield agent
count += 1
if 0 < n <= count:
break
agents = agent_generator(filter_func, agent_type, n)
return AgentSet(agents, self.model) if not inplace else self._update(agents)
[docs]
def shuffle(self, inplace: bool = False) -> AgentSet:
"""
Randomly shuffle the order of agents in the AgentSet.
Args:
inplace (bool, optional): If True, shuffles the agents in the current AgentSet; otherwise, returns a new shuffled AgentSet. Defaults to False.
Returns:
AgentSet: A shuffled AgentSet. Returns the current AgentSet if inplace is True.
Note:
Using inplace = True is more performant
"""
weakrefs = list(self._agents.keyrefs())
self.random.shuffle(weakrefs)
if inplace:
self._agents.data = {entry: None for entry in weakrefs}
return self
else:
return AgentSet(
(agent for ref in weakrefs if (agent := ref()) is not None), self.model
)
[docs]
def sort(
self,
key: Callable[[Agent], Any] | str,
ascending: bool = False,
inplace: bool = False,
) -> AgentSet:
"""
Sort the agents in the AgentSet based on a specified attribute or custom function.
Args:
key (Callable[[Agent], Any] | str): A function or attribute name based on which the agents are sorted.
ascending (bool, optional): If True, the agents are sorted in ascending order. Defaults to False.
inplace (bool, optional): If True, sorts the agents in the current AgentSet; otherwise, returns a new sorted AgentSet. Defaults to False.
Returns:
AgentSet: A sorted AgentSet. Returns the current AgentSet if inplace is True.
"""
if isinstance(key, str):
key = operator.attrgetter(key)
sorted_agents = sorted(self._agents.keys(), key=key, reverse=not ascending)
return (
AgentSet(sorted_agents, self.model)
if not inplace
else self._update(sorted_agents)
)
def _update(self, agents: Iterable[Agent]):
"""Update the AgentSet with a new set of agents.
This is a private method primarily used internally by other methods like select, shuffle, and sort.
"""
self._agents = weakref.WeakKeyDictionary({agent: None for agent in agents})
return self
[docs]
def do(
self, method_name: str, *args, return_results: bool = False, **kwargs
) -> AgentSet | list[Any]:
"""
Invoke a method on each agent in the AgentSet.
Args:
method_name (str): The name of the method to call on each agent.
return_results (bool, optional): If True, returns the results of the method calls; otherwise, returns the AgentSet itself. Defaults to False, so you can chain method calls.
*args: Variable length argument list passed to the method being called.
**kwargs: Arbitrary keyword arguments passed to the method being called.
Returns:
AgentSet | list[Any]: The results of the method calls if return_results is True, otherwise the AgentSet itself.
"""
# we iterate over the actual weakref keys and check if weakref is alive before calling the method
res = [
getattr(agent, method_name)(*args, **kwargs)
for agentref in self._agents.keyrefs()
if (agent := agentref()) is not None
]
return res if return_results else self
[docs]
def get(self, attr_names: str | list[str]) -> list[Any]:
"""
Retrieve the specified attribute(s) from each agent in the AgentSet.
Args:
attr_names (str | list[str]): The name(s) of the attribute(s) to retrieve from each agent.
Returns:
list[Any]: A list with the attribute value for each agent in the set if attr_names is a str
list[list[Any]]: A list with a list of attribute values for each agent in the set if attr_names is a list of str
Raises:
AttributeError if an agent does not have the specified attribute(s)
"""
if isinstance(attr_names, str):
return [getattr(agent, attr_names) for agent in self._agents]
else:
return [
[getattr(agent, attr_name) for attr_name in attr_names]
for agent in self._agents
]
def __getitem__(self, item: int | slice) -> Agent:
"""
Retrieve an agent or a slice of agents from the AgentSet.
Args:
item (int | slice): The index or slice for selecting agents.
Returns:
Agent | list[Agent]: The selected agent or list of agents based on the index or slice provided.
"""
return list(self._agents.keys())[item]
[docs]
def add(self, agent: Agent):
"""
Add an agent to the AgentSet.
Args:
agent (Agent): The agent to add to the set.
Note:
This method is an implementation of the abstract method from MutableSet.
"""
self._agents[agent] = None
[docs]
def discard(self, agent: Agent):
"""
Remove an agent from the AgentSet if it exists.
This method does not raise an error if the agent is not present.
Args:
agent (Agent): The agent to remove from the set.
Note:
This method is an implementation of the abstract method from MutableSet.
"""
with contextlib.suppress(KeyError):
del self._agents[agent]
[docs]
def remove(self, agent: Agent):
"""
Remove an agent from the AgentSet.
This method raises an error if the agent is not present.
Args:
agent (Agent): The agent to remove from the set.
Note:
This method is an implementation of the abstract method from MutableSet.
"""
del self._agents[agent]
def __getstate__(self):
"""
Retrieve the state of the AgentSet for serialization.
Returns:
dict: A dictionary representing the state of the AgentSet.
"""
return {"agents": list(self._agents.keys()), "model": self.model}
def __setstate__(self, state):
"""
Set the state of the AgentSet during deserialization.
Args:
state (dict): A dictionary representing the state to restore.
"""
self.model = state["model"]
self._update(state["agents"])
@property
def random(self) -> Random:
"""
Provide access to the model's random number generator.
Returns:
Random: The random number generator associated with the model.
"""
return self.model.random
# consider adding for performance reasons
# for Sequence: __reversed__, index, and count
# for MutableSet clear, pop, remove, __ior__, __iand__, __ixor__, and __isub__