# src/iohmm_evac/dgp/feedback.py

# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (C) 2026 SWGY, Inc.
"""Endogenous feedback signals (network congestion, peer-departure share).

Both signals at time ``t`` are computed from the lagged state vector at ``t-1``.
Lagging keeps the model strictly IO-HMM-compatible: a household's transition
at step ``t`` depends only on inputs known *before* ``t``.
"""

from __future__ import annotations

import numpy as np

from iohmm_evac.types import IntArray, State

# Public API: the two lagged feedback signals defined in this module.
__all__ = ["congestion", "peer_share"]


def congestion(prev_state: IntArray, n_cap: int) -> float:
    """Lagged network-congestion signal, saturating at ``1.0``.

    Counts the entries of ``prev_state`` (the state codes at ``t-1``) that
    equal :attr:`State.ER` and divides that count by ``n_cap``. Ratios above
    one are clipped, so the result never exceeds ``1.0``.

    Raises :class:`ValueError` when ``n_cap`` is zero or negative.
    """
    if n_cap <= 0:
        reason = "n_cap must be a positive integer"
        raise ValueError(reason)

    on_road = prev_state == State.ER
    load = int(np.count_nonzero(on_road)) / n_cap
    # Demand beyond capacity is reported as full congestion.
    return 1.0 if load > 1.0 else float(load)


def peer_share(prev_state: IntArray, evac_path: IntArray) -> float:
    """Fraction of households whose evacuation is observable at ``t-1``.

    A household counts as a visible evacuator when its lagged state is
    :attr:`State.ER` (en-route), or when it is in :attr:`State.SH`
    (sheltered) with an ``evac_path`` code of ``1``. ``evac_path`` uses the
    encoding 0=NONE, 1=AWAY, 2=HOME (matching :func:`encode_evac_path`).

    The denominator is the length of the first axis of ``prev_state``; an
    empty population yields ``0.0`` rather than dividing by zero.
    """
    total = prev_state.shape[0]
    if total == 0:
        return 0.0

    away_code = 1  # AWAY in the evac_path encoding
    sheltered = prev_state == State.SH
    sheltered_away = sheltered & (evac_path == away_code)
    on_road = prev_state == State.ER

    n_visible = int(np.count_nonzero(on_road | sheltered_away))
    share = n_visible / total
    return float(share)