Source code for aiopnsense.telemetry

"""Telemetry and interface statistics methods for OPNsenseClient."""

from collections.abc import MutableMapping
from datetime import datetime, timedelta
import re
from typing import Any

from dateutil.parser import ParserError, UnknownTimezoneWarning, parse

from ._typing import AiopnsenseClientProtocol
from .const import AMBIGUOUS_TZINFOS
from .helpers import _LOGGER, _log_errors, dict_get, try_to_float, try_to_int


class TelemetryMixin(AiopnsenseClientProtocol):
    """Telemetry methods for OPNsenseClient."""

    @staticmethod
    def _usage_percent(
        used: int | None,
        total: int | None,
        default: int | None = None,
    ) -> int | None:
        """Calculate a rounded usage percentage for valid integer counters.

        Args:
            used (int | None): Used amount.
            total (int | None): Total capacity.
            default (int | None, optional): Value returned for invalid or zero totals.

        Returns:
            int | None: Rounded percentage, or ``default`` when counters are invalid.
        """
        if isinstance(used, int) and isinstance(total, int) and total > 0:
            return round(used / total * 100)
        return default

    @_log_errors
    async def get_telemetry(self) -> MutableMapping[str, Any]:
        """Return consolidated system telemetry from OPNsense.

        Returns:
            MutableMapping[str, Any]: Mapping containing ``mbuf``,
                ``pfstate``, ``memory``, ``system``, ``cpu``,
                ``filesystems``, and ``temps`` sections populated from the
                corresponding diagnostics endpoints.
        """
        telemetry: dict[str, Any] = {}
        telemetry["mbuf"] = await self._get_telemetry_mbuf()
        telemetry["pfstate"] = await self._get_telemetry_pfstate()
        telemetry["memory"] = await self._get_telemetry_memory()
        telemetry["system"] = await self._get_telemetry_system()
        telemetry["cpu"] = await self._get_telemetry_cpu()
        telemetry["filesystems"] = await self._get_telemetry_filesystems()
        telemetry["temps"] = await self._get_telemetry_temps()
        return telemetry

    @_log_errors
    async def get_interfaces(self) -> MutableMapping[str, Any]:
        """Return normalized interface status and counters.

        Returns:
            MutableMapping[str, Any]: Mapping keyed by OPNsense interface
                identifier. Each interface includes packet and byte counters,
                error counters, status, addresses, media, gateway and route
                metadata, MAC address, enabled flag, and VLAN tag when
                available.
        """
        interfaces_endpoint = "/api/interfaces/overview/export"
        if not await self.is_endpoint_available(interfaces_endpoint):
            _LOGGER.debug("Interface overview endpoint unavailable")
            return {}
        interface_info = await self._safe_list_get(interfaces_endpoint)
        if not len(interface_info) > 0:
            return {}
        interfaces: dict[str, Any] = {}
        for ifinfo in interface_info:
            interface: dict[str, Any] = {}
            if not isinstance(ifinfo, MutableMapping) or ifinfo.get("identifier", "") == "":
                continue
            statistics = ifinfo.get("statistics", {})
            if not isinstance(statistics, MutableMapping):
                statistics = {}
            packets_received = try_to_int(statistics.get("packets received"))
            packets_transmitted = try_to_int(statistics.get("packets transmitted"))
            bytes_received = try_to_int(statistics.get("bytes received"))
            bytes_transmitted = try_to_int(statistics.get("bytes transmitted"))
            interface["inpkts"] = packets_received
            interface["outpkts"] = packets_transmitted
            interface["inbytes"] = bytes_received
            interface["outbytes"] = bytes_transmitted
            interface["inbytes_frmt"] = bytes_received
            interface["outbytes_frmt"] = bytes_transmitted
            interface["inerrs"] = try_to_int(statistics.get("input errors"))
            interface["outerrs"] = try_to_int(statistics.get("output errors"))
            interface["collisions"] = try_to_int(statistics.get("collisions"))
            interface["interface"] = ifinfo.get("identifier", "")
            interface["name"] = ifinfo.get("description", "")
            interface["status"] = ""
            if ifinfo.get("status", "") in {"down", "no carrier", "up"}:
                interface["status"] = ifinfo.get("status", "")
            elif ifinfo.get("status", "") == "associated":
                interface["status"] = "up"
            interface["ipv4"] = ifinfo.get("addr4", None)
            interface["ipv6"] = ifinfo.get("addr6", None)
            interface["media"] = ifinfo.get("media", None)
            interface["gateways"] = ifinfo.get("gateways", [])
            interface["routes"] = ifinfo.get("routes", [])
            interface["device"] = ifinfo.get("device", None)
            if ifinfo.get("macaddr", None) and ifinfo.get("macaddr", None) != "00:00:00:00:00:00":
                interface["mac"] = ifinfo.get("macaddr", None)
            interface["enabled"] = ifinfo.get("enabled", None)
            interface["vlan_tag"] = ifinfo.get("vlan_tag", None)
            interfaces[ifinfo.get("identifier", "")] = interface
        return interfaces

    @_log_errors
    async def _get_telemetry_mbuf(self) -> MutableMapping[str, Any]:
        """Collect mbuf usage telemetry.

        Returns:
            MutableMapping[str, Any]: Mapping containing current and total mbuf
                counts plus ``used_percent``.
        """
        mbuf_endpoint = "/api/diagnostics/system/system_mbuf"
        if not await self.is_endpoint_available(mbuf_endpoint):
            _LOGGER.debug("Telemetry mbuf endpoint unavailable")
            return {}
        mbuf_info = await self._safe_dict_get(mbuf_endpoint)
        mbuf: dict[str, Any] = {}
        mbuf["used"] = try_to_int(dict_get(mbuf_info, "mbuf-statistics.mbuf-current"))
        mbuf["total"] = try_to_int(dict_get(mbuf_info, "mbuf-statistics.mbuf-total"))
        mbuf["used_percent"] = self._usage_percent(mbuf["used"], mbuf["total"])
        return mbuf

    @_log_errors
    async def _get_telemetry_pfstate(self) -> MutableMapping[str, Any]:
        """Collect PF state table telemetry.

        Returns:
            MutableMapping[str, Any]: Mapping containing current and maximum PF
                state counts plus ``used_percent``.
        """
        pfstate_endpoint = "/api/diagnostics/firewall/pf_states"
        if not await self.is_endpoint_available(pfstate_endpoint):
            _LOGGER.debug("Telemetry pfstate endpoint unavailable")
            return {}
        pfstate_info = await self._safe_dict_get(pfstate_endpoint)
        pfstate: dict[str, Any] = {}
        pfstate["used"] = try_to_int(pfstate_info.get("current", None))
        pfstate["total"] = try_to_int(pfstate_info.get("limit", None))
        pfstate["used_percent"] = self._usage_percent(pfstate["used"], pfstate["total"])
        return pfstate

    @_log_errors
    async def _get_telemetry_memory(self) -> MutableMapping[str, Any]:
        """Collect memory and swap telemetry.

        Returns:
            MutableMapping[str, Any]: Mapping containing physical memory,
                memory used, and ``used_percent``. Adds swap totals and usage
                percentage when swap telemetry is available.
        """
        memory_endpoint = await self._get_endpoint_path(
            snake_case_path="/api/diagnostics/system/system_resources",
            camel_case_path="/api/diagnostics/system/systemResources",
        )
        if not await self.is_endpoint_available(memory_endpoint):
            _LOGGER.debug("Telemetry memory endpoint unavailable")
            return {
                "physmem": None,
                "used": None,
                "used_percent": None,
            }
        memory_info = await self._safe_dict_get(memory_endpoint)
        memory: dict[str, Any] = {}
        memory["physmem"] = try_to_int(dict_get(memory_info, "memory.total"))
        memory["used"] = try_to_int(dict_get(memory_info, "memory.used"))
        memory["used_percent"] = self._usage_percent(memory["used"], memory["physmem"])
        swap_endpoint = "/api/diagnostics/system/system_swap"
        if not await self.is_endpoint_available(swap_endpoint):
            _LOGGER.debug("Telemetry swap endpoint unavailable")
            return memory

        swap_info = await self._safe_dict_get(swap_endpoint)
        swap_rows = swap_info.get("swap")
        if not isinstance(swap_rows, list) or not swap_rows:
            return memory
        swap_row = swap_rows[0]
        if not isinstance(swap_row, MutableMapping):
            return memory
        memory["swap_total"] = try_to_int(swap_row.get("total"))
        memory["swap_reserved"] = try_to_int(swap_row.get("used"))
        memory["swap_used_percent"] = self._usage_percent(
            memory["swap_reserved"], memory["swap_total"], default=0
        )
        return memory

    @_log_errors
    async def _get_telemetry_system(self) -> MutableMapping[str, Any]:
        """Collect system time, uptime, boottime, and load telemetry.

        Returns:
            MutableMapping[str, Any]: Mapping containing boot time, uptime, and
                one-, five-, and fifteen-minute load averages when parseable.
        """
        time_endpoint = await self._get_endpoint_path(
            snake_case_path="/api/diagnostics/system/system_time",
            camel_case_path="/api/diagnostics/system/systemTime",
        )
        if not await self.is_endpoint_available(time_endpoint):
            _LOGGER.debug("Telemetry system time endpoint unavailable")
            return {}
        time_info = await self._safe_dict_get(time_endpoint)
        system: dict[str, Any] = {}
        opnsense_tz = await self._get_opnsense_timezone(time_info.get("datetime"))

        try:
            systemtime: datetime = parse(time_info["datetime"], tzinfos=AMBIGUOUS_TZINFOS)
            if systemtime.tzinfo is None:
                systemtime = systemtime.replace(tzinfo=opnsense_tz)
        except (KeyError, ValueError, TypeError, ParserError, UnknownTimezoneWarning) as e:
            _LOGGER.warning(
                "Failed to parse opnsense system time (aka. datetime), using HA system time instead: %s. %s: %s",
                time_info.get("datetime"),
                type(e).__name__,
                e,
            )
            systemtime = datetime.now().astimezone()

        pattern = re.compile(r"^(?:(\d+)\s+days?,\s+)?(\d{2}):(\d{2}):(\d{2})$")
        match = pattern.match(time_info.get("uptime", ""))
        if match:
            days_str, hours_str, minutes_str, seconds_str = match.groups()
            days = try_to_int(days_str, 0) or 0
            hours = try_to_int(hours_str, 0) or 0
            minutes = try_to_int(minutes_str, 0) or 0
            seconds = try_to_int(seconds_str, 0) or 0

            uptime = days * 86400 + hours * 3600 + minutes * 60 + seconds

        boottime: datetime | None = None
        if "boottime" in time_info:
            try:
                boottime = parse(time_info["boottime"], tzinfos=AMBIGUOUS_TZINFOS)
                if boottime and boottime.tzinfo is None:
                    boottime = boottime.replace(tzinfo=opnsense_tz)
            except (ValueError, TypeError, ParserError, UnknownTimezoneWarning) as e:
                _LOGGER.info(
                    "Failed to parse opnsense boottime: %s. %s: %s",
                    time_info["boottime"],
                    type(e).__name__,
                    e,
                )

        if boottime:
            system["boottime"] = boottime.timestamp()
            if match:
                system["uptime"] = uptime
            else:
                system["uptime"] = int((systemtime - boottime).total_seconds())
        elif match:
            system["uptime"] = uptime
            boottime = systemtime - timedelta(seconds=system["uptime"])
            system["boottime"] = boottime.timestamp()
        else:
            _LOGGER.warning("Invalid uptime format")

        load_str: str = time_info.get("loadavg", "")
        load_list: list[str] = load_str.split(", ")
        if len(load_list) == 3:
            system["load_average"] = {
                "one_minute": try_to_float(load_list[0]),
                "five_minute": try_to_float(load_list[1]),
                "fifteen_minute": try_to_float(load_list[2]),
            }
        else:
            system["load_average"] = {
                "one_minute": None,
                "five_minute": None,
                "fifteen_minute": None,
            }
        return system

    @_log_errors
    async def _get_telemetry_cpu(self) -> MutableMapping[str, Any]:
        """Collect CPU core count and usage telemetry.

        Returns:
            MutableMapping[str, Any]: Mapping containing CPU core count and
                total/user/nice/system/interrupt/idle usage percentages when
                available.
        """
        cpu_type_endpoint = await self._get_endpoint_path(
            snake_case_path="/api/diagnostics/cpu_usage/get_c_p_u_type",
            camel_case_path="/api/diagnostics/cpu_usage/getCPUType",
        )
        if not await self.is_endpoint_available(cpu_type_endpoint):
            _LOGGER.debug("Telemetry CPU type endpoint unavailable")
            return {}
        cputype_info = await self._safe_list_get(cpu_type_endpoint)
        if not len(cputype_info) > 0:
            return {}
        cpu: dict[str, Any] = {}
        cores_match = re.search(r"\((\d+) cores", cputype_info[0])
        cpu["count"] = try_to_int(cores_match.group(1)) if cores_match else 0

        cpu_stream_endpoint = "/api/diagnostics/cpu_usage/stream"
        if not await self.is_endpoint_available(cpu_stream_endpoint):
            _LOGGER.debug("Telemetry CPU stream endpoint unavailable")
            return cpu
        cpustream_info = await self._get_from_stream(cpu_stream_endpoint)
        # {"total":29,"user":2,"nice":0,"sys":27,"intr":0,"idle":70}
        cpu["usage_total"] = try_to_int(cpustream_info.get("total", None))
        cpu["usage_user"] = try_to_int(cpustream_info.get("user", None))
        cpu["usage_nice"] = try_to_int(cpustream_info.get("nice", None))
        cpu["usage_system"] = try_to_int(cpustream_info.get("sys", None))
        cpu["usage_interrupt"] = try_to_int(cpustream_info.get("intr", None))
        cpu["usage_idle"] = try_to_int(cpustream_info.get("idle", None))
        return cpu

    @_log_errors
    async def _get_telemetry_filesystems(self) -> list:
        """Collect filesystem telemetry entries from diagnostics.

        Returns:
            list: Filesystem device rows from OPNsense disk diagnostics, or an
                empty list when the endpoint is unavailable.
        """
        filesystems_endpoint = await self._get_endpoint_path(
            snake_case_path="/api/diagnostics/system/system_disk",
            camel_case_path="/api/diagnostics/system/systemDisk",
        )
        if not await self.is_endpoint_available(filesystems_endpoint):
            _LOGGER.debug("Telemetry filesystem endpoint unavailable")
            return []
        filesystems_info = await self._safe_dict_get(filesystems_endpoint)
        filesystems: list = filesystems_info.get("devices", [])
        return filesystems

    @_log_errors
    async def get_gateways(self) -> MutableMapping[str, Any]:
        """Return OPNsense gateway status details.

        Returns:
            MutableMapping[str, Any]: Mapping keyed by gateway name, with each
                gateway row preserved from OPNsense and ``status`` normalized
                from the translated status when available.
        """
        gateway_endpoint = "/api/routes/gateway/status"
        if not await self.is_endpoint_available(gateway_endpoint):
            _LOGGER.debug("Gateway status endpoint unavailable")
            return {}
        gateways_info = await self._safe_dict_get(gateway_endpoint)
        gateways: dict[str, Any] = {}
        for gw_info in gateways_info.get("items", []):
            if isinstance(gw_info, MutableMapping) and "name" in gw_info:
                gateways[gw_info["name"]] = gw_info
        for gateway in gateways.values():
            gateway["status"] = gateway.pop("status_translated", gateway.get("status", "")).lower()
        return gateways

    @_log_errors
    async def _get_telemetry_temps(self) -> MutableMapping[str, Any]:
        """Collect temperature sensor telemetry.

        Returns:
            MutableMapping[str, Any]: Mapping keyed by temperature device id,
                with each value containing the numeric temperature, display
                name, and original device id.
        """
        temperature_endpoint = await self._get_endpoint_path(
            snake_case_path="/api/diagnostics/system/system_temperature",
            camel_case_path="/api/diagnostics/system/systemTemperature",
        )
        if not await self.is_endpoint_available(temperature_endpoint):
            _LOGGER.debug("Telemetry temperature endpoint unavailable")
            return {}
        temps_info = await self._safe_list_get(temperature_endpoint)
        if not len(temps_info) > 0:
            return {}
        temps: dict[str, Any] = {}
        for i, temp_info in enumerate(temps_info):
            temp: dict[str, Any] = {}
            temp["temperature"] = try_to_float(temp_info.get("temperature", 0), 0)
            temp["name"] = (
                f"{temp_info.get('type_translated', 'Num')} {temp_info.get('device_seq', i)}"
            )
            temp["device_id"] = temp_info.get("device", str(i))
            temps[temp_info.get("device", str(i)).replace(".", "_")] = temp
        return temps