Coverage for kpoisk_bot/session.py: 0%

18 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-19 21:13 +0000

1import logging 

2from typing import Any, Awaitable, Callable, Dict 

3 

4from aiogram import BaseMiddleware 

5from aiogram.types import TelegramObject 

6from kinopoisk_unofficial_api_client import AuthenticatedClient 

7 

8from .config import API_TOKEN, API_URL 

9 

10 

class ApiMiddleware(BaseMiddleware):
    """Middleware that hands a single shared API client to every handler.

    One ``AuthenticatedClient`` is created per middleware instance. Its async
    context is entered by :meth:`init` and left by :meth:`on_shutdown`; both
    must be awaited explicitly by the application. On every event the client
    is placed into the handler data under the ``"api_client"`` key.
    """

    def __init__(self) -> None:
        """Build the API client from the configured ``API_URL``/``API_TOKEN``."""
        self.client = AuthenticatedClient(base_url=API_URL, token=API_TOKEN)

    async def init(self) -> None:
        """Enter the client's async context and log the API URL in use."""
        await self.client.__aenter__()
        logging.info(f"Established connection to API at {API_URL}")

    async def on_shutdown(self) -> None:
        """Leave the client's async context and log the shutdown."""
        # The async context-manager protocol calls __aexit__ with
        # (exc_type, exc_value, traceback). Pass the "no exception" triple
        # explicitly rather than relying on the client defaulting them.
        await self.client.__aexit__(None, None, None)
        logging.info("Shut down API connection")

    async def __call__(
        self,
        handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
        event: TelegramObject,
        data: Dict[str, Any],
    ) -> Any:
        """Inject the shared client into ``data`` and run the next handler.

        Args:
            handler: The next callable in the chain; awaited with
                ``(event, data)``.
            event: The incoming Telegram object, passed through unchanged.
            data: Mutable handler context; ``data["api_client"]`` is set to
                this middleware's client before ``handler`` is called.

        Returns:
            Whatever ``handler`` returns.
        """
        data["api_client"] = self.client
        return await handler(event, data)