# Source code for aiopnsense.traffic

"""Diagnostics traffic methods for OPNsenseClient."""

from __future__ import annotations

from collections.abc import AsyncIterator, Mapping
from typing import Any

from .client_transport import _STREAM_JSON_EVENT_RESET_KEY
from .const import DEFAULT_REQUEST_TIMEOUT_SECONDS
from ._typing import AiopnsenseClientProtocol
from .helpers import _LOGGER, try_to_float, try_to_int

# Endpoint queried for a one-shot traffic snapshot.
DIAGNOSTICS_TRAFFIC_ENDPOINT = "/api/diagnostics/traffic/interface"
# Prefix of the streaming endpoint; the sample interval in seconds is appended
# as the final path segment when the stream URL is built.
DIAGNOSTICS_TRAFFIC_STREAM_ENDPOINT_PREFIX = "/api/diagnostics/traffic/stream"

# Normalized counter name -> candidate source keys. Candidates are listed in
# lookup priority order: the first one that parses as an integer wins.
_INTERFACE_FIELD_ALIASES: dict[str, tuple[str, ...]] = {
    "rx_bytes": (
        "rx_bytes",
        "inbytes",
        "bytes received",
        "bytes_received",
    ),
    "tx_bytes": (
        "tx_bytes",
        "outbytes",
        "bytes transmitted",
        "bytes_transmitted",
    ),
    "rx_packets": (
        "rx_packets",
        "inpkts",
        "packets received",
        "packets_received",
    ),
    "tx_packets": (
        "tx_packets",
        "outpkts",
        "packets transmitted",
        "packets_transmitted",
    ),
    "rx_errors": (
        "rx_errors",
        "inerrs",
        "input errors",
        "input_errors",
    ),
    "tx_errors": (
        "tx_errors",
        "outerrs",
        "output errors",
        "output_errors",
    ),
    "collisions": ("collisions",),
}


def _coalesce_identity(
    value: object | None,
    *,
    fallback: str,
    description: object | None = None,
) -> str:
    """Pick the first non-blank string identity, or the fallback.

    Candidates are tried in order: ``value`` first, then ``description``. A
    candidate only qualifies when it is a ``str`` that still has content after
    surrounding whitespace is stripped; anything else is skipped.

    Args:
        value (object | None): Preferred identity candidate.
        fallback (str): Identity returned when no candidate qualifies.
        description (object | None): Secondary candidate, consulted only when
            ``value`` does not qualify.

    Returns:
        str: The stripped winning candidate, or ``fallback`` unchanged.
    """
    for option in (value, description):
        if not isinstance(option, str):
            continue
        trimmed = option.strip()
        if trimmed:
            return trimmed
    return fallback


def _first_int(row: Mapping[str, Any], aliases: tuple[str, ...]) -> int | None:
    """Look up ``aliases`` in ``row`` and return the first integer that parses.

    Each alias is fetched with ``row.get`` and passed through ``try_to_int``;
    evaluation is lazy, so aliases after the first successful parse are never
    inspected.

    Args:
        row (Mapping[str, Any]): Mapping that may hold any of the alias keys.
        aliases (tuple[str, ...]): Keys to probe, highest priority first.

    Returns:
        int | None: The first non-``None`` result of ``try_to_int``, or
            ``None`` when no alias yields one.
    """
    parsed_candidates = (try_to_int(row.get(key)) for key in aliases)
    return next(
        (number for number in parsed_candidates if number is not None),
        None,
    )


def _source_interfaces(payload: Mapping[str, Any]) -> Mapping[str, Mapping[str, Any]]:
    """Locate the per-interface rows inside a traffic payload.

    Two payload shapes are handled: rows nested under an ``"interfaces"`` key,
    or rows stored directly at the top level next to a ``"time"`` entry.

    Args:
        payload (Mapping[str, Any]): Raw OPNsense payload.

    Returns:
        Mapping[str, Mapping[str, Any]]: The ``"interfaces"`` mapping itself
            (returned as-is, not copied) when it is a mapping; otherwise a new
            dict of every top-level entry whose value is a mapping, excluding
            the ``"time"`` key.
    """
    nested = payload.get("interfaces")
    if not isinstance(nested, Mapping):
        # Flat shape: keep only mapping-valued entries and never the timestamp.
        flattened: dict[str, Mapping[str, Any]] = {}
        for name, entry in payload.items():
            if name == "time" or not isinstance(entry, Mapping):
                continue
            flattened[name] = entry
        return flattened
    return nested


def normalize_traffic_payload(
    payload: Mapping[str, Any],
    *,
    interval: float,
    include_per_second_rates: bool = True,
) -> dict[str, Any]:
    """Convert a raw diagnostics traffic payload into a uniform sample.

    Every interface row is reduced to an ``interface``/``name`` identity plus
    whichever counters from ``_INTERFACE_FIELD_ALIASES`` could be parsed as
    integers. Rows that end up with none of ``rx_bytes``, ``tx_bytes``,
    ``rx_packets`` or ``tx_packets`` are left out of the result.

    Args:
        payload: Raw payload from the diagnostics traffic snapshot endpoint or
            a single stream event.
        interval: Seconds covered by the counters in ``payload``. Values that
            are not greater than zero are replaced by ``1.0``.
        include_per_second_rates: When true, negative counter values are
            dropped, and each kept row gains an ``interval`` entry plus
            ``*_per_second`` rates computed as counter divided by interval.

    Returns:
        dict[str, Any]: ``{"time": ..., "interfaces": {...}}`` where ``time``
            is the parsed payload timestamp (or ``None``) and ``interfaces``
            is keyed by the source interface name.
    """
    seconds = interval if interval > 0 else 1.0
    timestamp = try_to_float(payload.get("time"))
    interfaces_out: dict[str, Any] = {}
    result: dict[str, Any] = {"time": timestamp, "interfaces": interfaces_out}

    # (counter key, derived rate keys...) in the order the rates are emitted.
    byte_rate_keys = (
        ("rx_bytes", "rx_bytes_per_second", "rx_bits_per_second"),
        ("tx_bytes", "tx_bytes_per_second", "tx_bits_per_second"),
    )
    packet_rate_keys = (
        ("rx_packets", "rx_packets_per_second"),
        ("tx_packets", "tx_packets_per_second"),
    )

    for ifname, raw in _source_interfaces(payload).items():
        if not (isinstance(ifname, str) and isinstance(raw, Mapping)):
            continue

        entry: dict[str, Any] = {
            "interface": _coalesce_identity(raw.get("interface"), fallback=ifname),
            "name": _coalesce_identity(
                raw.get("name"),
                fallback=ifname,
                description=raw.get("description"),
            ),
        }

        for field, aliases in _INTERFACE_FIELD_ALIASES.items():
            counter = _first_int(raw, aliases)
            if counter is None:
                continue
            # A negative counter cannot produce a meaningful rate, so it is
            # omitted whenever rates are being derived.
            if include_per_second_rates and counter < 0:
                continue
            entry[field] = counter

        primary = {
            key: entry.get(key)
            for key in ("rx_bytes", "tx_bytes", "rx_packets", "tx_packets")
        }
        if all(not isinstance(count, int) for count in primary.values()):
            # No byte or packet counter survived: the row carries no traffic data.
            continue

        if include_per_second_rates:
            entry["interval"] = seconds
            for counter_key, bytes_rate_key, bits_rate_key in byte_rate_keys:
                byte_count = primary[counter_key]
                if isinstance(byte_count, int):
                    entry[bytes_rate_key] = byte_count / seconds
                    entry[bits_rate_key] = byte_count * 8 / seconds
            for counter_key, packets_rate_key in packet_rate_keys:
                packet_count = primary[counter_key]
                if isinstance(packet_count, int):
                    entry[packets_rate_key] = packet_count / seconds

        interfaces_out[ifname] = entry

    return result


class TrafficMixin(AiopnsenseClientProtocol):
    """Mixin that adds diagnostics traffic snapshot and stream helpers."""

    async def get_interface_traffic(self) -> dict[str, Any]:
        """Fetch one diagnostics traffic sample and normalize it.

        The snapshot is normalized without per-second rates.

        Returns:
            dict[str, Any]: Normalized diagnostics traffic sample. An empty
                sample (``time`` of ``None`` and no interfaces) is returned
                when the endpoint is reported unavailable, or when probing or
                parsing fails and errors are not being re-raised.

        Raises:
            TimeoutError, RuntimeError, TypeError, ValueError, AttributeError:
                Re-raised only when ``self._throw_errors`` is truthy.
        """
        fallback: dict[str, Any] = {"time": None, "interfaces": {}}

        try:
            reachable = await self._is_get_endpoint_available(DIAGNOSTICS_TRAFFIC_ENDPOINT)
            if reachable:
                raw = await self._safe_dict_get(DIAGNOSTICS_TRAFFIC_ENDPOINT)
                return normalize_traffic_payload(
                    raw,
                    interval=1.0,
                    include_per_second_rates=False,
                )
            _LOGGER.debug("Diagnostics traffic endpoint unavailable")
        except (TimeoutError, RuntimeError, TypeError, ValueError, AttributeError) as err:
            if self._throw_errors:
                raise
            _LOGGER.debug(
                "Falling back to empty interface traffic sample due to %s: %s",
                type(err).__name__,
                err,
            )
        return fallback

    async def stream_interface_traffic(
        self,
        poll_interval: int = 1,
    ) -> AsyncIterator[dict[str, Any]]:
        """Yield normalized samples from the diagnostics traffic stream.

        Args:
            poll_interval: OPNsense stream sample interval in seconds. Values
                less than 1 are clamped to 1.

        Yields:
            Normalized traffic samples with per-second rates. The first event
            carrying a usable timestamp is consumed without being yielded
            because OPNsense stream endpoints commonly emit an initialization
            sample before interval deltas stabilize. Events whose timestamp is
            missing or not strictly later than the previous one are skipped.

        Raises:
            TimeoutError, RuntimeError, TypeError, ValueError, AttributeError:
                From the availability probe, re-raised only when
                ``self._throw_errors`` is truthy.
        """
        interval = max(poll_interval, 1)
        endpoint = f"{DIAGNOSTICS_TRAFFIC_STREAM_ENDPOINT_PREFIX}/{interval}"
        try:
            reachable = await self._is_get_endpoint_available(endpoint)
            if not reachable:
                _LOGGER.debug("Diagnostics traffic stream endpoint unavailable")
                return
        except (TimeoutError, RuntimeError, TypeError, ValueError, AttributeError) as err:
            if self._throw_errors:
                raise
            _LOGGER.debug(
                "Falling back to empty interface traffic stream due to %s: %s",
                type(err).__name__,
                err,
            )
            return

        # Interval assumed for a sample that has no earlier timestamp to diff against.
        nominal_interval = float(interval)
        accepted = 0
        last_time: float | None = None
        read_timeout = max(interval + 1, DEFAULT_REQUEST_TIMEOUT_SECONDS)
        events = self._stream_json_events(
            endpoint,
            yield_reset_events=True,
            sock_read_timeout_seconds=read_timeout,
        )
        try:
            async for event in events:
                if event.get(_STREAM_JSON_EVENT_RESET_KEY) is True:
                    # Reset marker: forget the previous timestamp.
                    last_time = None
                    continue

                stamp = try_to_float(event.get("time"))
                if stamp is None:
                    last_time = None
                    continue
                if last_time is not None and stamp <= last_time:
                    # Time did not advance; drop the event and start over.
                    last_time = None
                    continue

                accepted += 1
                elapsed = nominal_interval if last_time is None else stamp - last_time
                last_time = stamp

                if accepted == 1:
                    # Only used to establish the timestamp baseline.
                    continue
                yield normalize_traffic_payload(event, interval=elapsed)
        finally:
            # Close the underlying event generator however iteration ends.
            await events.aclose()