"""Diagnostics traffic methods for OPNsenseClient."""
from __future__ import annotations
from collections.abc import AsyncIterator, Mapping
from typing import Any
from .client_transport import _STREAM_JSON_EVENT_RESET_KEY
from .const import DEFAULT_REQUEST_TIMEOUT_SECONDS
from ._typing import AiopnsenseClientProtocol
from .helpers import _LOGGER, try_to_float, try_to_int
# REST path that ``TrafficMixin.get_interface_traffic`` probes and then reads
# to obtain a single traffic snapshot.
DIAGNOSTICS_TRAFFIC_ENDPOINT = "/api/diagnostics/traffic/interface"
# Path prefix of the streaming endpoint. ``TrafficMixin.stream_interface_traffic``
# appends ``/<interval>`` (the clamped poll interval) to build the final path.
DIAGNOSTICS_TRAFFIC_STREAM_ENDPOINT_PREFIX = "/api/diagnostics/traffic/stream"
# Maps each normalized counter name (the key written into a normalized row) to
# the raw payload keys accepted for it. ``normalize_traffic_payload`` passes
# each tuple to ``_first_int``, which tries the keys left to right and keeps
# the first one that parses as an integer, so earlier aliases take precedence.
_INTERFACE_FIELD_ALIASES: dict[str, tuple[str, ...]] = {
    "rx_bytes": ("rx_bytes", "inbytes", "bytes received", "bytes_received"),
    "tx_bytes": ("tx_bytes", "outbytes", "bytes transmitted", "bytes_transmitted"),
    "rx_packets": ("rx_packets", "inpkts", "packets received", "packets_received"),
    "tx_packets": ("tx_packets", "outpkts", "packets transmitted", "packets_transmitted"),
    "rx_errors": ("rx_errors", "inerrs", "input errors", "input_errors"),
    "tx_errors": ("tx_errors", "outerrs", "output errors", "output_errors"),
    "collisions": ("collisions",),
}
def _coalesce_identity(
    value: object | None,
    *,
    fallback: str,
    description: object | None = None,
) -> str:
    """Pick the first non-blank string identity, or fall back.

    Candidates are considered in priority order: ``value`` first, then
    ``description``. A candidate is accepted only when it is a string that is
    non-empty after stripping surrounding whitespace; the stripped text is
    what gets returned.

    Args:
        value (object | None): Preferred identity candidate.
        fallback (str): Returned unchanged when no candidate is usable.
        description (object | None): Lower-priority identity candidate.

    Returns:
        str: The stripped winning candidate, or ``fallback``.
    """
    for raw in (value, description):
        if not isinstance(raw, str):
            # Non-string candidates (None, numbers, ...) are never identities.
            continue
        trimmed = raw.strip()
        if trimmed:
            return trimmed
    return fallback
def _first_int(row: Mapping[str, Any], aliases: tuple[str, ...]) -> int | None:
    """Look up the first alias in ``row`` whose value parses as an integer.

    Aliases are tried strictly in the order given, and lookup stops at the
    first one for which ``try_to_int`` returns something other than ``None``.

    Args:
        row (Mapping[str, Any]): Mapping that may contain any of the alias keys.
        aliases (tuple[str, ...]): Candidate keys, highest priority first.

    Returns:
        int | None: The first parsed value, or ``None`` when no alias yields one.
    """
    # Lazy generator: later aliases are not looked up once a match is found.
    parsed_candidates = (try_to_int(row.get(key)) for key in aliases)
    return next((parsed for parsed in parsed_candidates if parsed is not None), None)
def _source_interfaces(payload: Mapping[str, Any]) -> Mapping[str, Mapping[str, Any]]:
    """Extract the per-interface mapping from either supported payload shape.

    Two shapes are recognised:

    * nested: ``payload["interfaces"]`` is itself a mapping and is returned
      as-is (same object, entries not validated here);
    * flat: interfaces sit at the top level of ``payload``; every
      mapping-valued entry except the ``"time"`` key is collected.

    Args:
        payload (Mapping[str, Any]): Raw OPNsense payload.

    Returns:
        Mapping[str, Mapping[str, Any]]: Interface-name keyed mapping.
    """
    nested = payload.get("interfaces")
    if not isinstance(nested, Mapping):
        flat: dict[str, Mapping[str, Any]] = {}
        for name, entry in payload.items():
            # "time" is the sample timestamp slot, never an interface row.
            if name != "time" and isinstance(entry, Mapping):
                flat[name] = entry
        return flat
    return nested
def normalize_traffic_payload(
    payload: Mapping[str, Any],
    *,
    interval: float,
    include_per_second_rates: bool = True,
) -> dict[str, Any]:
    """Convert a raw diagnostics traffic payload into the normalized sample shape.

    Each accepted interface row carries ``interface`` and ``name`` identities
    plus whichever counters from ``_INTERFACE_FIELD_ALIASES`` could be parsed.
    Rows that end up with none of ``rx_bytes``, ``tx_bytes``, ``rx_packets``
    or ``tx_packets`` are left out of the result.

    Args:
        payload: Raw traffic payload (snapshot response or a stream event).
        interval: Seconds covered by the counters in ``payload``. Values that
            are not greater than zero are replaced by ``1.0``.
        include_per_second_rates: When true, negative counters are discarded,
            the effective interval is recorded under ``"interval"`` and
            ``*_per_second`` / ``*_bits_per_second`` rates are derived.

    Returns:
        dict[str, Any]: ``{"time": <float | None>, "interfaces": {...}}`` with
        the ``interfaces`` mapping keyed by the source interface name.
    """
    seconds = interval if interval > 0 else 1.0

    # (counter key, per-second key, bits-per-second key or None). The tuple
    # order is also the order in which derived keys are inserted.
    rate_plan = (
        ("rx_bytes", "rx_bytes_per_second", "rx_bits_per_second"),
        ("tx_bytes", "tx_bytes_per_second", "tx_bits_per_second"),
        ("rx_packets", "rx_packets_per_second", None),
        ("tx_packets", "tx_packets_per_second", None),
    )

    interfaces_out: dict[str, Any] = {}
    result: dict[str, Any] = {
        "time": try_to_float(payload.get("time")),
        "interfaces": interfaces_out,
    }

    for if_key, raw in _source_interfaces(payload).items():
        if not (isinstance(if_key, str) and isinstance(raw, Mapping)):
            continue

        entry: dict[str, Any] = {}
        entry["interface"] = _coalesce_identity(raw.get("interface"), fallback=if_key)
        entry["name"] = _coalesce_identity(
            raw.get("name"),
            fallback=if_key,
            description=raw.get("description"),
        )

        for field, aliases in _INTERFACE_FIELD_ALIASES.items():
            counter = _first_int(raw, aliases)
            if counter is None:
                continue
            # Negative counters are only rejected when rates are derived.
            if include_per_second_rates and counter < 0:
                continue
            entry[field] = counter

        # A row without any byte/packet counter carries no traffic information.
        if not any(isinstance(entry.get(counter_key), int) for counter_key, _, _ in rate_plan):
            continue

        if include_per_second_rates:
            entry["interval"] = seconds
            for counter_key, per_second_key, bits_key in rate_plan:
                count = entry.get(counter_key)
                if not isinstance(count, int):
                    continue
                entry[per_second_key] = count / seconds
                if bits_key is not None:
                    entry[bits_key] = count * 8 / seconds

        interfaces_out[if_key] = entry

    return result
class TrafficMixin(AiopnsenseClientProtocol):
    """Diagnostics traffic methods for OPNsenseClient."""

    async def get_interface_traffic(self) -> dict[str, Any]:
        """Return a normalized diagnostics traffic snapshot.

        The snapshot endpoint is probed first; when available its payload is
        normalized without per-second rates.

        Returns:
            dict[str, Any]: Normalized diagnostics traffic sample. Returns an
            empty traffic sample (``{"time": None, "interfaces": {}}``) when
            the endpoint is unavailable, or when probing / parsing fails and
            ``self._throw_errors`` is falsy.

        Raises:
            TimeoutError, RuntimeError, TypeError, ValueError, AttributeError:
                Re-raised from probing or parsing when ``self._throw_errors``
                is truthy.
        """
        empty_sample: dict[str, Any] = {"time": None, "interfaces": {}}
        try:
            if not await self._is_get_endpoint_available(DIAGNOSTICS_TRAFFIC_ENDPOINT):
                _LOGGER.debug("Diagnostics traffic endpoint unavailable")
                return empty_sample
            payload = await self._safe_dict_get(DIAGNOSTICS_TRAFFIC_ENDPOINT)
            return normalize_traffic_payload(payload, interval=1.0, include_per_second_rates=False)
        except (TimeoutError, RuntimeError, TypeError, ValueError, AttributeError) as exc:
            if self._throw_errors:
                raise
            _LOGGER.debug(
                "Falling back to empty interface traffic sample due to %s: %s",
                type(exc).__name__,
                exc,
            )
            return empty_sample

    async def stream_interface_traffic(
        self,
        poll_interval: int = 1,
    ) -> AsyncIterator[dict[str, Any]]:
        """Yield normalized diagnostics traffic stream samples.

        Args:
            poll_interval: OPNsense stream sample interval in seconds. Values
                less than 1 are clamped to 1.

        Yields:
            Normalized traffic samples with per-second rates. The first event
            of the stream, and the first event following each stream reset
            marker, is discarded because OPNsense stream endpoints commonly
            emit an initialization sample before interval deltas stabilize.
            Events without a parseable ``time``, or whose ``time`` does not
            advance past the previous event, are dropped as well.

        Raises:
            TimeoutError, RuntimeError, TypeError, ValueError, AttributeError:
                Re-raised from the endpoint probe when ``self._throw_errors``
                is truthy; otherwise the generator ends without yielding.
        """
        interval = max(poll_interval, 1)
        endpoint = f"{DIAGNOSTICS_TRAFFIC_STREAM_ENDPOINT_PREFIX}/{interval}"
        try:
            if not await self._is_get_endpoint_available(endpoint):
                _LOGGER.debug("Diagnostics traffic stream endpoint unavailable")
                return
        except (TimeoutError, RuntimeError, TypeError, ValueError, AttributeError) as exc:
            if self._throw_errors:
                raise
            _LOGGER.debug(
                "Falling back to empty interface traffic stream due to %s: %s",
                type(exc).__name__,
                exc,
            )
            return

        # Number of timestamped events accepted since the stream (re)started.
        events_since_reset = 0
        previous_time: float | None = None
        # Allow at least one full sample period (plus slack) between reads.
        stream_read_timeout = max(interval + 1, DEFAULT_REQUEST_TIMEOUT_SECONDS)
        stream_events = self._stream_json_events(
            endpoint,
            yield_reset_events=True,
            sock_read_timeout_seconds=stream_read_timeout,
        )
        try:
            async for event in stream_events:
                if event.get(_STREAM_JSON_EVENT_RESET_KEY) is True:
                    # Start over: forget the time baseline AND re-arm the
                    # "discard first event" rule, so the first sample of the
                    # restarted stream is not reported as a rate.
                    # NOTE(review): assumes a reset marker means the underlying
                    # stream restarted - confirm against _stream_json_events.
                    events_since_reset = 0
                    previous_time = None
                    continue

                event_time = try_to_float(event.get("time"))
                if event_time is None or (
                    previous_time is not None and event_time <= previous_time
                ):
                    # Unusable timestamp: drop the event and the baseline so
                    # the next event falls back to the nominal interval.
                    previous_time = None
                    continue

                events_since_reset += 1
                if previous_time is None:
                    # No baseline to measure against; use the requested interval.
                    sample_interval = float(interval)
                else:
                    sample_interval = event_time - previous_time
                previous_time = event_time

                if events_since_reset == 1:
                    # Initialization sample: only used to seed the baseline.
                    continue
                yield normalize_traffic_payload(event, interval=sample_interval)
        finally:
            # Release the underlying stream even if the consumer stops early.
            await stream_events.aclose()