Source code for caspia.node.components.sensors.tsl258x

import struct
from collections import namedtuple
from dataclasses import dataclass, field
from enum import IntEnum
from typing import Optional, Tuple

import arrow

from ..base import Component
from .base import SensorBase


[docs]class TSL258XSensor(SensorBase): """I2C Light Sensor.""" type = 0x0E PartIdentification = namedtuple('PartIdentification', ['part', 'revision'])
[docs] async def read_part_id(self): response = await self.request(b'\x10') part_number = response[0] >> 4 if part_number == 0b1000: part = 'TSL2580' elif part_number == 0b1001: part = 'TSL2581' else: raise ValueError('invalid part number') revision = response[0] & 0x0F return TSL258XSensor.PartIdentification(part=part, revision=revision)
[docs] class GainControl(IntEnum): Gain1x = 0b00 Gain8x = 0b01 Gain16x = 0b10 Gain111x = 0b11
[docs] @classmethod def raw_to_lux(cls, raw): ch0, ch1 = raw if ch0 == 0: return 0 ch1_ch0 = ch1 / ch0 if ch1_ch0 <= 0.3: return 0.130 * ch0 - 0.24 * ch1 elif ch1_ch0 <= 0.38: return 0.1649 * ch0 - 0.3562 * ch1 elif ch1_ch0 <= 0.45: return 0.0974 * ch0 - 0.1786 * ch1 elif ch1_ch0 <= 0.54: return 0.062 * ch0 - 0.1 * ch1 else: return 0
[docs] @staticmethod def parse_measurement_data(data): return struct.unpack('<HH', data)
[docs] @dataclass class Config(SensorBase.Config): address: int = field(default=0b0111001) integration_cycles: int = field(default=1) gain_control: 'TSL258XSensor.GainControl' = field( default_factory=lambda: TSL258XSensor.GainControl.Gain1x)
[docs] def to_config_dict(self): assert self.integration_cycles != 0 cfg = super().to_config_dict() cfg[0x11] = struct.pack('B', self.address) cfg[0x21] = struct.pack('B', self.integration_cycles) cfg[0x22] = struct.pack('B', self.gain_control) return cfg
[docs] @classmethod def params_from_config_dict(cls, cfg): params = SensorBase.Config.params_from_config_dict(cfg) if 0x11 in cfg: params['address'], = struct.unpack('B', cfg[0x11]) if 0x21 in cfg: params['integration_cycles'], = struct.unpack('B', cfg[0x21]) if 0x22 in cfg: params['gain_control'] = TSL258XSensor.GainControl(cfg[0x22][0]) return params
def __repr_fields__(self): return dict(address=hex(self.address), integration_cycles=self.integration_cycles, gain_control=self.gain_control, **super().__repr_fields__())
[docs] @dataclass(repr=False) class MeasurementEvent(SensorBase.MeasurementEvent): raw: Tuple[int, int] = field(init=False, repr=False)
[docs] def parse(self): super().parse() self.raw: Tuple[int, int] = self.measurement_data if not self.error else (None, None)
@property def lux(self): if self.raw != (None, None): return TSL258XSensor.raw_to_lux(self.raw) else: return None def __repr_fields__(self): return dict(time=self.timestamp, raw=self.raw, error=self.error, lux=self.lux)
[docs] @dataclass class State(Component.State): raw: Tuple[Optional[int], Optional[int]] = field(default=(None, None)) error: Optional[int] = field(default=None) timestamp: Optional[arrow.Arrow] = field(default=None)
[docs] def update_from_bytes(self, data): self.raw = TSL258XSensor.parse_measurement_data(data) self.error = None self.timestamp = arrow.now()
[docs] def update_from_error(self, error_code, data): self.raw = (None, None) self.error = error_code self.timestamp = arrow.now()
[docs] def update_from_event(self, event): if isinstance(event, TSL258XSensor.MeasurementEvent): self.timestamp = event.timestamp if event.error: self.error = event.error self.raw = (None, None) else: self.error = None self.raw = event.measurement_data
@property def lux(self): if self.raw != (None, None): return TSL258XSensor.raw_to_lux(self.raw) else: return None def __repr_fields__(self): return dict(raw=self.raw, error=self.error, timestamp=self.timestamp, lux=self.lux)