Source code for caspia.homeserver.services.tile

import uuid

from ..exceptions import AlreadyExistsError, NotFoundError
from .base import Service
from .wall import WallService


[docs]def append(key, value): def transform(doc): doc[key].append(value) return transform
[docs]class TileService(Service): table_name = 'walls' def __init__(self, wall_service: WallService, **kwargs): super().__init__(**kwargs) self.wall_service = wall_service
[docs] def check_identifier(self, tile_identifier): if not isinstance(tile_identifier, str): raise ValueError('tile\'s identifier is expected to be a string')
[docs] async def tile_exists(self, wall_identifier, tile_identifier): try: await self.get_tile(wall_identifier, tile_identifier) return True except NotFoundError: return False
[docs] async def create_tile(self, wall_identifier, tile): tile['identifier'] = tile.pop('identifier', None) or str(uuid.uuid4()) if await self.tile_exists(wall_identifier, tile['identifier']): raise AlreadyExistsError(f'tile with identifier {tile["identifier"]} already exists') self.check_identifier(tile['identifier']) self.table.update(append('tiles', tile), doc_ids=[wall_identifier]) return await self.get_tile(wall_identifier, tile['identifier'])
[docs] async def get_tile(self, wall_identifier, tile_identifier): wall = await self.wall_service.get_wall(wall_identifier) tiles = [t for t in wall['tiles'] if t['identifier'] == tile_identifier] if tiles: return tiles[0] else: raise NotFoundError(f"tile with identifier {tile_identifier} not found.")
[docs] async def update_tile(self, wall_identifier, tile_identifier, tile): assert tile_identifier == tile['identifier'] self.check_identifier(tile_identifier) wall = await self.wall_service.get_wall(wall_identifier) for idx, existing_tile in enumerate(wall['tiles']): if tile_identifier == existing_tile['identifier']: wall['tiles'][idx] = tile break else: raise NotFoundError(f"tile with identifier {tile_identifier} not found.") await self.wall_service.update_wall(wall_identifier, wall) return tile
[docs] async def remove_tile(self, wall_identifier, tile_identifier): self.check_identifier(tile_identifier) wall = await self.wall_service.get_wall(wall_identifier) for idx, existing_tile in enumerate(wall['tiles']): if tile_identifier == existing_tile['identifier']: del wall['tiles'][idx] break else: raise NotFoundError(f"tile with identifier {tile_identifier} not found.") await self.wall_service.update_wall(wall_identifier, wall)