Source code for caspia.meadow.services.weather

import enum

from .base import Characteristic, ServiceBase


[docs]class WeatherBase(ServiceBase): """ Provides information about current weather and its forecast. The characteristic's values are dictionaries looking like: .. code-block:: json { 'temperature': 24.5, 'precip_probability: 1, ... } """ type = 'weather' current = Characteristic('json', 'RN', description='Current weather') upcoming = Characteristic('json', 'RN', description='Short-term forecast for next few hours') today = Characteristic('json', 'RN', description="Today's forecast") tomorrow = Characteristic('json', 'RN', description="Tomorrow's forecast") update = Characteristic('json', 'W', description="Endpoint for updating the weather's data.")
[docs] async def post_report(self, report: 'WeatherReport'): """ Update the service with given `WeatherReport`. This just writes the report to the `update` characteristic. """ await self.update.write(report.data, extra={ 'time': report.time, 'report_type': report.report_type, 'priority': report.priority, 'source': report.source })
datapoint_schemas = dict()
# # Weather Report #
[docs]class WeatherReportType(enum.Enum): CURRENT = 'current' UPCOMING = 'upcoming' TODAY = 'today' TOMORROW = 'tomorrow'
[docs]class WeatherReport: def __init__(self, report_type: WeatherReportType, time: float, data: dict, priority: int, source: str): self.report_type = report_type self.time = time self.data = data self.priority = priority or 100 self.source = source
# # Datapoints Definition #
[docs]class DataPointSchema: def __init__(self, key, description, **schema): self.key = key self.description = description self.schema = schema
WeatherBase.datapoint_schemas = { s.key: s for s in [ DataPointSchema( 'precip_type', 'Type of expected precipitation', enum=['rain', 'snow', 'sleet', None]), DataPointSchema('precip_probability', 'Probability of the preciprotion', type='number', minimum=0.0, maximum=1.0), DataPointSchema('precip_intensity', 'The preciprotion intensity in centimeters per hour', type='number', minimum=0.0), DataPointSchema('precip_accumulation', 'The amount of expected snowfall accumulation in centimeters', type='number', minimum=0.0), DataPointSchema('temperature', 'The air temperature in degrees celsius', type='number'), DataPointSchema('wind_bearing', 'The azimuth of where the wind is coming from', type='integer', minimum=0, maximum=360), DataPointSchema('wind_speed', 'The wind speed in km/h', type='integer', minimum=0), DataPointSchema('wind_gust', 'The wind gust in km/h', type='integer', minimum=0), DataPointSchema( 'pressure', 'The sea-level air pressure in millibars', type='number', minimum=0), DataPointSchema('humidity', 'The relative humidity', type='number', minimum=0, maximum=1), DataPointSchema('cloud_cover', 'The percentage of sky occluded by clouds', type='number', minimum=0, maximum=1), ] }