Source code for caspia.toolbox.services.weather.service

import asyncio
import logging
from collections import defaultdict
from time import time

import jsonschema

from caspia.meadow.client.gateway import ServiceGatewayMixin
from caspia.meadow.services import weather

logger = logging.getLogger(__name__)


class Weather(ServiceGatewayMixin, weather.WeatherBase):
    """Implements the Weather service.

    Weather reports are received through writes to the ``update``
    characteristic. Per report type, only the newest report of each source is
    kept, and only while it is still valid. The kept reports are merged into
    one summary per report type, which is served on reads and pushed through
    ``notify`` whenever it changes.

    The instance can be used as a context manager; leaving the context stops
    the periodic re-summarization.
    """

    # Validity of a report in seconds, counted from the report's ``time``.
    REPORT_VALIDITY = {
        weather.WeatherReportType.CURRENT: 60 * 30,
        weather.WeatherReportType.UPCOMING: 60 * 60,
        weather.WeatherReportType.TODAY: 60 * 60 * 24,
        weather.WeatherReportType.TOMORROW: 60 * 60 * 24,
    }

    def __init__(self, name, loop=None):
        """Create the service and start the first summarization.

        Args:
            name: Service name, passed on to the base class.
            loop: Event loop to schedule work on; defaults to
                ``asyncio.get_event_loop()``.
        """
        super().__init__(name)
        self._loop = loop or asyncio.get_event_loop()
        # report type -> list of reports currently considered valid
        self._reports = defaultdict(list)
        # report type -> last published summary (merged report data)
        self._weather = defaultdict(dict)
        # Set by __exit__; prevents re-arming the timer after shutdown.
        self._closed = False
        self._summarize_handle = None
        self._summarize_task = asyncio.ensure_future(self._summarize(), loop=self._loop)

    def _invalidate_old_reports(self):
        """Keep only the newest, still valid report per (type, source)."""
        fresh = defaultdict(list)
        for report_type in weather.WeatherReportType:
            # Newest report of each source; on equal times the report seen
            # first wins. Sources keep their order of first appearance.
            newest_by_source = {}
            for report in self._reports[report_type]:
                current = newest_by_source.get(report.source)
                if current is None or report.time > current.time:
                    newest_by_source[report.source] = report
            for report in newest_by_source.values():
                if not self._is_outdated(report):
                    fresh[report_type].append(report)
        self._reports = fresh

    def _schedule_next_summarization(self):
        """(Re)arm the timer to fire when the next stored report expires.

        Any pending timer is cancelled first. No timer is armed when the
        service has been closed or when there are no stored reports.
        """
        if self._summarize_handle:
            self._summarize_handle.cancel()
            self._summarize_handle = None
        if self._closed:
            # A summarize task cancelled by __exit__ still reaches its
            # ``finally`` clause; it must not revive the timer.
            return
        next_timeout = self._closest_timeout()
        if next_timeout is None:
            return

        def summarize():
            self._summarize_task = asyncio.ensure_future(self._summarize(), loop=self._loop)

        # The closest timeout may already have passed by the time we get here.
        delay = max(0.0, next_timeout - time())
        self._summarize_handle = self._loop.call_later(delay, summarize)

    async def _summarize(self):
        """Rebuild the summary of every report type and notify about changes.

        Reports are merged in ascending ``priority`` order, so for a datapoint
        present in several reports the value from the report with the highest
        priority number ends up in the summary. Failures are logged rather
        than raised; cancellation is propagated.
        """
        try:
            self._invalidate_old_reports()
            for report_type in weather.WeatherReportType:
                summary = dict()
                for report in sorted(self._reports[report_type], key=lambda r: r.priority):
                    summary.update(report.data)
                if summary != self._weather[report_type]:
                    self._weather[report_type] = summary
                    await self.notify(self.characteristic_for_report_type(report_type), summary)
        except asyncio.CancelledError:
            # CancelledError derives from Exception on Python < 3.8; do not
            # swallow it or report it as a summarization failure.
            raise
        except Exception:  # pylint: disable=broad-except
            logger.exception('Failure when summarizing weather')
        finally:
            self._schedule_next_summarization()

    async def characteristic_write(self, characteristic, value, **kwargs):
        """Store a weather report written to the ``update`` characteristic.

        Args:
            characteristic: The characteristic being written.
            value: Report data; a mapping of datapoint name to value.
            **kwargs: Must contain ``extra`` with the keys ``report_type``,
                ``time`` and ``source``, and optionally ``priority``
                (defaults to 100).

        Raises:
            NotImplementedError: If ``characteristic`` is not ``update``.
        """
        if characteristic is self.update:
            extra = kwargs['extra']
            report_type = weather.WeatherReportType(extra['report_type'])
            timestamp = extra['time']
            priority = extra.get('priority', 100)
            source = extra['source']
            report = weather.WeatherReport(report_type=report_type,
                                           time=timestamp,
                                           data=value,
                                           priority=priority,
                                           source=source)
            self._cleanup_report(report)
            self._reports[report_type].append(report)
            await self._summarize()
        else:
            raise NotImplementedError

    async def characteristic_read(self, characteristic, **kwargs):
        """Return the current summary for the given weather characteristic.

        Raises:
            NotImplementedError: If ``characteristic`` is none of ``today``,
                ``tomorrow``, ``upcoming`` or ``current``.
        """
        if characteristic is self.today:
            return self._weather[weather.WeatherReportType.TODAY]
        elif characteristic is self.tomorrow:
            return self._weather[weather.WeatherReportType.TOMORROW]
        elif characteristic is self.upcoming:
            return self._weather[weather.WeatherReportType.UPCOMING]
        elif characteristic is self.current:
            return self._weather[weather.WeatherReportType.CURRENT]
        else:
            raise NotImplementedError

    def _cleanup_report(self, report):
        """Drop datapoints that fail validation against their schema.

        Datapoints without an entry in ``self.datapoint_schemas`` are left
        untouched. ``report.data`` is modified in place.
        """
        # Iterate over a snapshot of the keys: entries are deleted in the loop.
        for key in list(report.data):
            if key not in self.datapoint_schemas:
                continue
            schema = self.datapoint_schemas[key]
            try:
                jsonschema.validate(report.data[key], schema.schema)
            except jsonschema.ValidationError:
                fmt = 'Provided weather datapoint from %r is not valid: %r.'
                logger.error(fmt, report.source, report.data[key])
                del report.data[key]

    def _timeout_of(self, report):
        """Return the timestamp at which ``report`` stops being valid."""
        validity_interval = Weather.REPORT_VALIDITY[report.report_type]
        return report.time + validity_interval

    def _is_outdated(self, report):
        """Return True if the validity of ``report`` has already expired."""
        return self._timeout_of(report) < time()

    def _closest_timeout(self):
        """Return the earliest expiry among stored reports, or None if empty."""
        try:
            return min(
                self._timeout_of(r) for r_type in self._reports for r in self._reports[r_type])
        except ValueError:
            # min() of an empty sequence: there are no reports at all.
            return None

    def characteristic_for_report_type(self, report_type):
        """Return the characteristic publishing ``report_type``, or None."""
        if report_type == weather.WeatherReportType.UPCOMING:
            return self.upcoming
        elif report_type == weather.WeatherReportType.CURRENT:
            return self.current
        elif report_type == weather.WeatherReportType.TODAY:
            return self.today
        elif report_type == weather.WeatherReportType.TOMORROW:
            return self.tomorrow
        return None

    def __enter__(self):
        return self

    def __exit__(self, *args):
        """Stop periodic summarization: cancel the timer and any running task."""
        # Mark as closed first so that the cancelled task's ``finally`` clause
        # does not schedule another summarization.
        self._closed = True
        if self._summarize_handle:
            self._summarize_handle.cancel()
        if self._summarize_task:
            self._summarize_task.cancel()