from caspia.gateway.services import GatewayService
from caspia.meadow.client import characteristic_read_handler
from caspia.meadow.errors import SensorFailureError
from caspia.meadow.services import Characteristic, LightSensorBase
from caspia.node.components.sensors import TSL258XSensor
class TSL258XLightSensor(GatewayService, LightSensorBase):
    """Gateway service that serves light readings from a TSL258X component.

    The service reads measurements from ``self.tsl258x_c`` and publishes them
    through the ``light`` characteristic. It also exposes the chip
    identification (part name and revision) as two extra characteristics.

    NOTE(review): ``self.tsl258x_c``, ``self.config``, ``self.light`` and
    ``self.notify`` are not defined in this class; presumably they are
    provided by the base classes / gateway framework -- confirm.
    """

    auto_discovery = False

    # Chip identification characteristics, served by the read handlers
    # ``chip_part_name_read`` and ``chip_revision_read`` below.
    chip_part_name = Characteristic('string', 'R')
    chip_revision = Characteristic('integer', 'R')

    @property
    def component_config(self) -> TSL258XSensor.Config:
        """Return the ``'tsl258x'`` entry of this service's configuration."""
        return self.config['tsl258x']

    @property
    def is_updated_periodically(self):
        """Return ``True`` when the configured ``interval`` is non-zero."""
        return self.component_config.interval != 0

    @property
    def dependant_components(self):
        """Return the set of components this service depends on."""
        return {self.tsl258x_c}

    async def on_component_event(self, component, event):
        """Forward measurement events of the sensor component to ``light``.

        Only ``TSL258XSensor.MeasurementEvent`` instances originating from
        ``self.tsl258x_c`` are handled; every other event is ignored.

        :param component: component that emitted the event.
        :param event: the emitted event object.
        """
        if self.tsl258x_c == component and isinstance(
                event, TSL258XSensor.MeasurementEvent):
            # On a failed measurement the error object itself is passed on
            # as the notified value (it is not raised here).
            if event.error:
                value = SensorFailureError('sensor reported an error')
            else:
                value = event.lux
            await self.notify(self.light, value)

    @characteristic_read_handler('light')
    async def light_read(self, **kwargs):
        """Return the current illuminance rounded to two decimal places.

        A fresh measurement is taken when the sensor is not updated
        periodically or when no lux value is stored in the component state
        yet; otherwise the stored value is returned.

        :raises SensorFailureError: when the component state carries a
            truthy error code.
        """
        sensor = self.tsl258x_c
        if not self.is_updated_periodically or sensor.state.lux is None:
            raw_data = await sensor.measure()
            lux = sensor.raw_to_lux(raw_data)
        else:
            lux = sensor.state.lux

        # Read the error code only after a possible measurement so that it
        # reflects the state the returned value was taken from.
        error_code = sensor.state.error
        if error_code:
            raise SensorFailureError(message=f'sensor failure (code {error_code})')
        return round(float(lux), 2)

    @characteristic_read_handler('chip_part_name')
    async def chip_part_name_read(self, **kwargs):
        """Read the chip identification and return its ``part`` field."""
        identification = await self.tsl258x_c.read_part_id()
        return identification.part

    @characteristic_read_handler('chip_revision')
    async def chip_revision_read(self, **kwargs):
        """Read the chip identification and return its ``revision`` field."""
        identification = await self.tsl258x_c.read_part_id()
        return identification.revision