Coverage for kpoisk_bot/session.py: 0%
18 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-19 21:13 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-19 21:13 +0000
1import logging
2from typing import Any, Awaitable, Callable, Dict
4from aiogram import BaseMiddleware
5from aiogram.types import TelegramObject
6from kinopoisk_unofficial_api_client import AuthenticatedClient
8from .config import API_TOKEN, API_URL
class ApiMiddleware(BaseMiddleware):
    """Middleware that exposes one shared API client to every handler.

    A single ``AuthenticatedClient`` is built from ``API_URL`` and
    ``API_TOKEN`` when the middleware is constructed.  Before each handler
    call the client is stored under ``data["api_client"]``.

    ``init`` and ``on_shutdown`` enter and leave the client's async context
    manager explicitly; neither is invoked from within this class, so the
    owner of the middleware is responsible for calling them.
    """

    def __init__(self) -> None:
        """Create the API client; its async context is not entered here."""
        super().__init__()
        self.client = AuthenticatedClient(base_url=API_URL, token=API_TOKEN)

    async def init(self) -> None:
        """Enter the client's async context and log the target URL."""
        # NOTE(review): presumably this opens the underlying HTTP session --
        # confirm against the client library.
        await self.client.__aenter__()
        logging.info("Established connection to API at %s", API_URL)

    async def on_shutdown(self) -> None:
        """Leave the client's async context and log the shutdown."""
        # The context-manager protocol defines
        # __aexit__(exc_type, exc_value, traceback).  Passing three Nones
        # signals a clean exit and does not rely on the client giving those
        # parameters default values.
        await self.client.__aexit__(None, None, None)
        logging.info("Shut down API connection")

    async def __call__(
        self,
        handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
        event: TelegramObject,
        data: Dict[str, Any],
    ) -> Any:
        """Inject the shared client into ``data`` and run the handler.

        Args:
            handler: The next callable in the chain; awaited with
                ``(event, data)``.
            event: The incoming object, passed through unchanged.
            data: Per-call context dict; ``data["api_client"]`` is set
                (overwriting any existing value) before the handler runs.

        Returns:
            Whatever ``handler`` returns.
        """
        data["api_client"] = self.client
        return await handler(event, data)