Source code for scsgate.tasks

""" This module contains all the possible messages to send via
scsgate.Reactor """

from scsgate.messages import compose_telegram, parse, StateMessage


[docs]class ExecutionError(BaseException): """ Error raised when something goes wrong while executing a task """ pass
[docs]class BasicTask: """ Basic task, not to be used directly """
[docs] def execute(self, connection): """ Method to be implemented by all subclasses """ raise NotImplementedError()
[docs]class MonitorTask(BasicTask): """ Read the buffer and invokes the notification endpoint if there's a relevant message """ def __init__(self, notification_endpoint): self._notification_endpoint = notification_endpoint self._last_raw_state_message = None
[docs] def execute(self, connection): connection.serial.write(b"@r") length = int(connection.serial.read(), 16) if length == 0: return data = connection.serial.read(length * 2) message = parse(data) # Filter duplicated state messages. The filtering feature # of SCSGate is buggy and causes @r to always return 0 available # messages if isinstance(message, StateMessage): if self._last_raw_state_message == data: return else: self._last_raw_state_message = data self._notification_endpoint(message)
def __str__(self): return "Monitor Task"
[docs]class SetStatusTask(BasicTask): """ Generic task to request a status change. To not be used directly """ def __init__(self, target, action): self._target = target self._action = action
[docs] def execute(self, connection): command = "@w{action}{target}".format( action=self._action, target=self._target) connection.serial.write(str.encode(command)) ret = connection.serial.read() if ret != b'k': raise ExecutionError( "Error while setting status. Command {}, got {}".format( command, ret))
def __str__(self): return "SetStatusTask: target {} - action {}".format( self._target, self._action)
[docs]class ToggleStatusTask(SetStatusTask): """ Change the toggled status of a light or switch """ def __init__(self, target, toggled): self._toggled = toggled action = 1 if toggled: action = 0 SetStatusTask.__init__( self, target=target, action=action) def __str__(self): return "ToggleStatusTask: target {} - toggled {}".format( self._target, self._toggled)
[docs]class RaiseRollerShutterTask(SetStatusTask): """ Raise a roller shutter """ def __init__(self, target): SetStatusTask.__init__( self, target=target, action=8) def __str__(self): return "RaiseRollerShutterTask: target {}".format( self._target)
[docs]class LowerRollerShutterTask(SetStatusTask): """ Lower a roller shutter """ def __init__(self, target): SetStatusTask.__init__( self, target=target, action=9) def __str__(self): return "LowerRollerShutterTask: target {}".format( self._target)
[docs]class HaltRollerShutterTask(SetStatusTask): """ Halt a roller shutter """ def __init__(self, target): SetStatusTask.__init__( self, target=target, action="A") def __str__(self): return "HaltRollerShutterTask: target {}".format( self._target)
[docs]class GetStatusTask(BasicTask): """ Requests the current status of a device """ def __init__(self, target): self._target = target
[docs] def execute(self, connection): command = b"@W7" + compose_telegram([ str.encode(self._target), b"00", b"15", b"00"]) connection.serial.write(command) ret = connection.serial.read() if ret != b'k': raise ExecutionError( "Error while requesting status. Command {}, got {}".format( command, ret))
def __str__(self): return "GetStatusTask: target {}".format( self._target)