Source code for mesa.time.events

"""Core event management functionality for Mesa's discrete event simulation system.

This module provides the foundational data structures and classes needed for event-based
simulation in Mesa. The EventList class is a priority queue implementation that maintains
simulation events in chronological order while respecting event priorities. Key features:

- Priority-based event ordering
- Weak references to prevent memory leaks from canceled events
- Efficient event insertion and removal using a heap queue
- Support for event cancellation without breaking the heap structure

The module contains three main components:
- Priority: An enumeration defining event priority levels (HIGH, DEFAULT, LOW)
- Event: A class representing individual events with timing and execution details
- EventList: A heap-based priority queue managing the chronological ordering of events

The implementation supports both pure discrete event simulation and hybrid approaches
combining agent-based modeling with event scheduling.
"""

from __future__ import annotations

import itertools
import types
from collections.abc import Callable
from dataclasses import dataclass
from enum import IntEnum
from heapq import heapify, heappop, heappush, nsmallest
from types import MethodType
from typing import TYPE_CHECKING, Any
from weakref import ReferenceType, WeakMethod, ref

if TYPE_CHECKING:
    from mesa import Model


def _create_callable_reference(
    function: Callable[..., None],
) -> ReferenceType[Any] | WeakMethod:
    """Validate and create a weak-reference wrapper for an event callback."""
    if not callable(function):
        raise TypeError("function must be a callable")

    if isinstance(function, types.FunctionType) and function.__name__ == "<lambda>":
        raise ValueError("function must be alive at Event creation.")

    if isinstance(function, MethodType):
        function_ref = WeakMethod(function)
    else:
        try:
            function_ref = ref(function)
        except TypeError as exc:
            raise TypeError("function must be weak referenceable") from exc

    return function_ref


[docs] class Priority(IntEnum): """Enumeration of priority levels.""" LOW = 10 DEFAULT = 5 HIGH = 1
[docs] class Event: """A simulation event. The callable is wrapped using weakref, so there is no need to explicitly cancel event if e.g., an agent is removed from the simulation. Attributes: time (float): The simulation time of the event fn (Callable): The function to execute for this event priority (Priority): The priority of the event unique_id (int) the unique identifier of the event function_args list[Any]: Argument for the function function_kwargs dict[str, Any]: Keyword arguments for the function Notes: Simulation events use a weak reference to the callable. If the callback no longer exists at execution time (e.g., because an agent has been removed), execution will fail silently. Lambda callbacks are rejected at Event creation. """ _ids = itertools.count() @property def CANCELED(self) -> bool: # noqa: D102 return self._canceled def __init__( self, time: int | float, function: Callable[..., None], priority: Priority = Priority.DEFAULT, function_args: list[Any] | None = None, function_kwargs: dict[str, Any] | None = None, ) -> None: """Initialize a simulation event. Args: time: the instant of time of the simulation event function: the callable to invoke priority: the priority of the event function_args: arguments for callable function_kwargs: keyword arguments for the callable """ super().__init__() self.time = time self.priority = priority.value self._canceled = False weak_ref_fn = _create_callable_reference(function) self.fn = weak_ref_fn self.unique_id = next(self._ids) self.function_args = function_args if function_args else [] self.function_kwargs = function_kwargs if function_kwargs else {}
[docs] def execute(self) -> None: """Execute this event.""" if not self._canceled: fn = self.fn() if fn is not None: fn(*self.function_args, **self.function_kwargs)
[docs] def cancel(self) -> None: """Cancel this event.""" self._canceled = True self.fn = None self.function_args = [] self.function_kwargs = {}
def __lt__(self, other: Event) -> bool: """Define a total ordering for events to be used by the heapq.""" if self.time != other.time: return self.time < other.time if self.priority != other.priority: return self.priority < other.priority return self.unique_id < other.unique_id def __getstate__(self) -> dict[str, Any]: """Prepare state for pickling.""" state = self.__dict__.copy() # Convert weak reference back to strong reference for pickling fn = self.fn() if self.fn is not None else None state["_fn_strong"] = fn state["fn"] = None # Don't pickle the weak reference return state def __setstate__(self, state: dict[str, Any]) -> None: """Restore state after unpickling.""" fn = state.pop("_fn_strong") self.__dict__.update(state) # Recreate callable reference strategy. if fn is not None: self.fn = _create_callable_reference(fn) else: self.fn = None
[docs] @dataclass(frozen=True, slots=True) class Schedule: """Defines when something should happen repeatedly. Attributes: interval: Time between executions (fixed value or callable returning value) start: Absolute time to begin (None = use current model time + interval) end: Absolute time to stop (None = no end) count: Maximum executions (None = unlimited) """ interval: float | int | Callable[[Model], float | int] = 1.0 start: float | None = None end: float | None = None count: int | None = None def __post_init__(self): """Validate schedule parameters.""" if not callable(self.interval) and self.interval <= 0: raise ValueError(f"Schedule interval must be > 0, got {self.interval}") if self.count is not None and self.count <= 0: raise ValueError( f"Schedule count must be > 0 if provided, got {self.count}" ) if self.start is not None and self.end is not None and self.start > self.end: raise ValueError( f"Schedule start ({self.start}) cannot be after end ({self.end})" )
[docs] class EventGenerator: """A generator that creates recurring events based on a Schedule. Unlike a single Event, an EventGenerator is persistent and can be stopped or configured with stop conditions. Attributes: model: The model this generator belongs to function: The callable to execute for each generated event schedule: The Schedule defining when events occur priority: Priority level for generated events Notes: Event generators use a weak reference to the callable. Therefore, you cannot pass a lambda function in fn. A simulation event where the callable no longer exists (e.g., because the agent has been removed from the model) will fail silently. If you want to use functools.partial, please assign the partial function to a variable prior to creating the generator. """ def __init__( self, model: Model, function: Callable[..., None], schedule: Schedule, priority: Priority = Priority.DEFAULT, ) -> None: """Initialize an EventGenerator. Args: model: The model this generator belongs to function: The callable to execute for each generated event. Use functools.partial to bind arguments. schedule: The Schedule defining timing priority: Priority level for generated events """ self.model = model self.function = _create_callable_reference(function) self.schedule = schedule self.priority = priority self._active: bool = False self._paused: bool = False self._current_event: Event | None = None self._execution_count: int = 0 @property def is_active(self) -> bool: """Return whether the generator is currently active.""" return self._active @property def execution_count(self) -> int: """Return the number of times this generator has executed.""" return self._execution_count @property def next_scheduled_time(self) -> float | None: """Return the time of the next scheduled execution, or None if not scheduled.""" if self._current_event is None: return None return self._current_event.time def _get_interval(self) -> float | int: """Get the next interval value.""" if callable(self.schedule.interval): interval = self.schedule.interval(self.model) if interval < 0: raise ValueError(f"Interval must be > 0, got {interval}") return interval return self.schedule.interval def _should_stop(self, next_time: float) -> bool: """Check if the generator should stop before scheduling the next event.""" return ( self.schedule.count is not None and self._execution_count >= self.schedule.count ) or (self.schedule.end is not None and next_time > self.schedule.end) def _execute_and_reschedule(self) -> None: """Execute the function and schedule the next event.""" if not self._active or self._paused: return # Check weakref HERE (execution time), not in property getter # This matches Event class behavior - weakref check during execution fn = self.function() if fn is None: # Stop the generator if weakref is dead self.stop() return # Silent no-op (no error raised) # Execute the function fn() self._execution_count += 1 # Schedule next event if we shouldn't stop next_time = self.model.time + self._get_interval() if not self._should_stop(next_time): self._schedule_next(next_time) else: self._active = False self._current_event = None self.model._event_generators.discard(self) def _schedule_next(self, time: float) -> None: """Schedule the next event at the given time.""" self._current_event = Event( time, self._execute_and_reschedule, priority=self.priority, ) self.model._event_list.add_event(self._current_event)
[docs] def start(self) -> EventGenerator: """Start the event generator. Returns: Self for method chaining """ if self._active: return self if self.schedule.start is not None: start_time = self.schedule.start else: # Default: start at next interval from now start_time = self.model.time + self._get_interval() self._active = True self.model._event_generators.add(self) self._schedule_next(start_time) return self
[docs] def stop(self) -> None: """Stop the event generator immediately.""" self._active = False self._paused = False if self._current_event is not None: self._current_event.cancel() self._current_event = None self.model._event_generators.discard(self)
[docs] def pause(self) -> None: """Pause the event generator temporarily. This cancels the currently scheduled event but keeps the generator active in the model. Execution can be resumed later using resume(). """ if not self._active or self._paused: return self._paused = True if self._current_event is not None: self._current_event.cancel() self._current_event = None
[docs] def resume(self) -> None: """Resume a paused event generator.""" if not self._active or not self._paused: return self._paused = False next_time = self.model.time + self._get_interval() if not self._should_stop(next_time): self._schedule_next(next_time) else: self._active = False self.model._event_generators.discard(self)
def __getstate__(self) -> dict[str, Any]: """Prepare state for pickling.""" state = self.__dict__.copy() fn = self.function() if self.function is not None else None state["_fn_strong"] = fn state["function"] = None return state def __setstate__(self, state: dict[str, Any]) -> None: """Restore state after unpickling.""" # Keep strong reference alive during entire method fn = state.pop("_fn_strong") # Update state first (keeps references alive) self.__dict__.update(state) # Now recreate weak reference if fn is not None: if isinstance(fn, MethodType): self.function = WeakMethod(fn) else: self.function = ref(fn) else: self.function = None
[docs] class EventList: """An event list. This is a heap queue sorted list of events. Events are always removed from the left, so heapq is a performant and appropriate data structure. Events are sorted based on their time stamp, their priority, and their unique_id as a tie-breaker, guaranteeing a complete ordering. """ def __init__(self): """Initialize an event list.""" self._events: list[Event] = [] heapify(self._events)
[docs] def add_event(self, event: Event): """Add the event to the event list. Args: event (Event): The event to be added """ heappush(self._events, event)
[docs] def peek_ahead(self, n: int = 1) -> list[Event]: """Look at the first n non-canceled event in the event list. Args: n (int): The number of events to look ahead Returns: list[Event] Raises: IndexError: If the eventlist is empty Notes: this method can return a list shorted then n if the number of non-canceled events on the event list is less than n. """ # look n events ahead if self.is_empty(): raise IndexError("event list is empty") # Filter out canceled events and get n smallest in correct chronological order return nsmallest(n, (e for e in self._events if not e.CANCELED))
[docs] def pop_event(self) -> Event: """Pop the first element from the event list.""" while self._events: event = heappop(self._events) if not event.CANCELED: return event raise IndexError("Event list is empty")
[docs] def compact(self) -> None: """Remove all canceled events from the heap and rebuild it. If there are many canceled events, compaction can speed up performance substantially. """ self._events = [e for e in self._events if not e.CANCELED] heapify(self._events)
[docs] def is_empty(self) -> bool: """Return whether the event list is empty.""" return len(self) == 0
def __contains__(self, event: Event) -> bool: # noqa if event.CANCELED: return False return event in self._events def __len__(self) -> int: # noqa return sum(1 for e in self._events if not e.CANCELED) def __repr__(self) -> str: """Return a string representation of the event list.""" events_str = ", ".join( [ f"Event(time={e.time}, priority={e.priority}, id={e.unique_id})" for e in self._events if not e.CANCELED ] ) return f"EventList([{events_str}])"
[docs] def remove(self, event: Event) -> None: """Remove an event from the event list. Args: event (Event): The event to be removed """ # We use lazy deletion: mark the event as canceled without # removing it from the heap to preserve heap invariants. # Canceled events are skipped during pop and may trigger # adaptive compaction if they dominate the heap. if not event.CANCELED: event.cancel()
[docs] def clear(self) -> None: """Clear the event list.""" self._events.clear()