# Source code for glpi_python_client.clients.custom._statistics

"""Lightweight statistics helpers built from the API mixins.

The mixin exposes simple aggregations over ticket and ticket-task results
returned by the contract-aligned helpers in
:mod:`glpi_python_client.clients.api`. These operations are intentionally
kept small and do not perform name resolution or rich label formatting; the
caller can correlate the returned numeric identifiers with the dedicated
``search_*`` helpers when required.
"""

from __future__ import annotations

from collections import defaultdict
from datetime import date, timedelta
from typing import TypedDict

from glpi_python_client.clients.commons._filters import (
    rsql_all_filter,
    rsql_any_filter,
    rsql_contains_filter,
)
from glpi_python_client.clients.commons._transport import TransportMixin
from glpi_python_client.models.api_schema._common import (
    IdNameCompletenameRef,
    IdNameRef,
)
from glpi_python_client.models.api_schema.assistance._ticket import GetTicket
from glpi_python_client.models.api_schema.assistance.timeline._task import (
    GetTicketTask,
)
from glpi_python_client.models.api_schema.enums import (
    GlpiPriority,
    GlpiTicketType,
)


class TaskStatisticsResult(TypedDict):
    """Typed shape returned by :meth:`StatisticsMixin.get_task_statistics`.

    Durations are plain integers summed from each task's ``duration``
    value; a task without a duration contributes ``0``.
    """

    # Number of tickets the aggregation was requested for.
    ticket_count: int
    # Number of task records collected across those tickets.
    task_count: int
    # Sum of every collected task duration.
    total_duration: int
    # Duration per task user, keyed by the user's id as a string (falling
    # back to the user's name, then ``"unknown"``).
    duration_by_user: dict[str, int]
    # Duration per parent ticket id; tasks whose ``tickets_id`` is ``None``
    # are left out of this mapping.
    duration_by_ticket: dict[int, int]


class TaskDurationsResult(TypedDict):
    """Typed shape returned by :meth:`StatisticsMixin.get_task_durations`.

    The same shape is embedded in :class:`UserActivityEntry` as the
    per-user task summary.
    """

    # Resolved window bounds, formatted as ISO ``YYYY-MM-DD`` strings.
    start_date: str
    end_date: str
    # Sum of all task durations found on the matching tickets.
    total_duration: int
    # Number of task records behind ``total_duration``.
    task_count: int
    # Duration per task user (id as a string, else name, else "unknown").
    duration_by_user: dict[str, int]
    # Duration per ticket entity (id as a string, else name, else "unknown").
    duration_by_entity: dict[str, int]
    # Per-task records; ``None`` when task details were not requested.
    tasks: list[dict[str, object]] | None


class UserActivityEntry(TypedDict):
    """One per-user activity bucket inside :class:`UserActivityResult`.

    Several GLPI users can share one bucket when they resolve to the same
    display key; their counters are then added together.
    """

    # GLPI user ids folded into this bucket.
    user_ids: list[int]
    # Tickets in the window matching ``users_id_assign`` for these users.
    tickets_as_technician: int
    # Tickets in the window matching ``users_id_requester`` for these users.
    tickets_as_recipient: int
    # Task duration summary for these users; its ``tasks`` entry is always
    # ``None`` because only aggregated counters are kept here.
    task_durations: TaskDurationsResult


class UserActivityResult(TypedDict):
    """Typed shape returned by :meth:`StatisticsMixin.get_user_activity`."""

    # Buckets keyed by display key: "firstname realname" when available,
    # otherwise the username, otherwise the numeric id as a string. When the
    # caller identifies the user by ``user_id`` the key is that id as a
    # string.
    users: dict[str, UserActivityEntry]


class StatisticsMixin(TransportMixin):
    """Synchronous custom statistics built on the contract API mixins."""

    def get_ticket_statistics(
        self,
        *,
        start_date: str | None = None,
        end_date: str | None = None,
        default_days: int = 30,
        entity_id: int | None = None,
        entity_name: str | None = None,
        extra_filter: str | None = None,
    ) -> dict[str, object]:
        """Return ticket counts grouped by entity, status, priority, and type.

        The date window is applied to the GLPI ``date_creation`` field
        and results are aggregated locally in Python. Every matching
        ticket is collected by paging through ``iter_search_tickets``,
        so the counts are not capped at a single page of results.
        Returned identifiers are the raw GLPI numeric values that callers
        can resolve with the dedicated ``search_*`` helpers when human
        labels are needed.

        Parameters
        ----------
        start_date : str | None, optional
            ISO ``YYYY-MM-DD`` start of the window (inclusive from
            00:00:00). Defaults to ``end_date - default_days + 1``
            when omitted.
        end_date : str | None, optional
            ISO ``YYYY-MM-DD`` end of the window (inclusive through
            23:59:59). Defaults to today.
        default_days : int, optional
            Span in days used when ``start_date`` is omitted (defaults
            to 30 and must be a positive integer).
        entity_id : int | None, optional
            When provided, restricts results to tickets belonging to the
            entity with this GLPI identifier.
        entity_name : str | None, optional
            When provided (and ``entity_id`` is ``None``), the name is
            resolved via ``search_entities`` and the matched entity IDs
            are used to filter tickets. If no entity with a usable
            identifier matches, ``{"entities": {}}`` is returned
            immediately.
        extra_filter : str | None, optional
            Optional raw RSQL fragment to ``AND`` with the date window
            on the server side.

        Returns
        -------
        dict[str, object]
            Mapping with one ``entities`` key listing per-entity
            aggregates. Each entity bucket exposes ``total``,
            ``by_status``, ``by_priority``, and ``by_type`` counters.

        Raises
        ------
        ValueError
            If ``default_days < 1`` or ``start_date > end_date``.
        """

        start, end = _resolve_window(
            start_date=start_date,
            end_date=end_date,
            default_days=default_days,
        )

        entity_filter: str | None = None
        if entity_id is not None:
            entity_filter = f"entities_id=={entity_id}"
        elif entity_name is not None:
            name_filter = rsql_contains_filter("name", entity_name) or ""
            entities = self.search_entities(  # type: ignore[attr-defined]
                rsql_filter=name_filter,
                limit=200,
            )
            entity_clauses = [
                f"entities_id=={e.id}" for e in entities or () if e.id is not None
            ]
            # Without at least one concrete id there is nothing to restrict
            # on; bail out instead of silently querying every entity.
            if not entity_clauses:
                return {"entities": {}}
            entity_filter = rsql_any_filter(*entity_clauses)

        date_filter = (
            f"date_creation=ge={start.isoformat()};"
            f"date_creation=le={end.isoformat()} 23:59:59"
        )
        query = rsql_all_filter(
            date_filter,
            entity_filter,
            extra_filter,
        )

        # Page through all matches (same helper and batch size as
        # ``get_task_durations``) so the statistics are not truncated.
        tickets: list[GetTicket] = []
        for batch in self.iter_search_tickets(  # type: ignore[attr-defined]
            query or "",
            batch_size=200,
        ):
            tickets.extend(batch)
        return _summarize_tickets(tickets)

    def get_task_statistics(
        self,
        ticket_ids: list[int],
    ) -> TaskStatisticsResult:
        """Return task duration totals grouped by user and ticket.

        The helper expects a list of ticket identifiers because GLPI
        does not publish a global task collection endpoint. Callers
        typically gather the relevant ticket identifiers through
        ``search_tickets`` first.

        Parameters
        ----------
        ticket_ids : list[int]
            Identifiers of the tickets whose tasks should be aggregated.
            An empty list returns zeroed totals without any HTTP call.
            Repeated identifiers are processed once so that a ticket's
            tasks are neither fetched nor counted twice.

        Returns
        -------
        TaskStatisticsResult
            Mapping with ``ticket_count``, ``task_count``,
            ``total_duration``, ``duration_by_user``, and
            ``duration_by_ticket`` entries (durations are integer
            seconds, matching the GLPI ``duration`` field).
            ``ticket_count`` is the number of distinct identifiers.
        """

        # ``dict.fromkeys`` removes duplicates while keeping first-seen order.
        unique_ids: list[int] = list(dict.fromkeys(ticket_ids)) if ticket_ids else []
        if not unique_ids:
            return TaskStatisticsResult(
                ticket_count=0,
                task_count=0,
                total_duration=0,
                duration_by_user={},
                duration_by_ticket={},
            )

        collected: list[GetTicketTask] = []
        for ticket_id in unique_ids:
            batch = self.list_ticket_tasks(ticket_id)  # type: ignore[attr-defined]
            collected.extend(batch)
        return _summarize_tasks(unique_ids, collected)

    def get_task_durations(
        self,
        *,
        start_date: str | None = None,
        end_date: str | None = None,
        default_days: int = 30,
        entity_id: int | None = None,
        entity_name: str | None = None,
        user_id: int | None = None,
        user_editor_id: int | None = None,
        user_recipient_id: int | None = None,
        extra_filter: str | None = None,
        return_task_details: bool = False,
    ) -> TaskDurationsResult:
        """Return task duration totals with optional per-task detail.

        Builds an RSQL filter from the supplied parameters, collects all
        matching tickets by iterating :meth:`iter_search_tickets`, and
        aggregates the tasks of those tickets. ``duration_by_entity`` is
        computed by grouping the per-ticket durations against the
        per-ticket entity map.

        Without task details the aggregation is delegated to
        :meth:`get_task_statistics`. With ``return_task_details=True``
        each ticket's tasks are fetched once and both the aggregates and
        the detail list are built from that single pass, so every counted
        task also appears in ``tasks``.

        The totals cover every task found on the matching tickets; the
        user filters select tickets, not individual tasks.

        Parameters
        ----------
        start_date : str | None, optional
            ISO ``YYYY-MM-DD`` start of the window (inclusive from
            00:00:00). Defaults to ``end_date - default_days + 1``
            when omitted.
        end_date : str | None, optional
            ISO ``YYYY-MM-DD`` end of the window (inclusive through
            23:59:59). Defaults to today.
        default_days : int, optional
            Span in days used when ``start_date`` is omitted (defaults
            to 30 and must be a positive integer).
        entity_id : int | None, optional
            Restrict to tickets in this entity.
        entity_name : str | None, optional
            Resolve entity by name and restrict to matched entities
            (ignored when ``entity_id`` is given). When no entity with a
            usable identifier matches, a zeroed result is returned.
        user_id : int | None, optional
            Restrict to tickets where the user is an assignee or
            requester (OR semantics across both roles).
        user_editor_id : int | None, optional
            Restrict to tickets last updated by this user.
        user_recipient_id : int | None, optional
            Restrict to tickets where this user is the requester.
        extra_filter : str | None, optional
            Optional raw RSQL fragment appended as an AND clause.
        return_task_details : bool, optional
            When ``True``, include a ``tasks`` list of individual task
            records in the returned mapping (default ``False``).

        Returns
        -------
        TaskDurationsResult
            Mapping with ``start_date``, ``end_date``, ``total_duration``,
            ``task_count``, ``duration_by_user``, ``duration_by_entity``,
            and ``tasks`` (``None`` when ``return_task_details=False``,
            otherwise a possibly empty list).

        Raises
        ------
        ValueError
            If ``default_days < 1`` or ``start_date > end_date``.
        """

        start, end = _resolve_window(
            start_date=start_date,
            end_date=end_date,
            default_days=default_days,
        )
        date_filter = (
            f"date_creation=ge={start.isoformat()};"
            f"date_creation=le={end.isoformat()} 23:59:59"
        )

        entity_filter: str | None = None
        if entity_id is not None:
            entity_filter = f"entities_id=={entity_id}"
        elif entity_name is not None:
            name_filter = rsql_contains_filter("name", entity_name) or ""
            entities = self.search_entities(  # type: ignore[attr-defined]
                rsql_filter=name_filter,
                limit=200,
            )
            entity_clauses = [
                f"entities_id=={e.id}" for e in entities or () if e.id is not None
            ]
            # No concrete entity id means nothing can match; return a zeroed
            # result rather than dropping the restriction altogether.
            if not entity_clauses:
                return TaskDurationsResult(
                    start_date=start.isoformat(),
                    end_date=end.isoformat(),
                    total_duration=0,
                    task_count=0,
                    duration_by_user={},
                    duration_by_entity={},
                    tasks=[] if return_task_details else None,
                )
            entity_filter = rsql_any_filter(*entity_clauses)

        user_filter: str | None = None
        if user_id is not None:
            user_filter = rsql_any_filter(
                f"users_id_assign=={user_id}",
                f"users_id_requester=={user_id}",
            )

        editor_filter: str | None = None
        if user_editor_id is not None:
            editor_filter = f"users_id_lastupdater=={user_editor_id}"

        recipient_filter: str | None = None
        if user_recipient_id is not None:
            recipient_filter = f"users_id_requester=={user_recipient_id}"

        rsql_filter = (
            rsql_all_filter(
                date_filter,
                entity_filter,
                user_filter,
                editor_filter,
                recipient_filter,
                extra_filter,
            )
            or ""
        )

        # The dict doubles as an ordered set so a ticket that shows up on
        # two pages is only aggregated once.
        ticket_entity_map: dict[int, str] = {}
        for batch in self.iter_search_tickets(  # type: ignore[attr-defined]
            rsql_filter,
            batch_size=200,
        ):
            for ticket in batch:
                if ticket.id is not None and ticket.id not in ticket_entity_map:
                    ticket_entity_map[ticket.id] = _entity_key(ticket.entity)
        ticket_ids: list[int] = list(ticket_entity_map)

        result: TaskStatisticsResult
        task_details: list[dict[str, object]] | None = None
        if return_task_details:
            # Fetch each ticket's tasks exactly once and derive both the
            # detail records and the aggregates from the same objects.
            task_details = []
            all_tasks: list[GetTicketTask] = []
            for tid in ticket_ids:
                for task in self.list_ticket_tasks(tid):  # type: ignore[attr-defined]
                    all_tasks.append(task)
                    task_details.append(
                        {
                            "task_id": task.id,
                            "ticket_id": int(tid),
                            "duration": int(task.duration or 0),
                            "user_id": task.user.id if task.user else None,
                            "user_name": task.user.name if task.user else None,
                            "date": str(task.date_creation or ""),
                        }
                    )
            result = _summarize_tasks(ticket_ids, all_tasks)
        else:
            result = self.get_task_statistics(ticket_ids)

        duration_by_entity: defaultdict[str, int] = defaultdict(int)
        for tid, dur in result["duration_by_ticket"].items():
            # Tasks may reference a ticket outside the collected set; those
            # durations are grouped under "unknown".
            entity_key = ticket_entity_map.get(int(tid), "unknown")
            duration_by_entity[entity_key] += int(dur)

        return TaskDurationsResult(
            start_date=start.isoformat(),
            end_date=end.isoformat(),
            total_duration=int(result["total_duration"]),
            task_count=int(result["task_count"]),
            duration_by_user=result["duration_by_user"],
            duration_by_entity=dict(duration_by_entity),
            tasks=task_details,
        )

    def get_user_activity(
        self,
        *,
        user_id: int | None = None,
        username: str | None = None,
        realname: str | None = None,
        firstname: str | None = None,
        start_date: str | None = None,
        end_date: str | None = None,
        default_days: int = 30,
    ) -> UserActivityResult:
        """Return per-user GLPI activity aggregated across tickets and tasks.

        Aggregates tickets where each matched user is an assignee, tickets
        where the user is a requester, and task durations over the requested
        date window. When multiple users resolve to the same display key
        their results are merged.

        Parameters
        ----------
        user_id : int | None, optional
            Identify the user by GLPI numeric identifier. When given, the
            name criteria are not used and the result is keyed by the
            identifier as a string.
        username : str | None, optional
            Filter by username (substring match).
        realname : str | None, optional
            Filter by family name (substring match).
        firstname : str | None, optional
            Filter by given name (substring match).
        start_date : str | None, optional
            ISO ``YYYY-MM-DD`` start of the activity window (inclusive
            from 00:00:00).
        end_date : str | None, optional
            ISO ``YYYY-MM-DD`` end of the activity window (inclusive
            through 23:59:59). Defaults to today.
        default_days : int, optional
            Span in days used when ``start_date`` is omitted (default 30).

        Returns
        -------
        UserActivityResult
            Mapping with one ``users`` key. Each user key maps to a
            :class:`UserActivityEntry` with ``user_ids``,
            ``tickets_as_technician``, ``tickets_as_recipient``, and
            ``task_durations``.

        Raises
        ------
        ValueError
            If ``user_id`` is omitted and none of ``username``,
            ``realname``, or ``firstname`` carries a non-empty value, if
            the supplied criteria match no GLPI users, or if the date
            window is invalid (``default_days < 1`` or
            ``start_date > end_date``).
        """

        # Empty strings count as "not supplied": they would otherwise yield
        # an empty user filter and therefore match every GLPI user.
        if user_id is None and not any((username, realname, firstname)):
            raise ValueError(
                "At least one of user_id, username, realname, or "
                "firstname must be supplied"
            )

        start, end = _resolve_window(
            start_date=start_date,
            end_date=end_date,
            default_days=default_days,
        )

        user_display_map: dict[int, str]
        if user_id is not None:
            user_display_map = {user_id: str(user_id)}
        else:
            name_parts = [
                rsql_contains_filter("username", username) if username else None,
                rsql_contains_filter("realname", realname) if realname else None,
                rsql_contains_filter("firstname", firstname) if firstname else None,
            ]
            user_rsql = rsql_all_filter(*name_parts) or ""
            matched_users = self.search_users(  # type: ignore[attr-defined]
                rsql_filter=user_rsql,
                limit=200,
            )
            if not matched_users:
                raise ValueError("No users matched the supplied criteria")
            user_display_map = {
                u.id: (
                    f"{u.firstname or ''} {u.realname or ''}".strip()
                    or u.username
                    or str(u.id)
                )
                for u in matched_users
                if u.id is not None
            }
        # Dict keys are unique, so a user returned twice is processed once.
        resolved_user_ids: list[int] = list(user_display_map)

        date_range = (
            f"date_creation=ge={start.isoformat()};"
            f"date_creation=le={end.isoformat()} 23:59:59"
        )

        users_output: dict[str, UserActivityEntry] = {}
        for uid in resolved_user_ids:
            display_key = user_display_map.get(uid, str(uid))
            tech_count = 0
            for batch in self.iter_search_tickets(  # type: ignore[attr-defined]
                f"users_id_assign=={uid};{date_range}",
                batch_size=200,
            ):
                tech_count += len(batch)
            recipient_count = 0
            for batch in self.iter_search_tickets(  # type: ignore[attr-defined]
                f"users_id_requester=={uid};{date_range}",
                batch_size=200,
            ):
                recipient_count += len(batch)
            # Hand over the already-resolved bounds so the task summary uses
            # exactly the same window as the ticket counts above.
            task_dur = self.get_task_durations(
                start_date=start.isoformat(),
                end_date=end.isoformat(),
                default_days=default_days,
                user_id=uid,
            )
            # Drop the optional ``tasks`` payload before storing on the
            # per-user entry; the activity summary keeps only aggregated
            # counters per user.
            task_dur_clean: TaskDurationsResult = TaskDurationsResult(
                start_date=task_dur["start_date"],
                end_date=task_dur["end_date"],
                total_duration=task_dur["total_duration"],
                task_count=task_dur["task_count"],
                duration_by_user=dict(task_dur["duration_by_user"]),
                duration_by_entity=dict(task_dur["duration_by_entity"]),
                tasks=None,
            )

            existing = users_output.get(display_key)
            if existing is None:
                users_output[display_key] = UserActivityEntry(
                    user_ids=[uid],
                    tickets_as_technician=tech_count,
                    tickets_as_recipient=recipient_count,
                    task_durations=task_dur_clean,
                )
                continue

            # Another user already produced this display key: fold the
            # counters together instead of overwriting the earlier entry.
            existing["user_ids"] = [*existing["user_ids"], uid]
            existing["tickets_as_technician"] += tech_count
            existing["tickets_as_recipient"] += recipient_count
            existing["task_durations"] = _merge_task_durations(
                existing["task_durations"], task_dur_clean
            )

        return UserActivityResult(users=users_output)


def _merge_task_durations(
    prev: TaskDurationsResult, new: TaskDurationsResult
) -> TaskDurationsResult:
    """Fold ``new`` into ``prev`` and return the combined aggregate.

    Totals, task counts, and both per-key duration mappings are added
    together. The window bounds are copied from ``prev``: the helper is
    only used to combine per-user results computed over one and the same
    window. ``tasks`` is always ``None`` in the result because the merged
    value feeds a user activity summary, not a detail listing.
    """

    def _add_counters(base: dict[str, int], extra: dict[str, int]) -> dict[str, int]:
        # Work on a copy so neither input mapping is mutated.
        combined = dict(base)
        for label, amount in extra.items():
            combined[label] = combined.get(label, 0) + int(amount)
        return combined

    by_user = _add_counters(prev["duration_by_user"], new["duration_by_user"])
    by_entity = _add_counters(prev["duration_by_entity"], new["duration_by_entity"])
    return TaskDurationsResult(
        start_date=prev["start_date"],
        end_date=prev["end_date"],
        total_duration=prev["total_duration"] + new["total_duration"],
        task_count=prev["task_count"] + new["task_count"],
        duration_by_user=by_user,
        duration_by_entity=by_entity,
        tasks=None,
    )


def _resolve_window(
    *,
    start_date: str | None,
    end_date: str | None,
    default_days: int,
) -> tuple[date, date]:
    """Turn optional ISO date strings into a concrete ``(start, end)`` pair.

    A missing (or empty) ``end_date`` means today. A missing (or empty)
    ``start_date`` is derived so that the window spans ``default_days``
    days ending on the end date, both bounds included.

    Raises
    ------
    ValueError
        If ``default_days`` is below 1, if either string is not a valid
        ISO date, or if the start falls after the end.
    """

    # The span is validated even when an explicit start makes it unused.
    if default_days < 1:
        raise ValueError("default_days must be a positive integer")

    if end_date:
        window_end = date.fromisoformat(end_date)
    else:
        window_end = date.today()

    if start_date:
        window_start = date.fromisoformat(start_date)
    else:
        # ``default_days - 1`` because the end day itself is part of the span.
        window_start = window_end - timedelta(days=default_days - 1)

    if window_end < window_start:
        raise ValueError("start_date must be less than or equal to end_date")
    return window_start, window_end


def _summarize_tickets(tickets: list[GetTicket]) -> dict[str, object]:
    """Bucket tickets per entity and count status, priority, and type.

    The result has a single ``entities`` key. Each entity bucket holds a
    ``total`` plus ``by_status``, ``by_priority``, and ``by_type`` counters
    converted to plain dicts. Buckets appear in the order their entity is
    first seen.
    """

    buckets: dict[str, dict[str, object]] = {}
    for ticket in tickets:
        key = _entity_key(ticket.entity)
        bucket = buckets.get(key)
        if bucket is None:
            # First ticket for this entity: start with empty counters.
            bucket = buckets[key] = {
                "total": 0,
                "by_status": defaultdict(int),
                "by_priority": defaultdict(int),
                "by_type": defaultdict(int),
            }
        bucket["total"] = int(bucket["total"]) + 1  # type: ignore[call-overload]
        _count_status(bucket["by_status"], ticket.status)  # type: ignore[arg-type]
        _count_enum(bucket["by_priority"], ticket.priority, GlpiPriority)  # type: ignore[arg-type]
        _count_enum(bucket["by_type"], ticket.type, GlpiTicketType)  # type: ignore[arg-type]

    frozen: dict[str, object] = {}
    for key, bucket in buckets.items():
        frozen[key] = _freeze_bucket(bucket)
    return {"entities": frozen}


def _summarize_tasks(
    ticket_ids: list[int], tasks: list[GetTicketTask]
) -> TaskStatisticsResult:
    """Sum task durations overall, per user, and per parent ticket.

    ``ticket_count`` is simply the length of ``ticket_ids`` and
    ``task_count`` the length of ``tasks``. A task without a duration
    counts as ``0``; a task without a ``tickets_id`` still contributes to
    the overall and per-user totals but not to the per-ticket mapping.
    """

    per_user: dict[str, int] = {}
    per_ticket: dict[int, int] = {}
    grand_total = 0
    for entry in tasks:
        spent = int(entry.duration or 0)
        grand_total += spent

        owner = _user_key(entry.user)
        per_user[owner] = per_user.get(owner, 0) + spent

        parent = entry.tickets_id
        if parent is not None:
            per_ticket[parent] = per_ticket.get(parent, 0) + spent

    return TaskStatisticsResult(
        ticket_count=len(ticket_ids),
        task_count=len(tasks),
        total_duration=grand_total,
        duration_by_user=per_user,
        duration_by_ticket=per_ticket,
    )


def _entity_key(entity: IdNameCompletenameRef | None) -> str:
    """Build the grouping key used for an entity reference.

    The numeric identifier (as a string) wins when present, which keeps
    the key stable across entity renames. Otherwise the entity name is
    used, and ``"unknown"`` covers a missing reference or an empty name.
    """

    if entity is None:
        return "unknown"

    identifier = entity.id
    if identifier is None:
        label = entity.name
        return label if label else "unknown"
    return str(identifier)


def _user_key(user: IdNameRef | None) -> str:
    """Build the grouping key used for a user reference.

    Resolution order: the numeric identifier as a string, then the user
    name, then ``"unknown"`` when the reference is missing or nameless.
    """

    fallback = "unknown"
    if user is None:
        return fallback
    if user.id is None:
        return user.name or fallback
    return str(user.id)


def _count_status(counter: defaultdict[str, int], status: IdNameRef | None) -> None:
    """Add one to the counter slot that represents ``status``.

    The slot is the status identifier as a string when there is one,
    otherwise the status name, and ``"UNKNOWN"`` when the status is
    missing or has neither an identifier nor a name.
    """

    if status is None:
        label = "UNKNOWN"
    elif status.id is not None:
        label = str(status.id)
    else:
        label = status.name or "UNKNOWN"
    counter[label] += 1


def _count_enum(counter: defaultdict[str, int], value: object, enum_type: type) -> None:
    """Add one to the counter slot named after the enum member for ``value``.

    ``None`` is counted under ``"UNKNOWN"``. A value that ``enum_type``
    rejects with ``ValueError`` is counted under its ``str()`` form so
    unexpected values stay visible instead of being dropped.
    """

    if value is None:
        label = "UNKNOWN"
    else:
        try:
            label = enum_type(value).name
        except ValueError:
            label = str(value)
    counter[label] += 1


def _freeze_bucket(bucket: dict[str, object]) -> dict[str, object]:
    """Return a public copy of one entity bucket with plain-dict counters.

    ``total`` is passed through unchanged; the three ``by_*`` counters are
    copied into regular dicts so callers do not receive ``defaultdict``
    instances. Any other key in ``bucket`` is ignored.
    """

    frozen: dict[str, object] = {"total": bucket["total"]}
    for field in ("by_status", "by_priority", "by_type"):
        frozen[field] = dict(bucket[field])  # type: ignore[call-overload]
    return frozen


# Only the mixin is part of the star-import surface; the TypedDict result
# shapes defined above remain importable by explicit name.
__all__ = ["StatisticsMixin"]