Source code for scsgate.messages
""" This module contains the definition of all the messages known by
SCSGate """
from functools import reduce
[docs]class SCSMessage:
""" Base class for all SCS messages """
def __init__(self, data):
self._data = data
@property
def bytes(self):
""" A list containing all the bytes of the message """
return self._data
@property
def data(self):
""" The raw message """
return "".join(self._data)
@property
def entity(self):
""" The ID of the subject of this message """
return None
[docs]class AckMessage(SCSMessage):
""" Ack message """
def __init__(self):
SCSMessage.__init__(self, "")
def __repr__(self):
return "AckMessage()"
def __str__(self):
return "AckMessage"
[docs]class UnknownMessage(SCSMessage):
""" Message unknown """
def __init__(self, data):
SCSMessage.__init__(self, data)
def __repr__(self):
return "UnknownMessage()"
def __str__(self):
return "UnknownMessage: {0}".format(self._data)
[docs]class StateMessage(SCSMessage):
""" Message issued to notify a change of state """
def __init__(self, data):
SCSMessage.__init__(self, data)
self._source = data[2]
self._status = "off"
if data[4] == "00":
self._status = "on"
def __repr__(self):
return "StateMessage()"
def __str__(self):
return ("StateMessage: source {src} - "
"status {status} - "
"raw: {raw}").format(
src=self._source,
status=self._status,
raw=self._data)
@property
def toggled(self):
""" True if the light is toggled, False otherwise """
return self._status == "on"
@property
def source(self):
""" The source of the message """
return self._source
@property
def status(self):
""" Current status """
return self._status
@property
def entity(self):
""" The ID of the subject of this message """
return self._source
[docs]class CommandMessage(SCSMessage):
""" Message issued to turn on/off a switch """
def __init__(self, data):
SCSMessage.__init__(self, data)
self._destination = data[1]
self._source = data[2]
self._status = "off"
if data[4] == "00":
self._status = "on"
@property
def destination(self):
""" The target of the message """
return self._destination
@property
def entity(self):
""" The ID of the subject of this message """
return self._destination
@property
def source(self):
""" The source of the message """
return self._source
@property
def status(self):
""" Current status """
return self._status
def __repr__(self):
return "CommandMessage()"
def __str__(self):
message = ("CommandMessage: destination {dest} - "
"source {src} - status {status} - "
"raw: {raw}")
return message.format(
src=self._source,
status=self._status,
raw=self._data,
dest=self._destination)
[docs]class ScenarioTriggeredMessage(SCSMessage):
""" Message issued when a scenario switch is pressed """
def __init__(self, data):
SCSMessage.__init__(self, data)
self._source = data[1]
self._scenario = data[4]
@property
def scenario(self):
""" The scenario ID """
return self._scenario
@property
def entity(self):
""" The ID of the subject of this message """
return self._source
@property
def source(self):
""" The source of the message """
return self._source
def __repr__(self):
return "ScenarioTriggeredMessage()"
def __str__(self):
message = ("ScenarioTriggeredMessage: source {src} - "
"scenario {scen} - raw: {raw}")
return message.format(
src=self._source,
raw=self._data,
scen=self._scenario)
[docs]class RequestStatusMessage(SCSMessage):
""" Message sent to request the status of a switch """
def __init__(self, data):
SCSMessage.__init__(self, data)
self._destination = data[1]
self._source = data[2]
@property
def destination(self):
""" The target of the message """
return self._destination
@property
def entity(self):
""" The ID of the subject of this message """
return self._destination
@property
def source(self):
""" The source of the message """
return self._source
def __repr__(self):
return "RequestStatusMessage()"
def __str__(self):
return ("RequestStatusMessage: destination {dest} - "
"source {src} - "
"raw: {raw}").format(
src=self._source,
raw=self._data,
dest=self._destination)
[docs]def parse(data):
""" Parses a raw datagram and return the right type of message """
# convert to string
data = data.decode("ascii")
if len(data) == 2 and data == "A5":
return AckMessage()
# split into bytes
raw = [data[i:i+2] for i in range(len(data)) if i % 2 == 0]
if len(raw) != 7:
return UnknownMessage(raw)
if raw[1] == "B8":
return StateMessage(raw)
elif raw[3] == "12":
return CommandMessage(raw)
elif raw[3] == "14":
return ScenarioTriggeredMessage(raw)
elif raw[3] == "15":
return RequestStatusMessage(raw)
else:
return UnknownMessage(raw)
[docs]def checksum_bytes(data):
""" Returns a XOR of all the bytes specified inside of the given list """
int_values = [int(x, 16) for x in data]
int_xor = reduce(lambda x, y: x ^ y, int_values)
hex_xor = "{:X}".format(int_xor)
if len(hex_xor) % 2 != 0:
hex_xor = "0" + hex_xor
return str.encode(hex_xor)
[docs]def compose_telegram(body):
""" Compose a SCS message
body: list containing the body of the message.
returns: full telegram expressed (bytes instance)
"""
msg = [b"A8"] + body + [checksum_bytes(body)] + [b"A3"]
return str.encode("".join([x.decode() for x in msg]))