Source code for caspia.toolbox.integrations.darksky

import asyncio
import logging
import os
from email.utils import parsedate
from time import time

import aiohttp
import arrow

from caspia.meadow.services import weather

logger = logging.getLogger(__name__)


class DarkSkyWeatherProvider:
    """Poll the Dark Sky forecast API and post reports to a weather service.

    Each update fetches one forecast for ``latitude``/``longitude`` and posts
    up to four reports (current, upcoming, today, tomorrow) through
    ``weather_service.post_report``.

    Args:
        latitude: Latitude substituted into the request URL.
        longitude: Longitude substituted into the request URL.
        secret_key: API key; when falsy it is read from the
            ``CSP_DARKSKY_KEY`` environment variable.
        loop: Event loop to use; defaults to ``asyncio.get_event_loop()``.
        interval: Seconds between periodic updates. A falsy value disables
            the background polling task; call :meth:`update` manually then.
        priority: Priority attached to every generated report.
        name: Source name attached to every generated report.
        weather_service: Object providing an awaitable ``post_report``;
            may also be supplied later through :meth:`attach`.

    Raises:
        RuntimeError: If no secret key is given and ``CSP_DARKSKY_KEY`` is
            not set.
    """

    URL = 'https://api.darksky.net/forecast/{self.secret_key}/{self.latitude},{self.longitude}'

    def __init__(self, latitude, longitude, secret_key=None, loop=None, interval=60 * 20,
                 priority=100, name='darksky', weather_service=None):
        self._loop = loop or asyncio.get_event_loop()
        self.latitude = latitude
        self.longitude = longitude
        self.name = name
        self.priority = priority

        if secret_key:
            self.secret_key = secret_key
        else:
            try:
                self.secret_key = os.environ['CSP_DARKSKY_KEY']
            except KeyError:
                raise RuntimeError('Secret key for darksky not provided.') from None

        self.weather_service = weather_service
        self._session = aiohttp.ClientSession(loop=self._loop)

        # Keep a strong reference to the polling task: the event loop only
        # holds weak references, and __aexit__ needs the handle to stop it.
        self._task = None
        if interval:
            self._task = asyncio.ensure_future(self._runloop(interval), loop=self._loop)

    def attach(self, weather_service):
        """Set the weather service that receives the generated reports."""
        self.weather_service = weather_service

    def _make_report(self, report_type, data):
        """Build a ``weather.WeatherReport`` of ``report_type`` from one data block.

        Precipitation fields are included only when ``precipProbability`` is
        present in ``data``, wind fields only when ``windSpeed`` is present.
        Entries whose value is ``None`` are dropped, except ``precip_type``.
        """
        report_data = {
            'temperature': data.get('temperature'),
            'pressure': data.get('pressure'),
            'humidity': data.get('humidity'),
            'cloud_cover': data.get('cloudCover'),
        }
        if 'precipProbability' in data:
            report_data['precip_probability'] = data.get('precipProbability', 0.0)
            report_data['precip_type'] = data.get('precipType', None)
            report_data['precip_intensity'] = data.get('precipIntensity', 0.0)
            report_data['precip_accumulation'] = data.get('precipAccumulation', 0.0)
        if 'windSpeed' in data:
            report_data['wind_speed'] = int(data.get('windSpeed', 0))
            report_data['wind_gust'] = int(data.get('windGust', 0))
            report_data['wind_bearing'] = int(data.get('windBearing', 0))

        allowed_none = ('precip_type', )
        report_data = {
            key: value
            for key, value in report_data.items()
            if value is not None or key in allowed_none
        }
        return weather.WeatherReport(report_type=report_type,
                                     time=time(),
                                     data=report_data,
                                     priority=self.priority,
                                     source=self.name)

    def _find_datablock(self, blocks, shift=None, span='hour', now=None):
        """Return the first block whose ``time`` lies in the requested span.

        The span is the ``span`` frame ('hour', 'day', ...) that contains
        ``now`` (default: ``arrow.now()``) after applying the keyword
        arguments in ``shift``. Returns ``None`` when no block matches.
        """
        now = now or arrow.now()
        start, end = now.shift(**(shift or {})).span(span)
        for datablock in blocks:
            if start <= arrow.get(datablock['time']) <= end:
                return datablock
        return None

    async def _report(self, report_type, datablock):
        """Convert ``datablock`` into a report and post it to the weather service."""
        report = self._make_report(report_type, datablock)
        await self.weather_service.post_report(report)

    @staticmethod
    def _response_time(headers):
        """Return the time from the ``Date`` header, or ``None`` if unusable."""
        raw_date = headers.get('Date')
        parsed = parsedate(raw_date) if raw_date else None
        if parsed is None:
            return None
        return arrow.get(*parsed[:6])

    async def update(self, now=None):
        """Fetch the forecast once and post the resulting reports.

        Args:
            now: Reference time used to select the hourly/daily blocks. When
                omitted, the time from the response ``Date`` header is used;
                if that header is missing or cannot be parsed, the block
                lookup falls back to the local clock.

        HTTP error statuses are raised by ``resp.raise_for_status()``.
        """
        params = {'units': 'si'}
        async with self._session.get(type(self).URL.format(self=self), params=params) as resp:
            # Check the status first so an error response is reported as an
            # HTTP error rather than as a header or JSON parsing failure.
            resp.raise_for_status()
            data = await resp.json()
            if now is None:
                now = self._response_time(resp.headers)

        if 'currently' in data:
            await self._report(weather.WeatherReportType.CURRENT, data['currently'])

        if 'hourly' in data:
            datablock = self._find_datablock(data['hourly']['data'],
                                             shift=dict(hours=2), span='hour', now=now)
            if datablock:
                await self._report(weather.WeatherReportType.UPCOMING, datablock)

        if 'daily' in data:
            datablock = self._find_datablock(data['daily']['data'], span='day', now=now)
            if datablock:
                await self._report(weather.WeatherReportType.TODAY, datablock)

            datablock = self._find_datablock(data['daily']['data'],
                                             shift=dict(days=1), span='day', now=now)
            if datablock:
                await self._report(weather.WeatherReportType.TOMORROW, datablock)

    async def _runloop(self, interval):
        """Run :meth:`update` every ``interval`` seconds after a 5 s delay.

        Failures of a single update are logged and do not stop the loop.
        """
        await asyncio.sleep(5.0)
        while True:
            try:
                logger.info('Starting weather update [%s]', self.name)
                await self.update()
            except Exception as e:  # pylint: disable=broad-except
                logger.error('Failure when updating weather: %r', e)
            # Sleep outside of ``finally`` so that cancelling the task stops
            # it immediately instead of after one more full interval.
            await asyncio.sleep(interval)

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        """Stop the background polling task (if any) and close the session."""
        task = getattr(self, '_task', None)
        try:
            if task is not None:
                self._task = None
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass
        finally:
            await self._session.close()