Source code for glpi_python_client.clients.commons._async_bridge

"""Synchronous-to-asynchronous bridge for the GLPI client.

The bridge inspects every public sync method exposed by the bases of a
subclass, then installs a coroutine wrapper on the subclass that defers
the blocking call to a worker thread. This keeps the synchronous client
as the single source of truth while still exposing a fully asynchronous
public surface.

Concurrency notes
-----------------
* Each wrapped call runs on a worker thread, which means concurrent
  callers contend on OS threads rather than only on the event loop. The
  underlying transport mixin protects shared state with a
  :class:`threading.Lock` for that reason.
* When many coroutines fan out at once (for example through
  :func:`asyncio.gather`) the default :func:`asyncio.to_thread`
  executor can become a bottleneck. The
  :class:`~glpi_python_client.clients.AsyncGlpiClient` exposes an
  optional ``executor`` constructor argument that callers can use to
  supply a dedicated :class:`concurrent.futures.ThreadPoolExecutor`.
* Cancellation is best-effort: cancelling the awaiting coroutine
  releases the awaiter immediately, but the in-flight HTTP request keeps
  running on the worker thread until ``requests`` returns. This matches
  the behaviour of the original async client.

Known limitation — internal ``self``-calls
------------------------------------------
The bridge wraps *every* public method on the async client class. As a
result, when a synchronous body that is running inside a worker thread
calls another public method through ``self`` (e.g.
``self.search_tickets(...)``), it resolves to the *bridge-wrapped*
coroutine, not the synchronous function. Calling a coroutine without
``await`` produces a dangling coroutine object, not data.

Any sync method (or generator) that internally calls other public
methods through ``self`` must therefore be given a hand-written async
override that ``await``s (or ``async for``s) those calls on the event
loop. The convention used in this codebase is to place such overrides
in a ``_*_async.py`` companion module (e.g.
``clients/custom/_statistics_async.py``) and wire them into the async
client's MRO *before* the sync mixin that defines the original method.
"""

from __future__ import annotations

import asyncio
import functools
import inspect
from collections.abc import Callable
from concurrent.futures import Executor
from typing import Any

# Sentinel used by the async-generator bridge to signal exhaustion without
# propagating StopIteration through a coroutine (which PEP 479 forbids).
_STOPPED: object = object()


def _next_or_stopped(gen: Any) -> Any:
    """Return the next item from *gen* or ``_STOPPED`` when exhausted."""

    try:
        return next(gen)
    except StopIteration:
        return _STOPPED


[docs] class AsyncBridge: """Base class that converts inherited sync methods into coroutines. The bridge is intended to be mixed into the most-derived async client class **before** the sync mixins so its :meth:`__init_subclass__` hook can observe the full MRO and install coroutine wrappers on the subclass for every public method that the sync mixins expose. Subclasses may also assign a :class:`concurrent.futures.Executor` instance to ``_executor`` to route every wrapped call through a dedicated pool. When ``_executor`` is ``None`` the bridge falls back to :func:`asyncio.to_thread`, which uses the default loop executor. """ _executor: Executor | None = None def __init_subclass__(cls, **kwargs: object) -> None: """Install async wrappers for every inherited public sync method. The hook walks the resolution order from the most-derived sync base downwards and skips: * the bridge class itself and :class:`object`, * names that start with an underscore (private/protected), * attributes that are not callable, and * attributes that are already coroutine functions (so the subclass may declare hand-written async overrides such as :meth:`get_ticket_context`). Each surviving method is wrapped with a coroutine that defers the blocking call to a worker thread. """ super().__init_subclass__(**kwargs) seen: set[str] = set() for base in cls.__mro__: if base in (cls, AsyncBridge, object): continue for name, member in vars(base).items(): if name in seen or name.startswith("_"): continue if not callable(member) or inspect.iscoroutinefunction(member): continue if inspect.isasyncgenfunction(member): continue # Skip if the subclass already overrides the method with # a coroutine function or async generator (for example async fan-outs). existing = getattr(cls, name, None) if existing is not None and ( inspect.iscoroutinefunction(existing) or inspect.isasyncgenfunction(existing) ): seen.add(name) continue seen.add(name) if inspect.isgeneratorfunction(member): setattr(cls, name, _make_async_generator_wrapper(member)) else: setattr(cls, name, _make_async_wrapper(member))
def _make_async_wrapper(sync_func: Callable[..., Any]) -> Callable[..., Any]: """Return one coroutine wrapper that runs ``sync_func`` off-thread. Parameters ---------- sync_func : Callable[..., Any] Synchronous callable inherited from a sync mixin. Returns ------- Callable[..., Any] Coroutine function that schedules ``sync_func`` on the bound client's executor (or :func:`asyncio.to_thread` when no executor is configured) and awaits its result. """ @functools.wraps(sync_func) async def wrapper(self: AsyncBridge, *args: Any, **kwargs: Any) -> Any: bound = functools.partial(sync_func, self, *args, **kwargs) if self._executor is not None: loop = asyncio.get_running_loop() return await loop.run_in_executor(self._executor, bound) return await asyncio.to_thread(bound) return wrapper def _make_async_generator_wrapper(sync_func: Callable[..., Any]) -> Callable[..., Any]: """Return an async generator wrapper for a synchronous generator function. Each call to ``next()`` on the underlying sync generator is dispatched to a worker thread so that the blocking HTTP call inside the generator body does not block the event loop. Parameters ---------- sync_func : Callable[..., Any] Synchronous generator function inherited from a sync mixin. Returns ------- Callable[..., Any] Async generator function that yields the same items as the synchronous generator, one batch at a time, off the event loop. """ @functools.wraps(sync_func) async def wrapper(self: AsyncBridge, *args: Any, **kwargs: Any) -> Any: gen = sync_func(self, *args, **kwargs) while True: if self._executor is not None: loop = asyncio.get_running_loop() item = await loop.run_in_executor(self._executor, _next_or_stopped, gen) else: item = await asyncio.to_thread(_next_or_stopped, gen) if item is _STOPPED: return yield item return wrapper __all__ = ["AsyncBridge"]