Source code for aiopnsense.telemetry

"""Telemetry and interface statistics methods for OPNsenseClient."""

from collections.abc import MutableMapping
from datetime import datetime, timedelta
import re
from typing import Any

from dateutil.parser import ParserError, UnknownTimezoneWarning, parse

from ._typing import AiopnsenseClientProtocol
from .const import AMBIGUOUS_TZINFOS
from .helpers import _LOGGER, _log_errors, dict_get, try_to_float, try_to_int


class TelemetryMixin(AiopnsenseClientProtocol):
    """Telemetry methods for OPNsenseClient."""

    @staticmethod
    def _usage_percent(
        used: int | None,
        total: int | None,
        default: int | None = None,
    ) -> int | None:
        """Calculate a rounded usage percentage for valid integer counters.

        Args:
            used (int | None): Used amount.
            total (int | None): Total capacity.
            default (int | None, optional): Value returned for invalid or zero totals.

        Returns:
            int | None: Rounded percentage, or ``default`` when counters are invalid.
        """
        if isinstance(used, int) and isinstance(total, int) and total > 0:
            return round(used / total * 100)
        return default

    @_log_errors
    async def get_telemetry(self) -> MutableMapping[str, Any]:
        """Get telemetry data from OPNsense.

        Returns:
            MutableMapping[str, Any]: Normalized data returned by the related OPNsense endpoint.
        """
        telemetry: dict[str, Any] = {}
        telemetry["mbuf"] = await self._get_telemetry_mbuf()
        telemetry["pfstate"] = await self._get_telemetry_pfstate()
        telemetry["memory"] = await self._get_telemetry_memory()
        telemetry["system"] = await self._get_telemetry_system()
        telemetry["cpu"] = await self._get_telemetry_cpu()
        telemetry["filesystems"] = await self._get_telemetry_filesystems()
        telemetry["temps"] = await self._get_telemetry_temps()
        return telemetry

    @_log_errors
    async def get_interfaces(self) -> MutableMapping[str, Any]:
        """Return all OPNsense interfaces.

        Returns:
            MutableMapping[str, Any]: Normalized data returned by the related OPNsense endpoint.
        """
        interfaces_endpoint = "/api/interfaces/overview/export"
        if not await self.is_endpoint_available(interfaces_endpoint):
            _LOGGER.debug("Interface overview endpoint unavailable")
            return {}
        interface_info = await self._safe_list_get(interfaces_endpoint)
        if not len(interface_info) > 0:
            return {}
        interfaces: dict[str, Any] = {}
        for ifinfo in interface_info:
            interface: dict[str, Any] = {}
            if not isinstance(ifinfo, MutableMapping) or ifinfo.get("identifier", "") == "":
                continue
            statistics = ifinfo.get("statistics", {})
            if not isinstance(statistics, MutableMapping):
                statistics = {}
            packets_received = try_to_int(statistics.get("packets received"))
            packets_transmitted = try_to_int(statistics.get("packets transmitted"))
            bytes_received = try_to_int(statistics.get("bytes received"))
            bytes_transmitted = try_to_int(statistics.get("bytes transmitted"))
            interface["inpkts"] = packets_received
            interface["outpkts"] = packets_transmitted
            interface["inbytes"] = bytes_received
            interface["outbytes"] = bytes_transmitted
            interface["inbytes_frmt"] = bytes_received
            interface["outbytes_frmt"] = bytes_transmitted
            interface["inerrs"] = try_to_int(statistics.get("input errors"))
            interface["outerrs"] = try_to_int(statistics.get("output errors"))
            interface["collisions"] = try_to_int(statistics.get("collisions"))
            interface["interface"] = ifinfo.get("identifier", "")
            interface["name"] = ifinfo.get("description", "")
            interface["status"] = ""
            if ifinfo.get("status", "") in {"down", "no carrier", "up"}:
                interface["status"] = ifinfo.get("status", "")
            elif ifinfo.get("status", "") == "associated":
                interface["status"] = "up"
            interface["ipv4"] = ifinfo.get("addr4", None)
            interface["ipv6"] = ifinfo.get("addr6", None)
            interface["media"] = ifinfo.get("media", None)
            interface["gateways"] = ifinfo.get("gateways", [])
            interface["routes"] = ifinfo.get("routes", [])
            interface["device"] = ifinfo.get("device", None)
            if ifinfo.get("macaddr", None) and ifinfo.get("macaddr", None) != "00:00:00:00:00:00":
                interface["mac"] = ifinfo.get("macaddr", None)
            interface["enabled"] = ifinfo.get("enabled", None)
            interface["vlan_tag"] = ifinfo.get("vlan_tag", None)
            interfaces[ifinfo.get("identifier", "")] = interface
        return interfaces

    @_log_errors
    async def _get_telemetry_mbuf(self) -> MutableMapping[str, Any]:
        """Collect mbuf usage telemetry.

        Returns:
            MutableMapping[str, Any]: Mapping containing normalized fields for downstream use.
        """
        mbuf_endpoint = "/api/diagnostics/system/system_mbuf"
        if not await self.is_endpoint_available(mbuf_endpoint):
            _LOGGER.debug("Telemetry mbuf endpoint unavailable")
            return {}
        mbuf_info = await self._safe_dict_get(mbuf_endpoint)
        mbuf: dict[str, Any] = {}
        mbuf["used"] = try_to_int(dict_get(mbuf_info, "mbuf-statistics.mbuf-current"))
        mbuf["total"] = try_to_int(dict_get(mbuf_info, "mbuf-statistics.mbuf-total"))
        mbuf["used_percent"] = self._usage_percent(mbuf["used"], mbuf["total"])
        return mbuf

    @_log_errors
    async def _get_telemetry_pfstate(self) -> MutableMapping[str, Any]:
        """Collect PF state table telemetry.

        Returns:
            MutableMapping[str, Any]: Mapping containing normalized fields for downstream use.
        """
        pfstate_endpoint = "/api/diagnostics/firewall/pf_states"
        if not await self.is_endpoint_available(pfstate_endpoint):
            _LOGGER.debug("Telemetry pfstate endpoint unavailable")
            return {}
        pfstate_info = await self._safe_dict_get(pfstate_endpoint)
        pfstate: dict[str, Any] = {}
        pfstate["used"] = try_to_int(pfstate_info.get("current", None))
        pfstate["total"] = try_to_int(pfstate_info.get("limit", None))
        pfstate["used_percent"] = self._usage_percent(pfstate["used"], pfstate["total"])
        return pfstate

    @_log_errors
    async def _get_telemetry_memory(self) -> MutableMapping[str, Any]:
        """Collect memory and swap telemetry.

        Returns:
            MutableMapping[str, Any]: Mapping containing normalized fields for downstream use.
        """
        memory_endpoint = await self._get_endpoint_path(
            snake_case_path="/api/diagnostics/system/system_resources",
            camel_case_path="/api/diagnostics/system/systemResources",
        )
        if not await self.is_endpoint_available(memory_endpoint):
            _LOGGER.debug("Telemetry memory endpoint unavailable")
            return {
                "physmem": None,
                "used": None,
                "used_percent": None,
            }
        memory_info = await self._safe_dict_get(memory_endpoint)
        memory: dict[str, Any] = {}
        memory["physmem"] = try_to_int(dict_get(memory_info, "memory.total"))
        memory["used"] = try_to_int(dict_get(memory_info, "memory.used"))
        memory["used_percent"] = self._usage_percent(memory["used"], memory["physmem"])
        swap_endpoint = "/api/diagnostics/system/system_swap"
        if not await self.is_endpoint_available(swap_endpoint):
            _LOGGER.debug("Telemetry swap endpoint unavailable")
            return memory

        swap_info = await self._safe_dict_get(swap_endpoint)
        swap_rows = swap_info.get("swap")
        if not isinstance(swap_rows, list) or not swap_rows:
            return memory
        swap_row = swap_rows[0]
        if not isinstance(swap_row, MutableMapping):
            return memory
        memory["swap_total"] = try_to_int(swap_row.get("total"))
        memory["swap_reserved"] = try_to_int(swap_row.get("used"))
        memory["swap_used_percent"] = self._usage_percent(
            memory["swap_reserved"], memory["swap_total"], default=0
        )
        return memory

    @_log_errors
    async def _get_telemetry_system(self) -> MutableMapping[str, Any]:
        """Collect system time, uptime, boottime, and load telemetry.

        Returns:
            MutableMapping[str, Any]: Mapping containing normalized fields for downstream use.
        """
        time_endpoint = await self._get_endpoint_path(
            snake_case_path="/api/diagnostics/system/system_time",
            camel_case_path="/api/diagnostics/system/systemTime",
        )
        if not await self.is_endpoint_available(time_endpoint):
            _LOGGER.debug("Telemetry system time endpoint unavailable")
            return {}
        time_info = await self._safe_dict_get(time_endpoint)
        system: dict[str, Any] = {}
        opnsense_tz = await self._get_opnsense_timezone(time_info.get("datetime"))

        try:
            systemtime: datetime = parse(time_info["datetime"], tzinfos=AMBIGUOUS_TZINFOS)
            if systemtime.tzinfo is None:
                systemtime = systemtime.replace(tzinfo=opnsense_tz)
        except (KeyError, ValueError, TypeError, ParserError, UnknownTimezoneWarning) as e:
            _LOGGER.warning(
                "Failed to parse opnsense system time (aka. datetime), using HA system time instead: %s. %s: %s",
                time_info.get("datetime"),
                type(e).__name__,
                e,
            )
            systemtime = datetime.now().astimezone()

        pattern = re.compile(r"^(?:(\d+)\s+days?,\s+)?(\d{2}):(\d{2}):(\d{2})$")
        match = pattern.match(time_info.get("uptime", ""))
        if match:
            days_str, hours_str, minutes_str, seconds_str = match.groups()
            days = try_to_int(days_str, 0) or 0
            hours = try_to_int(hours_str, 0) or 0
            minutes = try_to_int(minutes_str, 0) or 0
            seconds = try_to_int(seconds_str, 0) or 0

            uptime = days * 86400 + hours * 3600 + minutes * 60 + seconds

        boottime: datetime | None = None
        if "boottime" in time_info:
            try:
                boottime = parse(time_info["boottime"], tzinfos=AMBIGUOUS_TZINFOS)
                if boottime and boottime.tzinfo is None:
                    boottime = boottime.replace(tzinfo=opnsense_tz)
            except (ValueError, TypeError, ParserError, UnknownTimezoneWarning) as e:
                _LOGGER.info(
                    "Failed to parse opnsense boottime: %s. %s: %s",
                    time_info["boottime"],
                    type(e).__name__,
                    e,
                )

        if boottime:
            system["boottime"] = boottime.timestamp()
            if match:
                system["uptime"] = uptime
            else:
                system["uptime"] = int((systemtime - boottime).total_seconds())
        elif match:
            system["uptime"] = uptime
            boottime = systemtime - timedelta(seconds=system["uptime"])
            system["boottime"] = boottime.timestamp()
        else:
            _LOGGER.warning("Invalid uptime format")

        load_str: str = time_info.get("loadavg", "")
        load_list: list[str] = load_str.split(", ")
        if len(load_list) == 3:
            system["load_average"] = {
                "one_minute": try_to_float(load_list[0]),
                "five_minute": try_to_float(load_list[1]),
                "fifteen_minute": try_to_float(load_list[2]),
            }
        else:
            system["load_average"] = {
                "one_minute": None,
                "five_minute": None,
                "fifteen_minute": None,
            }
        return system

    @_log_errors
    async def _get_telemetry_cpu(self) -> MutableMapping[str, Any]:
        """Collect CPU core count and usage telemetry.

        Returns:
            MutableMapping[str, Any]: Mapping containing normalized fields for downstream use.
        """
        cpu_type_endpoint = await self._get_endpoint_path(
            snake_case_path="/api/diagnostics/cpu_usage/get_c_p_u_type",
            camel_case_path="/api/diagnostics/cpu_usage/getCPUType",
        )
        if not await self.is_endpoint_available(cpu_type_endpoint):
            _LOGGER.debug("Telemetry CPU type endpoint unavailable")
            return {}
        cputype_info = await self._safe_list_get(cpu_type_endpoint)
        if not len(cputype_info) > 0:
            return {}
        cpu: dict[str, Any] = {}
        cores_match = re.search(r"\((\d+) cores", cputype_info[0])
        cpu["count"] = try_to_int(cores_match.group(1)) if cores_match else 0

        cpu_stream_endpoint = "/api/diagnostics/cpu_usage/stream"
        if not await self.is_endpoint_available(cpu_stream_endpoint):
            _LOGGER.debug("Telemetry CPU stream endpoint unavailable")
            return cpu
        cpustream_info = await self._get_from_stream(cpu_stream_endpoint)
        # {"total":29,"user":2,"nice":0,"sys":27,"intr":0,"idle":70}
        cpu["usage_total"] = try_to_int(cpustream_info.get("total", None))
        cpu["usage_user"] = try_to_int(cpustream_info.get("user", None))
        cpu["usage_nice"] = try_to_int(cpustream_info.get("nice", None))
        cpu["usage_system"] = try_to_int(cpustream_info.get("sys", None))
        cpu["usage_interrupt"] = try_to_int(cpustream_info.get("intr", None))
        cpu["usage_idle"] = try_to_int(cpustream_info.get("idle", None))
        return cpu

    @_log_errors
    async def _get_telemetry_filesystems(self) -> list:
        """Collect filesystem telemetry entries from diagnostics.

        Returns:
            list: List of normalized entries produced by this method.
        """
        filesystems_endpoint = await self._get_endpoint_path(
            snake_case_path="/api/diagnostics/system/system_disk",
            camel_case_path="/api/diagnostics/system/systemDisk",
        )
        if not await self.is_endpoint_available(filesystems_endpoint):
            _LOGGER.debug("Telemetry filesystem endpoint unavailable")
            return []
        filesystems_info = await self._safe_dict_get(filesystems_endpoint)
        filesystems: list = filesystems_info.get("devices", [])
        return filesystems

    @_log_errors
    async def get_gateways(self) -> MutableMapping[str, Any]:
        """Return OPNsense Gateway details.

        Returns:
            MutableMapping[str, Any]: Normalized data returned by the related OPNsense endpoint.
        """
        gateway_endpoint = "/api/routes/gateway/status"
        if not await self.is_endpoint_available(gateway_endpoint):
            _LOGGER.debug("Gateway status endpoint unavailable")
            return {}
        gateways_info = await self._safe_dict_get(gateway_endpoint)
        gateways: dict[str, Any] = {}
        for gw_info in gateways_info.get("items", []):
            if isinstance(gw_info, MutableMapping) and "name" in gw_info:
                gateways[gw_info["name"]] = gw_info
        for gateway in gateways.values():
            gateway["status"] = gateway.pop("status_translated", gateway.get("status", "")).lower()
        return gateways

    @_log_errors
    async def _get_telemetry_temps(self) -> MutableMapping[str, Any]:
        """Collect temperature sensor telemetry.

        Returns:
            MutableMapping[str, Any]: Mapping containing normalized fields for downstream use.
        """
        temperature_endpoint = await self._get_endpoint_path(
            snake_case_path="/api/diagnostics/system/system_temperature",
            camel_case_path="/api/diagnostics/system/systemTemperature",
        )
        if not await self.is_endpoint_available(temperature_endpoint):
            _LOGGER.debug("Telemetry temperature endpoint unavailable")
            return {}
        temps_info = await self._safe_list_get(temperature_endpoint)
        if not len(temps_info) > 0:
            return {}
        temps: dict[str, Any] = {}
        for i, temp_info in enumerate(temps_info):
            temp: dict[str, Any] = {}
            temp["temperature"] = try_to_float(temp_info.get("temperature", 0), 0)
            temp["name"] = (
                f"{temp_info.get('type_translated', 'Num')} {temp_info.get('device_seq', i)}"
            )
            temp["device_id"] = temp_info.get("device", str(i))
            temps[temp_info.get("device", str(i)).replace(".", "_")] = temp
        return temps