Source code for glpi_python_client.clients.custom._statistics_async

"""Asynchronous overrides for the statistics aggregation helper.

The async mixin overrides :meth:`get_ticket_statistics`,
:meth:`get_task_statistics`, :meth:`get_task_durations`, and
:meth:`get_user_activity` so that every internal ``self.*`` call that
would otherwise be dispatched as a bridge-wrapped coroutine is properly
awaited. Without these overrides the bridge runs the synchronous mixin
bodies in a worker thread where the bridge-wrapped methods on ``self``
return unawaited coroutines instead of data, producing a
``RuntimeWarning: coroutine … was never awaited`` error.

:meth:`get_task_statistics` and :meth:`get_task_durations` additionally
dispatch their per-ticket ``list_ticket_tasks`` fan-outs concurrently
using :func:`asyncio.gather`.
"""

from __future__ import annotations

import asyncio

from glpi_python_client.clients.custom._statistics import (
    StatisticsMixin,
    TaskDurationsResult,
    TaskStatisticsResult,
    UserActivityResult,
    _entity_key,
    _summarize_tasks,
)
from glpi_python_client.models.api_schema.assistance.timeline._task import (
    GetTicketTask,
)


class AsyncStatisticsMixin(StatisticsMixin):
    """Asynchronous custom statistics with concurrent task fan-out.

    The override calls the bridge-wrapped ``list_ticket_tasks`` for each
    ticket identifier and awaits the resulting coroutines together via
    :func:`asyncio.gather`. Empty inputs return zeroed totals without
    any HTTP traffic.
    """

    async def get_task_statistics(  # type: ignore[override]
        self,
        ticket_ids: list[int],
    ) -> TaskStatisticsResult:
        """Return task duration totals with concurrent per-ticket fetches.

        Parameters
        ----------
        ticket_ids : list[int]
            Identifiers of the tickets whose tasks should be aggregated.
            An empty list returns zeroed totals without any HTTP call.

        Returns
        -------
        TaskStatisticsResult
            Mapping with ``ticket_count``, ``task_count``,
            ``total_duration``, ``duration_by_user``, and
            ``duration_by_ticket`` entries.
        """

        if not ticket_ids:
            return TaskStatisticsResult(
                ticket_count=0,
                task_count=0,
                total_duration=0,
                duration_by_user={},
                duration_by_ticket={},
            )
        results = await asyncio.gather(
            *(
                self.list_ticket_tasks(ticket_id)  # type: ignore[attr-defined]
                for ticket_id in ticket_ids
            )
        )
        flattened: list[GetTicketTask] = [task for batch in results for task in batch]
        return _summarize_tasks(ticket_ids, flattened)

    async def get_task_durations(  # type: ignore[override]
        self,
        *,
        start_date: str | None = None,
        end_date: str | None = None,
        default_days: int = 30,
        entity_id: int | None = None,
        entity_name: str | None = None,
        user_id: int | None = None,
        user_editor_id: int | None = None,
        user_recipient_id: int | None = None,
        extra_filter: str | None = None,
        return_task_details: bool = False,
    ) -> TaskDurationsResult:
        """Return task duration totals with concurrent per-ticket fetches.

        Overrides the synchronous implementation so that when
        ``return_task_details=True`` the per-ticket
        :meth:`list_ticket_tasks` calls are dispatched concurrently using
        :func:`asyncio.gather`. The date-window, entity, and user filter
        logic is identical to the synchronous version.

        Parameters
        ----------
        start_date : str | None, optional
            ISO ``YYYY-MM-DD`` start of the window (inclusive from
            00:00:00).
        end_date : str | None, optional
            ISO ``YYYY-MM-DD`` end of the window (inclusive through
            23:59:59). Defaults to today.
        default_days : int, optional
            Span in days used when ``start_date`` is omitted (default 30).
        entity_id : int | None, optional
            Restrict to tickets in this entity.
        entity_name : str | None, optional
            Resolve entity by name and restrict to matched entities.
        user_id : int | None, optional
            Restrict to tickets where the user is assignee or requester.
        user_editor_id : int | None, optional
            Restrict to tickets last updated by this user.
        user_recipient_id : int | None, optional
            Restrict to tickets where this user is the requester.
        extra_filter : str | None, optional
            Optional raw RSQL fragment appended as an AND clause.
        return_task_details : bool, optional
            When ``True``, fan-out per-ticket task fetches concurrently
            and include a ``tasks`` list in the result.

        Returns
        -------
        TaskDurationsResult
            Same shape as the synchronous :meth:`get_task_durations`.
        """

        from collections import defaultdict

        from glpi_python_client.clients.commons._filters import (
            rsql_all_filter,
            rsql_any_filter,
            rsql_contains_filter,
        )
        from glpi_python_client.clients.custom._statistics import _resolve_window

        start, end = _resolve_window(
            start_date=start_date,
            end_date=end_date,
            default_days=default_days,
        )
        date_filter = f"date_creation=ge={start.isoformat()};"
        date_filter += f"date_creation=le={end.isoformat()} 23:59:59"
        entity_filter: str | None = None
        if entity_id is not None:
            entity_filter = f"entities_id=={entity_id}"
        elif entity_name is not None:
            name_filter = rsql_contains_filter("name", entity_name) or ""
            entities = await self.search_entities(  # type: ignore[attr-defined]
                rsql_filter=name_filter,
                limit=200,
            )
            if not entities:
                return TaskDurationsResult(
                    start_date=start.isoformat(),
                    end_date=end.isoformat(),
                    total_duration=0,
                    task_count=0,
                    duration_by_user={},
                    duration_by_entity={},
                    tasks=None,
                )
            entity_filter = rsql_any_filter(
                *(f"entities_id=={e.id}" for e in entities if e.id is not None)
            )

        user_filter: str | None = None
        if user_id is not None:
            user_filter = rsql_any_filter(
                f"users_id_assign=={user_id}",
                f"users_id_requester=={user_id}",
            )

        editor_filter: str | None = None
        if user_editor_id is not None:
            editor_filter = f"users_id_lastupdater=={user_editor_id}"

        recipient_filter: str | None = None
        if user_recipient_id is not None:
            recipient_filter = f"users_id_requester=={user_recipient_id}"

        rsql_filter = (
            rsql_all_filter(
                date_filter,
                entity_filter,
                user_filter,
                editor_filter,
                recipient_filter,
                extra_filter,
            )
            or ""
        )

        ticket_ids: list[int] = []
        ticket_entity_map: dict[int, str] = {}
        async for batch in self.iter_search_tickets(  # type: ignore[attr-defined]
            rsql_filter,
            batch_size=200,
        ):
            for ticket in batch:
                if ticket.id is not None:
                    ticket_ids.append(ticket.id)
                    ticket_entity_map[ticket.id] = _entity_key(ticket.entity)

        result = await self.get_task_statistics(ticket_ids)

        duration_by_entity: defaultdict[str, int] = defaultdict(int)
        for tid, dur in result["duration_by_ticket"].items():
            entity_key = ticket_entity_map.get(int(tid), "unknown")
            duration_by_entity[entity_key] += int(dur)

        task_details: list[dict[str, object]] | None = None
        if return_task_details:
            tasks_per_ticket: list[list[GetTicketTask]] = await asyncio.gather(
                *(
                    self.list_ticket_tasks(int(tid))  # type: ignore[attr-defined]
                    for tid, dur in result["duration_by_ticket"].items()
                    if int(dur) > 0
                )
            )
            non_zero_tids = [
                int(tid)
                for tid, dur in result["duration_by_ticket"].items()
                if int(dur) > 0
            ]
            task_details = []
            for tid, tasks in zip(non_zero_tids, tasks_per_ticket, strict=True):
                for task in tasks:
                    task_details.append(
                        {
                            "task_id": task.id,
                            "ticket_id": tid,
                            "duration": int(task.duration or 0),
                            "user_id": task.user.id if task.user else None,
                            "user_name": task.user.name if task.user else None,
                            "date": str(task.date_creation or ""),
                        }
                    )

        return TaskDurationsResult(
            start_date=start.isoformat(),
            end_date=end.isoformat(),
            total_duration=result["total_duration"],
            task_count=result["task_count"],
            duration_by_user=result["duration_by_user"],
            duration_by_entity=dict(duration_by_entity),
            tasks=task_details,
        )

    async def get_ticket_statistics(  # type: ignore[override]
        self,
        *,
        start_date: str | None = None,
        end_date: str | None = None,
        default_days: int = 30,
        entity_id: int | None = None,
        entity_name: str | None = None,
        extra_filter: str | None = None,
    ) -> dict[str, object]:
        """Return ticket counts grouped by entity, status, priority, and type.

        Async override of :meth:`StatisticsMixin.get_ticket_statistics`.
        The synchronous base runs in a worker thread where ``self.search_tickets``
        and ``self.search_entities`` resolve to bridge-wrapped coroutines that
        would be silently dropped. This override awaits those calls directly on
        the event loop instead.

        Parameters
        ----------
        start_date : str | None, optional
            ISO ``YYYY-MM-DD`` start of the window (inclusive from
            00:00:00).
        end_date : str | None, optional
            ISO ``YYYY-MM-DD`` end of the window (inclusive through
            23:59:59). Defaults to today.
        default_days : int, optional
            Span in days used when ``start_date`` is omitted (default 30).
        entity_id : int | None, optional
            Restrict to tickets in this entity.
        entity_name : str | None, optional
            Resolve entity by name and restrict to matched entities.
        extra_filter : str | None, optional
            Optional raw RSQL fragment appended as an AND clause.

        Returns
        -------
        dict[str, object]
            Same shape as the synchronous :meth:`get_ticket_statistics`.

        Raises
        ------
        ValueError
            If ``default_days < 1`` or ``start_date > end_date``.
        """

        from glpi_python_client.clients.commons._filters import (
            rsql_all_filter,
            rsql_any_filter,
            rsql_contains_filter,
        )
        from glpi_python_client.clients.custom._statistics import (
            _resolve_window,
            _summarize_tickets,
        )

        start, end = _resolve_window(
            start_date=start_date,
            end_date=end_date,
            default_days=default_days,
        )

        entity_filter: str | None = None
        if entity_id is not None:
            entity_filter = f"entities_id=={entity_id}"
        elif entity_name is not None:
            name_filter = rsql_contains_filter("name", entity_name) or ""
            entities = await self.search_entities(  # type: ignore[attr-defined]
                rsql_filter=name_filter,
                limit=200,
            )
            if not entities:
                return {"entities": {}}
            entity_filter = rsql_any_filter(
                *(f"entities_id=={e.id}" for e in entities if e.id is not None)
            )

        date_filter = f"date_creation=ge={start.isoformat()};"
        date_filter += f"date_creation=le={end.isoformat()} 23:59:59"
        query = rsql_all_filter(
            date_filter,
            entity_filter,
            extra_filter,
        )
        tickets = await self.search_tickets(  # type: ignore[attr-defined]
            rsql_filter=query or "",
            limit=200,
        )
        return _summarize_tickets(tickets)

    async def get_user_activity(  # type: ignore[override]
        self,
        *,
        user_id: int | None = None,
        username: str | None = None,
        realname: str | None = None,
        firstname: str | None = None,
        start_date: str | None = None,
        end_date: str | None = None,
        default_days: int = 30,
    ) -> UserActivityResult:
        """Return per-user GLPI activity aggregated across tickets and tasks.

        Async override of :meth:`StatisticsMixin.get_user_activity`. The
        synchronous base calls ``self.search_users``, ``self.iter_search_tickets``,
        and ``self.get_task_durations`` through ``self``, which on the async
        client resolve to bridge-wrapped coroutines. This override awaits those
        calls and uses ``async for`` on the async generator.

        Parameters
        ----------
        user_id : int | None, optional
            Identify the user by GLPI numeric identifier.
        username : str | None, optional
            Filter by username (substring match).
        realname : str | None, optional
            Filter by family name (substring match).
        firstname : str | None, optional
            Filter by given name (substring match).
        start_date : str | None, optional
            ISO ``YYYY-MM-DD`` start of the activity window (inclusive
            from 00:00:00).
        end_date : str | None, optional
            ISO ``YYYY-MM-DD`` end of the activity window (inclusive
            through 23:59:59). Defaults to today.
        default_days : int, optional
            Span in days used when ``start_date`` is omitted (default 30).

        Returns
        -------
        UserActivityResult
            Same shape as the synchronous :meth:`get_user_activity`.

        Raises
        ------
        ValueError
            If none of ``user_id``, ``username``, ``realname``, or
            ``firstname`` are supplied, or if the supplied criteria match
            no GLPI users.
        """

        from glpi_python_client.clients.commons._filters import (
            rsql_all_filter,
            rsql_contains_filter,
        )
        from glpi_python_client.clients.custom._statistics import (
            UserActivityEntry,
            UserActivityResult,
            _merge_task_durations,
            _resolve_window,
        )

        if all(v is None for v in (user_id, username, realname, firstname)):
            raise ValueError(
                "At least one of user_id, username, realname, or "
                "firstname must be supplied"
            )

        start, end = _resolve_window(
            start_date=start_date,
            end_date=end_date,
            default_days=default_days,
        )

        if user_id is not None:
            resolved_user_ids: list[int] = [user_id]
            user_display_map: dict[int, str] = {user_id: str(user_id)}
        else:
            name_parts = [
                rsql_contains_filter("username", username) if username else None,
                rsql_contains_filter("realname", realname) if realname else None,
                rsql_contains_filter("firstname", firstname) if firstname else None,
            ]
            user_rsql = rsql_all_filter(*name_parts) or ""
            matched_users = await self.search_users(  # type: ignore[attr-defined]
                rsql_filter=user_rsql,
                limit=200,
            )
            if not matched_users:
                raise ValueError("No users matched the supplied criteria")
            resolved_user_ids = [u.id for u in matched_users if u.id is not None]
            user_display_map = {
                u.id: (
                    f"{u.firstname or ''} {u.realname or ''}".strip()
                    or u.username
                    or str(u.id)
                )
                for u in matched_users
                if u.id is not None
            }

        date_range = f"date_creation=ge={start.isoformat()};"
        date_range += f"date_creation=le={end.isoformat()} 23:59:59"

        users_output: dict[str, UserActivityEntry] = {}
        for uid in resolved_user_ids:
            display_key = user_display_map.get(uid, str(uid))
            tech_count = 0
            async for batch in self.iter_search_tickets(  # type: ignore[attr-defined]
                f"users_id_assign=={uid};{date_range}",
                batch_size=200,
            ):
                tech_count += len(batch)
            recipient_count = 0
            async for batch in self.iter_search_tickets(  # type: ignore[attr-defined]
                f"users_id_requester=={uid};{date_range}",
                batch_size=200,
            ):
                recipient_count += len(batch)
            task_dur = await self.get_task_durations(
                start_date=start_date,
                end_date=end_date,
                default_days=default_days,
                user_id=uid,
            )
            task_dur_clean: TaskDurationsResult = TaskDurationsResult(
                start_date=task_dur["start_date"],
                end_date=task_dur["end_date"],
                total_duration=task_dur["total_duration"],
                task_count=task_dur["task_count"],
                duration_by_user=dict(task_dur["duration_by_user"]),
                duration_by_entity=dict(task_dur["duration_by_entity"]),
                tasks=None,
            )

            if display_key in users_output:
                existing = users_output[display_key]
                existing["user_ids"] = [*existing["user_ids"], uid]
                existing["tickets_as_technician"] += tech_count
                existing["tickets_as_recipient"] += recipient_count
                existing["task_durations"] = _merge_task_durations(
                    existing["task_durations"], task_dur_clean
                )
            else:
                users_output[display_key] = UserActivityEntry(
                    user_ids=[uid],
                    tickets_as_technician=tech_count,
                    tickets_as_recipient=recipient_count,
                    task_durations=task_dur_clean,
                )

        return UserActivityResult(users=users_output)


__all__ = ["AsyncStatisticsMixin"]