Source code for svcs._core

# SPDX-FileCopyrightText: 2023 Hynek Schlawack <hs@ox.cx>
#
# SPDX-License-Identifier: MIT

from __future__ import annotations

import inspect
import logging
import sys
import warnings

from collections.abc import Awaitable, Callable, Iterator
from contextlib import (
    AbstractAsyncContextManager,
    AbstractContextManager,
    asynccontextmanager,
    contextmanager,
    suppress,
)
from inspect import (
    isasyncgenfunction,
    isawaitable,
    iscoroutine,
    iscoroutinefunction,
    isgeneratorfunction,
)
from types import TracebackType
from typing import Any, TypeAlias, TypeVar, overload
from unittest.mock import MagicMock

import attrs

from .exceptions import ServiceNotFoundError


if sys.version_info < (3, 15):
    from typing_extensions import TypeForm as TypeForm  # noqa: PLC0414
else:
    from typing import TypeForm as TypeForm  # noqa: PLC0414


_ServiceType: TypeAlias = TypeForm[Any]


log = logging.getLogger("svcs")


def _full_name(obj: object) -> str:
    try:
        return f"{obj.__module__}.{obj.__qualname__}"  # type: ignore[attr-defined]  # ty: ignore[unresolved-attribute]
    except AttributeError:
        return repr(obj)


# Default names where to put the container and registry in integrations.
_KEY_REGISTRY = "svcs_registry"
_KEY_CONTAINER = "svcs_container"


[docs] @attrs.frozen class RegisteredService: """ A recipe for creating a service. .. warning:: Strictly read-only. Attributes: svc_type: The type under which the type has been registered. factory: Callable that creates the service. takes_container: Whether the factory takes a container as its first argument. enter: Whether context managers returned by the factory are entered. ping: See :ref:`health`. suppress_context_exit: Whether to suppress errors raised in the container context when exiting registered factories. """ svc_type: _ServiceType factory: Callable = attrs.field(hash=False) takes_container: bool enter: bool ping: Callable | None = attrs.field(hash=False) suppress_context_exit: bool @property def name(self) -> str: return _full_name(self.svc_type) def __repr__(self) -> str: return ( f"<RegisteredService(svc_type=" f"{self.name}, " f"factory={self.factory}, " f"takes_container={self.takes_container}, " f"enter={self.enter}, " f"has_ping={self.ping is not None}, " f"suppress_context_exit={self.suppress_context_exit}" ")>" ) def close( self, cm: AbstractContextManager, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None, ) -> None: """ Close the context manager *cm* with the given exception information. """ if self.suppress_context_exit: cm.__exit__(None, None, None) else: cm.__exit__(exc_type, exc_val, exc_tb) async def aclose( self, cm: AbstractAsyncContextManager, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None, ) -> None: """ Close the async context manager *cm* with the given exception information. """ if self.suppress_context_exit: await cm.__aexit__(None, None, None) else: await cm.__aexit__(exc_type, exc_val, exc_tb)
[docs] @attrs.frozen class ServicePing: """ A service health check as returned by :meth:`svcs.Container.get_pings`. Attributes: name: A fully-qualified name of the service type. is_async: Whether the service needs to be pinged using :meth:`aping`. See Also: :ref:`health` """ name: str is_async: bool _svc_type: _ServiceType _ping: Callable _container: Container
[docs] def ping(self) -> None: """ Acquire the service, schedule its cleanup, and call its ping callable with the acquired service as its only argument. """ svc: Any = self._container.get(self._svc_type) self._ping(svc)
[docs] async def aping(self) -> None: """ Same as :meth:`ping` but acquire and/or ping asynchronously, if necessary. Also works with synchronous services, so in an async application, just use this. """ svc: Any = await self._container.aget(self._svc_type) if self.is_async: await self._ping(svc) else: self._ping(svc)
[docs] @attrs.define class Registry: r""" A central registry of recipes for creating services. An instance of this should live as long as your application does. Also works as a context manager that runs ``on_registry_close`` callbacks on exit: .. doctest:: >>> import svcs >>> with svcs.Registry() as reg: ... reg.register_value( ... int, 42, ... on_registry_close=lambda: print("closed!") ... ) closed! ``async with`` is also supported. Warns: ResourceWarning: If a registry with pending cleanups is garbage-collected. .. versionadded:: 26.1.0 It is now possible to register (and get) abstract types like :class:`typing.Protocol`\ s or abstract base classes. """ _services: dict[_ServiceType, RegisteredService] = attrs.Factory(dict) _on_close: list[tuple[RegisteredService, Callable | Awaitable]] = ( attrs.Factory(list) ) def __repr__(self) -> str: return f"<svcs.Registry(num_services={len(self._services)})>"
[docs] def __contains__(self, svc_type: _ServiceType) -> bool: """ Check whether this registry knows how to create *svc_type*: .. doctest:: >>> reg = svcs.Registry() >>> reg.register_value(int, 42) >>> int in reg True >>> str in reg False """ return svc_type in self._services
[docs] def __iter__(self) -> Iterator[RegisteredService]: """ Returns: An iterator over registered services. .. versionadded:: 25.1.0 """ return iter(self._services.values())
def __enter__(self) -> Registry: return self def __exit__( self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None, ) -> None: self.close() async def __aenter__(self) -> Registry: return self async def __aexit__( self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None, ) -> None: await self.aclose() def __del__(self) -> None: """ Warn if the registry is gc'ed before being closed. """ if getattr(self, "_on_close", None): warnings.warn( "Registry was garbage-collected with pending cleanups.", ResourceWarning, stacklevel=1, )
[docs] def register_factory( self, svc_type: _ServiceType, factory: Callable, *, enter: bool = True, ping: Callable | None = None, on_registry_close: Callable | Awaitable | None = None, suppress_context_exit: bool = True, ) -> None: """ Register *factory* to be used when asked for a *svc_type*. Repeated registrations overwrite previous ones, but the *on_registry_close* callbacks are run all together when the registry is closed. Args: svc_type: The type of the service to register. factory: A callable that is used to instantiated *svc_type* if asked. If it's a generator or a context manager, a cleanup is registered after instantiation. Can also be an async callable/generator/context manager. If *factory* takes a first argument called ``svcs_container`` or the first argument (of any name) is annotated as being :class:`svcs.Container`, the container instance that is instantiating the service is passed into the factory as the first positional argument. Note: Generally speaking, given the churn and edgecases in the typing ecosystem, we recommend using the name route to detect the container argument because it's most reliable. enter: Whether to enter context managers if one is returned by *factory*. Usually you want that, but there are occasions -- like database transaction managers -- that you want to enter manually. ping: A callable that marks the service as having a health check. See Also: :meth:`Container.get_pings` and :class:`ServicePing`. on_registry_close: A callable that is called when the :meth:`svcs.Registry.close()` method is called. Can also be an async callable or an :class:`collections.abc.Awaitable`; then :meth:`svcs.Registry.aclose()` must be called. suppress_context_exit: Whether to suppress errors raised in the container context when exiting registered factories. By default, it is True to avoid propagating errors raised in container context, so factories cleanup code is executed unconditionally. Set it to False to have control over error propagation, but note that you can't stop the exception from bubbling out of the container context by handling it in the factory cleanup context manager. .. versionchanged:: 25.1.0 *factory* now may take any amount of arguments and they are ignored. .. versionadded:: 26.1.0 *suppress_context*. """ rs = self._register_factory( svc_type, factory, enter=enter, ping=ping, on_registry_close=on_registry_close, suppress_context_exit=suppress_context_exit, ) log.debug( "registered factory %r for service type %s", factory, rs.name, extra={ "svcs_service_name": rs.name, "svcs_factory_name": _full_name(factory), }, )
[docs] def register_value( self, svc_type: _ServiceType, value: object, *, enter: bool = False, ping: Callable | None = None, on_registry_close: Callable | Awaitable | None = None, suppress_context_exit: bool = True, ) -> None: """ Syntactic sugar for:: register_factory( svc_type, lambda: value, enter=enter, ping=ping, on_registry_close=on_registry_close, suppress_context_exit=suppress_context_exit ) Please note that, unlike with :meth:`register_factory`, entering context managers is **disabled** by default. .. versionchanged:: 23.21.0 *enter* is now ``False`` by default. """ rs = self._register_factory( svc_type, lambda: value, enter=enter, ping=ping, on_registry_close=on_registry_close, suppress_context_exit=suppress_context_exit, ) log.debug( "registered value %r for service type %s", value, rs.name, extra={"svcs_service_name": rs.name, "svcs_value": value}, )
def _register_factory( self, svc_type: _ServiceType, factory: Callable, enter: bool, ping: Callable | None, on_registry_close: Callable | Awaitable | None = None, suppress_context_exit: bool = True, ) -> RegisteredService: if isgeneratorfunction(factory): factory = contextmanager(factory) elif isasyncgenfunction(factory): factory = asynccontextmanager(factory) rs = RegisteredService( svc_type, factory, _takes_container(factory), enter, ping, suppress_context_exit, ) self._services[svc_type] = rs if on_registry_close is not None: self._on_close.append((rs, on_registry_close)) return rs def get_registered_service_for( self, svc_type: _ServiceType ) -> RegisteredService: try: return self._services[svc_type] except KeyError: raise ServiceNotFoundError(svc_type) from None
[docs] def close(self) -> None: """ Clear registrations and run synchronous *on_registry_close* callbacks. Async callbacks are *not* awaited and a warning is raised Errors are logged at warning level, but otherwise ignored. """ for rs, oc in reversed(self._on_close): if iscoroutinefunction(oc) or isawaitable(oc): warnings.warn( f"Skipped async cleanup for {rs.name!r}. " "Use aclose() instead.", # stacklevel doesn't matter here; it's coming from a # framework. stacklevel=1, ) continue try: log.debug("closing %r", rs.name) oc() log.debug("closed %r", rs.name) except Exception: # noqa: BLE001 log.warning( "Registry's on_registry_close callback failed for %r.", rs.name, exc_info=True, extra={"svcs_service_name": rs.name}, ) self._services.clear() self._on_close.clear()
[docs] async def aclose(self) -> None: """ Clear registrations and run all *on_registry_close* callbacks. Errors are logged at warning level, but otherwise ignored. Also works with synchronous services, so in an async application, just use this. """ for rs, oc in reversed(self._on_close): try: if iscoroutinefunction(oc): oc = oc() # noqa: PLW2901 if isawaitable(oc): log.debug("async closing %r", rs.name) await oc log.debug("async closed %r", rs.name) else: log.debug("closing %r", rs.name) oc() log.debug("closed %r", rs.name) except Exception: # noqa: BLE001, PERF203 log.warning( "Registry's on_registry_close callback failed for %r.", rs.name, exc_info=True, extra={"svcs_service_name": rs.name}, ) self._services.clear() self._on_close.clear()
def _robust_signature(factory: Callable) -> inspect.Signature | None: with suppress(Exception): # Provide the locals so that `eval_str` will work even if the user # places the `Container` under a `if TYPE_CHECKING` block. return inspect.signature( factory, locals={"Container": Container}, eval_str=True, ) # Retry without `eval_str` since if the annotation is "svcs.Container" # the eval will fail due to it not finding the `svcs` module with suppress(Exception): return inspect.signature(factory) return None def _takes_container(factory: Callable) -> bool: """ Return True if *factory* takes a svcs.Container as its first argument. """ if not (sig := _robust_signature(factory)): return False try: (name, p) = next(iter(sig.parameters.items())) except StopIteration: return False # 0 arguments return name == "svcs_container" or p.annotation in ( Container, "svcs.Container", "Container", ) T1 = TypeVar("T1") T2 = TypeVar("T2") T3 = TypeVar("T3") T4 = TypeVar("T4") T5 = TypeVar("T5") T6 = TypeVar("T6") T7 = TypeVar("T7") T8 = TypeVar("T8") T9 = TypeVar("T9") T10 = TypeVar("T10")
[docs] @attrs.define class Container: """ A per-context container for instantiated services and cleanups. The instance of this should live as long as a request or a task. Also works as a context manager that runs clean ups on exit: .. doctest:: >>> reg = svcs.Registry() >>> def factory() -> str: ... yield "Hello World" ... print("Cleaned up!") >>> reg.register_factory(str, factory) >>> with svcs.Container(reg) as con: ... _ = con.get(str) Cleaned up! Warns: ResourceWarning: If a container with pending cleanups is garbage-collected. Attributes: registry: The :class:`Registry` instance that this container uses for service type lookup. """ registry: Registry _lazy_local_registry: Registry | None = None _instantiated: dict[_ServiceType, tuple[object, RegisteredService]] = ( attrs.Factory(dict) ) _on_close: list[ tuple[ RegisteredService, AbstractContextManager | AbstractAsyncContextManager, ] ] = attrs.Factory(list) def __repr__(self) -> str: return ( f"<Container(instantiated={len(self._instantiated)}, " f"cleanups={len(self._on_close)})>" )
[docs] def __contains__(self, svc_type: _ServiceType) -> bool: """ Check whether this container has a cached instance of *svc_type*. """ return svc_type in self._instantiated
def __enter__(self) -> Container: return self def __exit__( self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None, ) -> None: self.close(exc_type, exc_val, exc_tb) async def __aenter__(self) -> Container: return self async def __aexit__( self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None, ) -> None: await self.aclose(exc_type, exc_val, exc_tb) def __del__(self) -> None: """ Warn if the container is gc'ed before being closed. """ if getattr(self, "_on_close", None): warnings.warn( "Container was garbage-collected with pending cleanups.", ResourceWarning, stacklevel=1, )
[docs] def close( self, exc_type: type[BaseException] | None = None, exc_val: BaseException | None = None, exc_tb: TracebackType | None = None, ) -> None: """ Run all registered *synchronous* cleanups. Async closes are *not* awaited and a warning is raised. Errors are logged at warning level, but otherwise ignored. Hint: The Container can be used again after this. Closing it is an idempotent way to reset it. """ for rs, cm in reversed(self._on_close): try: if isinstance(cm, AbstractAsyncContextManager): warnings.warn( f"Skipped async cleanup for {rs.name!r}. " "Use aclose() instead.", # stacklevel doesn't matter here; it's coming from a # framework. stacklevel=1, ) continue rs.close(cm, exc_type, exc_val, exc_tb) except Exception: # noqa: BLE001 log.warning( "Container clean up failed for %r.", rs.name, exc_info=True, extra={"svcs_service_name": rs.name}, ) if self._lazy_local_registry is not None: self._lazy_local_registry.close() self._on_close.clear() self._instantiated.clear()
[docs] async def aclose( self, exc_type: type[BaseException] | None = None, exc_val: BaseException | None = None, exc_tb: TracebackType | None = None, ) -> None: """ Run *all* registered cleanups -- synchronous **and** asynchronous. Errors are logged at warning level, but otherwise ignored. Also works with synchronous services, so in an async application, just use this. Hint: The container can be used again after this. Closing it is an idempotent way to reset it. """ for rs, cm in reversed(self._on_close): try: if isinstance(cm, AbstractContextManager): rs.close(cm, exc_type, exc_val, exc_tb) else: await rs.aclose(cm, exc_type, exc_val, exc_tb) except Exception: # noqa: BLE001, PERF203 log.warning( "Container clean up failed for %r.", rs.name, exc_info=True, extra={"svcs_service_name": rs.name}, ) if self._lazy_local_registry is not None: await self._lazy_local_registry.aclose() self._on_close.clear() self._instantiated.clear()
[docs] def get_pings(self) -> list[ServicePing]: """ Return all services that have defined a *ping* and bind them to this container. Returns: A list of services that have registered a ping callable. """ if self._lazy_local_registry is None: local_rs_types = set() pings = [] else: local_rs_types = {rs.svc_type for rs in self._lazy_local_registry} pings = [ ServicePing( rs.name, iscoroutinefunction(rs.ping), rs.svc_type, rs.ping, self, ) for rs in self._lazy_local_registry if rs.ping is not None ] pings.extend( ServicePing( rs.name, iscoroutinefunction(rs.ping), rs.svc_type, rs.ping, self, ) for rs in self.registry if rs.ping is not None and rs.svc_type not in local_rs_types ) return pings
[docs] def get_abstract(self, *svc_types: _ServiceType) -> Any: """ Like :meth:`get` but is annotated to return :data:`typing.Any` which used to be the only way to allow it to be used with abstract types like :class:`typing.Protocol` or :mod:`abc` classes. As of *svcs* 26.1.0 and :pep:`747`, this is no longer necessary. .. deprecated:: 26.1.0 """ return self.get(*svc_types)
[docs] async def aget_abstract(self, *svc_types: _ServiceType) -> Any: """ Same as :meth:`get_abstract` but instantiates asynchronously, if necessary. Also works with synchronous services, so in an async application, just use this. .. deprecated:: 26.1.0 """ return await self.aget(*svc_types)
def _lookup( self, svc_type: _ServiceType ) -> tuple[bool, object, RegisteredService]: """ Look up svc_type first in our cache, then in the registry. If it's cached, only the first two items of the returned tupled are meaningful. """ rs: RegisteredService | None = None if cached_data := self._instantiated.get(svc_type): svc, rs = cached_data return True, svc, rs if self._lazy_local_registry is not None: with suppress(ServiceNotFoundError): rs = self._lazy_local_registry.get_registered_service_for( svc_type ) if rs is None: rs = self.registry.get_registered_service_for(svc_type) svc = rs.factory(self) if rs.takes_container else rs.factory() return False, svc, rs
[docs] def register_local_factory( self, svc_type: _ServiceType, factory: Callable, *, enter: bool = True, ping: Callable | None = None, on_registry_close: Callable | Awaitable | None = None, ) -> None: """ Same as :meth:`svcs.Registry.register_factory()`, but registers the factory only for this container. A temporary :class:`svcs.Registry` is transparently created -- and closed together with the container it belongs to. See Also: :ref:`local-registries` .. versionadded:: 23.21.0 """ if self._lazy_local_registry is None: self._lazy_local_registry = Registry() self._lazy_local_registry.register_factory( svc_type=svc_type, factory=factory, enter=enter, ping=ping, on_registry_close=on_registry_close, )
[docs] def register_local_value( self, svc_type: _ServiceType, value: object, *, enter: bool = False, ping: Callable | None = None, on_registry_close: Callable | Awaitable | None = None, ) -> None: """ Syntactic sugar for:: register_local_factory( svc_type, lambda: value, enter=enter, ping=ping, on_registry_close=on_registry_close ) Please note that, unlike with :meth:`register_local_factory`, entering context managers is **disabled** by default. See Also: :ref:`local-registries` .. versionadded:: 23.21.0 """ self.register_local_factory( svc_type, lambda: value, enter=enter, ping=ping, on_registry_close=on_registry_close, )
@overload def get(self, svc_type: TypeForm[T1], /) -> T1: ... @overload def get( self, svc_type1: TypeForm[T1], svc_type2: TypeForm[T2], / ) -> tuple[T1, T2]: ... @overload def get( self, svc_type1: TypeForm[T1], svc_type2: TypeForm[T2], svc_type3: TypeForm[T3], /, ) -> tuple[T1, T2, T3]: ... @overload def get( self, svc_type1: TypeForm[T1], svc_type2: TypeForm[T2], svc_type3: TypeForm[T3], svc_type4: TypeForm[T4], /, ) -> tuple[T1, T2, T3, T4]: ... @overload def get( self, svc_type1: TypeForm[T1], svc_type2: TypeForm[T2], svc_type3: TypeForm[T3], svc_type4: TypeForm[T4], svc_type5: TypeForm[T5], /, ) -> tuple[T1, T2, T3, T4, T5]: ... @overload def get( self, svc_type1: TypeForm[T1], svc_type2: TypeForm[T2], svc_type3: TypeForm[T3], svc_type4: TypeForm[T4], svc_type5: TypeForm[T5], svc_type6: TypeForm[T6], /, ) -> tuple[T1, T2, T3, T4, T5, T6]: ... @overload def get( self, svc_type1: TypeForm[T1], svc_type2: TypeForm[T2], svc_type3: TypeForm[T3], svc_type4: TypeForm[T4], svc_type5: TypeForm[T5], svc_type6: TypeForm[T6], svc_type7: TypeForm[T7], /, ) -> tuple[T1, T2, T3, T4, T5, T6, T7]: ... @overload def get( self, svc_type1: TypeForm[T1], svc_type2: TypeForm[T2], svc_type3: TypeForm[T3], svc_type4: TypeForm[T4], svc_type5: TypeForm[T5], svc_type6: TypeForm[T6], svc_type7: TypeForm[T7], svc_type8: TypeForm[T8], /, ) -> tuple[T1, T2, T3, T4, T5, T6, T7, T8]: ... @overload def get( self, svc_type1: TypeForm[T1], svc_type2: TypeForm[T2], svc_type3: TypeForm[T3], svc_type4: TypeForm[T4], svc_type5: TypeForm[T5], svc_type6: TypeForm[T6], svc_type7: TypeForm[T7], svc_type8: TypeForm[T8], svc_type9: TypeForm[T9], /, ) -> tuple[T1, T2, T3, T4, T5, T6, T7, T8, T9]: ... @overload def get( self, svc_type1: TypeForm[T1], svc_type2: TypeForm[T2], svc_type3: TypeForm[T3], svc_type4: TypeForm[T4], svc_type5: TypeForm[T5], svc_type6: TypeForm[T6], svc_type7: TypeForm[T7], svc_type8: TypeForm[T8], svc_type9: TypeForm[T9], svc_type10: TypeForm[T10], /, ) -> tuple[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10]: ...
[docs] def get(self, *svc_types: _ServiceType) -> object: """ Get services of *svc_types*. Instantiate them if necessary and register their cleanup. Returns: ``svc_types[0]`` | ``tuple[*svc_types]``: If one service is requested, it's returned directly. If multiple are requested, a tuple of services is returned. """ rv = [] for svc_type in svc_types: cached, svc, rs = self._lookup(svc_type) if cached: rv.append(svc) continue if not isinstance(svc, MagicMock) and ( iscoroutine(svc) or isinstance( svc, AbstractAsyncContextManager, # pyrefly: ignore[unsafe-overlap] ) ): msg = "Use `aget()` for async factories." raise TypeError(msg) if rs.enter and isinstance(svc, AbstractContextManager): self._on_close.append((rs, svc)) svc = svc.__enter__() self._instantiated[svc_type] = (svc, rs) rv.append(svc) if len(rv) == 1: return rv[0] return rv
@overload async def aget(self, svc_type: TypeForm[T1], /) -> T1: ... @overload async def aget( self, svc_type1: TypeForm[T1], svc_type2: TypeForm[T2], / ) -> tuple[T1, T2]: ... @overload async def aget( self, svc_type1: TypeForm[T1], svc_type2: TypeForm[T2], svc_type3: TypeForm[T3], /, ) -> tuple[T1, T2, T3]: ... @overload async def aget( self, svc_type1: TypeForm[T1], svc_type2: TypeForm[T2], svc_type3: TypeForm[T3], svc_type4: TypeForm[T4], /, ) -> tuple[T1, T2, T3, T4]: ... @overload async def aget( self, svc_type1: TypeForm[T1], svc_type2: TypeForm[T2], svc_type3: TypeForm[T3], svc_type4: TypeForm[T4], svc_type5: TypeForm[T5], /, ) -> tuple[T1, T2, T3, T4, T5]: ... @overload async def aget( self, svc_type1: TypeForm[T1], svc_type2: TypeForm[T2], svc_type3: TypeForm[T3], svc_type4: TypeForm[T4], svc_type5: TypeForm[T5], svc_type6: TypeForm[T6], /, ) -> tuple[T1, T2, T3, T4, T5, T6]: ... @overload async def aget( self, svc_type1: TypeForm[T1], svc_type2: TypeForm[T2], svc_type3: TypeForm[T3], svc_type4: TypeForm[T4], svc_type5: TypeForm[T5], svc_type6: TypeForm[T6], svc_type7: TypeForm[T7], /, ) -> tuple[T1, T2, T3, T4, T5, T6, T7]: ... @overload async def aget( self, svc_type1: TypeForm[T1], svc_type2: TypeForm[T2], svc_type3: TypeForm[T3], svc_type4: TypeForm[T4], svc_type5: TypeForm[T5], svc_type6: TypeForm[T6], svc_type7: TypeForm[T7], svc_type8: TypeForm[T8], /, ) -> tuple[T1, T2, T3, T4, T5, T6, T7, T8]: ... @overload async def aget( self, svc_type1: TypeForm[T1], svc_type2: TypeForm[T2], svc_type3: TypeForm[T3], svc_type4: TypeForm[T4], svc_type5: TypeForm[T5], svc_type6: TypeForm[T6], svc_type7: TypeForm[T7], svc_type8: TypeForm[T8], svc_type9: TypeForm[T9], /, ) -> tuple[T1, T2, T3, T4, T5, T6, T7, T8, T9]: ... @overload async def aget( self, svc_type1: TypeForm[T1], svc_type2: TypeForm[T2], svc_type3: TypeForm[T3], svc_type4: TypeForm[T4], svc_type5: TypeForm[T5], svc_type6: TypeForm[T6], svc_type7: TypeForm[T7], svc_type8: TypeForm[T8], svc_type9: TypeForm[T9], svc_type10: TypeForm[T10], /, ) -> tuple[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10]: ...
[docs] async def aget(self, *svc_types: _ServiceType) -> object: """ Same as :meth:`get` but instantiates asynchronously, if necessary. Also works with synchronous services, so in an async application, just use this. .. versionchanged:: 25.1.0 Synchronous context managers are now entered/exited, too. """ rv = [] for svc_type in svc_types: cached, svc, rs = self._lookup(svc_type) if cached: rv.append(svc) continue if rs.enter and isinstance(svc, AbstractAsyncContextManager): self._on_close.append((rs, svc)) svc = await svc.__aenter__() elif rs.enter and isinstance(svc, AbstractContextManager): self._on_close.append((rs, svc)) svc = svc.__enter__() # _lookup() doesn't handle async factories, so we have to live with # some repetition. elif isawaitable(svc): # Execute the factory. Until now, we've only created the # awaitable. svc = await svc # Factory returned a contextmanager. if rs.enter and isinstance(svc, AbstractAsyncContextManager): self._on_close.append((rs, svc)) svc = await svc.__aenter__() elif rs.enter and isinstance(svc, AbstractContextManager): self._on_close.append((rs, svc)) svc = svc.__enter__() self._instantiated[svc_type] = (svc, rs) rv.append(svc) if len(rv) == 1: return rv[0] return rv