Source code for caspia.gateway.services.meters.electricity

import asyncio
import time

from caspia.gateway.services import GatewayService
from caspia.meadow.client import characteristic_read_handler, characteristic_write_handler
from caspia.meadow.services import ElectricityMeterBase
from caspia.node.components import DigitalInput


class ElectricityMeter(GatewayService, ElectricityMeterBase):
    """Electricity meter service driven by impulses on a digital input.

    Every low-to-high transition of the linked ``DigitalInput`` component is
    counted as one impulse.  Each impulse adds ``1000 / impulses_per_kwh`` to
    the persisted total (presumably watt-hours, given the name of the
    ``impulses_per_kwh`` config key -- confirm against the characteristic
    definition), and the time between impulses is used to derive the current
    consumption.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.input_c = None  # Linked DigitalInput component, set in configure()
        with self.storage:
            # Restore the persisted total; fall back to 0.0 when nothing is stored.
            self.total_consumption_value = self.storage.get('total', 0.0)
        self.current_consumption_value = 0.0  # Current consumption value
        self.last_interval = None  # Interval between last two pulses
        self.last_impuls_timestamp = None  # Timestamp of last impulse
        # Keep a strong reference to the background task: the event loop only
        # holds weak references, so an unreferenced task may be garbage
        # collected before it finishes.
        self._maintain_task = asyncio.ensure_future(self.maintain_current_consumptions())

    def configure(self):
        """Resolve the linked input component and read ``impulses_per_kwh`` from config."""
        self.input_c = self.network.get_configured_component(
            self.get_node(), DigitalInput, self.config['input'])
        self.impulses_per_kwh = self.config['impulses_per_kwh']

    @property
    def dependant_components(self):
        """Return the set of components this service depends on (the linked input)."""
        return {self.input_c}

    async def update_current_consumption(self, timestamp=None):
        """Recompute the current consumption and notify about the new value.

        The consumption is derived from the time elapsed between the last
        recorded impulse and ``timestamp``.  Callers must make sure that
        ``last_impuls_timestamp`` is already set.

        :param timestamp: reference time in seconds; defaults to ``time.time()``.
        :returns: the interval between the last impulse and ``timestamp``.
        """
        # Use an explicit None check so that a timestamp of 0 is not replaced.
        if timestamp is None:
            timestamp = time.time()
        interval = timestamp - self.last_impuls_timestamp
        if interval <= 0:
            # Identical or out-of-order timestamps would divide by zero or
            # yield a negative consumption; keep the previous value instead.
            return interval
        self.current_consumption_value = 3600 / (interval * (self.impulses_per_kwh / 1000))
        await self.notify(self.current_consumption, self.current_consumption_value)
        return interval

    async def maintain_current_consumptions(self):
        """Enter infinite loop and maintain current_consumption.

        Every 5 seconds, once the time since the last impulse exceeds the last
        measured interval, the current consumption is recomputed against the
        present time so the reported value decays while no impulses arrive.
        """
        while True:
            await asyncio.sleep(5)
            # Nothing to maintain until at least two impulses have been seen.
            if self.last_interval is None or self.last_impuls_timestamp is None:
                continue
            # The next impulse is not overdue yet; the last value still holds.
            if time.time() <= self.last_impuls_timestamp + self.last_interval:
                continue
            await self.update_current_consumption()

    async def on_component_event(self, component, event):
        """Handle an event from a component; count impulses from the linked input.

        Only ``DigitalInput.LowToHighEvent`` events coming from the linked
        input component are processed; everything else is ignored.
        """
        if component == self.input_c and isinstance(event, DigitalInput.LowToHighEvent):
            # Increment total consumption
            try:
                with self.storage:
                    total = self.total_consumption_value
                    total += 1 / (self.impulses_per_kwh / 1000)
                    self.storage['total'] = total
                    # Updated only after the storage write did not raise.
                    self.total_consumption_value = total
            except Exception:  # pylint: disable=broad-except
                # Best effort: log the failure and continue with the notification.
                import logging
                logging.getLogger(__name__).exception('Failure when updating consumption')
            await self.notify(self.total_consumption, self.total_consumption_value)

            # Recalculate current consumption
            if self.last_impuls_timestamp is not None:
                self.last_interval = await self.update_current_consumption(
                    timestamp=event.broadcast.timestamp)
            self.last_impuls_timestamp = event.broadcast.timestamp

    @characteristic_read_handler('total_consumption')
    async def total_consumption_read(self, **kwargs):
        """Return the accumulated total consumption value."""
        return self.total_consumption_value

    @characteristic_read_handler('current_consumption')
    async def current_consumption_read(self, **kwargs):
        """Return the most recently computed current consumption value."""
        return self.current_consumption_value

    @characteristic_write_handler('total_consumption')
    async def total_consumption_write(self, value, **kwargs):
        """Overwrite the total consumption with ``value`` and persist it."""
        with self.storage:
            self.total_consumption_value = value
            self.storage['total'] = value