import asyncio
import logging
from collections import defaultdict
from time import time
import jsonschema
from caspia.meadow.client.gateway import ServiceGatewayMixin
from caspia.meadow.services import weather
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class Weather(ServiceGatewayMixin, weather.WeatherBase):
    """Implements the Weather service.

    Reports written to the ``update`` characteristic are stored per report
    type. For every type, the newest still-valid report of each source is
    merged (in ascending priority order) into a single summary dict that is
    served on reads and pushed through ``notify`` whenever it changes.

    The instance can be used as a context manager; leaving the ``with``
    block cancels the pending summarization timer and task.
    """

    # Seconds a report of the given type stays valid, counted from the
    # report's own ``time`` value (which is compared against ``time.time()``).
    REPORT_VALIDITY = {
        weather.WeatherReportType.CURRENT: 60 * 30,
        weather.WeatherReportType.UPCOMING: 60 * 60,
        weather.WeatherReportType.TODAY: 60 * 60 * 24,
        weather.WeatherReportType.TOMORROW: 60 * 60 * 24
    }

    def __init__(self, name, loop=None):
        """Create the service and start the first summarization.

        :param name: service name, forwarded to the base class.
        :param loop: event loop to schedule on; defaults to
            ``asyncio.get_event_loop()``.
        """
        super().__init__(name)
        self._loop = loop or asyncio.get_event_loop()
        # report type -> list of stored reports
        self._reports = defaultdict(list)
        # report type -> last published summary dict
        self._weather = defaultdict(dict)
        # Set by __exit__; prevents new timers from being armed after close.
        self._closed = False
        self._summarize_handle = None
        self._summarize_task = asyncio.ensure_future(self._summarize(), loop=self._loop)

    def _invalidate_old_reports(self):
        """Keep only the newest, not yet outdated report of every source."""
        fresh = defaultdict(list)
        for report_type in weather.WeatherReportType:
            # Newest report per source. On equal timestamps the report stored
            # first wins, and sources keep their first-seen order.
            newest = {}
            for report in self._reports[report_type]:
                known = newest.get(report.source)
                if known is None or report.time > known.time:
                    newest[report.source] = report
            for report in newest.values():
                if not self._is_outdated(report):
                    fresh[report_type].append(report)
        self._reports = fresh

    def _schedule_next_summarization(self):
        """(Re)arm the timer that fires when the next stored report expires.

        Any previously armed timer is cancelled first. Nothing is scheduled
        when the service has been closed or when no reports are stored.
        """
        if self._summarize_handle:
            self._summarize_handle.cancel()
            self._summarize_handle = None
        if self._closed:
            # A summarization that finishes (or is cancelled) after __exit__
            # must not bring the timer back to life.
            return
        next_timeout = self._closest_timeout()
        if next_timeout is None:
            return

        def summarize():
            self._summarize_task = asyncio.ensure_future(self._summarize(), loop=self._loop)

        # An expiry that is already in the past is scheduled without delay.
        delay = max(0.0, next_timeout - time())
        self._summarize_handle = self._loop.call_later(delay, summarize)

    async def _summarize(self):
        """Drop outdated reports, rebuild summaries and notify on change.

        Failures are logged rather than propagated; the next summarization
        is scheduled in any case (unless the service was closed).
        """
        try:
            self._invalidate_old_reports()
            for report_type in weather.WeatherReportType:
                summary = dict()
                # Ascending priority: reports with a higher priority value
                # are applied last and override lower ones.
                for report in sorted(self._reports[report_type], key=lambda r: r.priority):
                    summary.update(report.data)
                if summary != self._weather[report_type]:
                    self._weather[report_type] = summary
                    await self.notify(self.characteristic_for_report_type(report_type), summary)
        except Exception:  # pylint: disable=broad-except
            logger.exception('Failure when summarizing weather')
        finally:
            self._schedule_next_summarization()

    async def characteristic_write(self, characteristic, value, **kwargs):
        """Store a weather report written to the ``update`` characteristic.

        :param characteristic: characteristic being written; only
            ``self.update`` is supported.
        :param value: mapping of weather datapoints carried by the report.
        :param kwargs: must contain ``extra``, a mapping with the keys
            ``report_type``, ``time`` and ``source`` and an optional
            ``priority`` (defaults to 100).
        :raises NotImplementedError: for any other characteristic.
        """
        if characteristic is not self.update:
            raise NotImplementedError
        extra = kwargs['extra']
        report_type = weather.WeatherReportType(extra['report_type'])
        report = weather.WeatherReport(report_type=report_type,
                                       time=extra['time'],
                                       data=value,
                                       priority=extra.get('priority', 100),
                                       source=extra['source'])
        self._cleanup_report(report)
        self._reports[report_type].append(report)
        await self._summarize()

    async def characteristic_read(self, characteristic, **kwargs):
        """Return the current summary for the given characteristic.

        :raises NotImplementedError: if the characteristic is none of
            ``today``, ``tomorrow``, ``upcoming`` or ``current``.
        """
        if characteristic is self.today:
            return self._weather[weather.WeatherReportType.TODAY]
        elif characteristic is self.tomorrow:
            return self._weather[weather.WeatherReportType.TOMORROW]
        elif characteristic is self.upcoming:
            return self._weather[weather.WeatherReportType.UPCOMING]
        elif characteristic is self.current:
            return self._weather[weather.WeatherReportType.CURRENT]
        else:
            raise NotImplementedError

    def _cleanup_report(self, report):
        """Remove datapoints that fail their JSON schema from ``report.data``.

        Datapoints without an entry in ``self.datapoint_schemas`` are kept
        as they are. Note that ``report.data`` is modified in place.
        """
        # Iterate over a snapshot of the keys because entries may be deleted.
        for key in list(report.data.keys()):
            if key not in self.datapoint_schemas:
                continue
            schema = self.datapoint_schemas[key]
            try:
                jsonschema.validate(report.data[key], schema.schema)
            except jsonschema.ValidationError:
                fmt = 'Provided weather datapoint from %r is not valid: %r.'
                logger.error(fmt, report.source, report.data[key])
                del report.data[key]

    def _timeout_of(self, report):
        """Return the moment at which ``report`` stops being valid."""
        validity_interval = Weather.REPORT_VALIDITY[report.report_type]
        return report.time + validity_interval

    def _is_outdated(self, report):
        """Tell whether the validity of ``report`` has already passed."""
        return self._timeout_of(report) < time()

    def _closest_timeout(self):
        """Return the earliest expiry among stored reports, or ``None``."""
        timeouts = (self._timeout_of(report)
                    for reports in self._reports.values()
                    for report in reports)
        return min(timeouts, default=None)

    def characteristic_for_report_type(self, report_type):
        """Return the characteristic publishing ``report_type``, or ``None``."""
        if report_type == weather.WeatherReportType.UPCOMING:
            return self.upcoming
        elif report_type == weather.WeatherReportType.CURRENT:
            return self.current
        elif report_type == weather.WeatherReportType.TODAY:
            return self.today
        elif report_type == weather.WeatherReportType.TOMORROW:
            return self.tomorrow
        return None

    def __enter__(self):
        # Allow scheduling again in case the instance is re-entered after a
        # previous __exit__.
        self._closed = False
        return self

    def __exit__(self, *args):
        """Stop the background summarization (timer and running task)."""
        # Mark closed first: cancelling the task runs the ``finally`` clause
        # of _summarize, which would otherwise arm a new timer.
        self._closed = True
        if self._summarize_handle:
            self._summarize_handle.cancel()
            self._summarize_handle = None
        if self._summarize_task:
            self._summarize_task.cancel()