Source code for scsgate.connection
""" This module contains an helper class to initiate a connection
with the SCSGate device """
import serial as pyserial
[docs]class Connection:
""" Connection to SCSGate device """
def __init__(self, device, logger):
""" Initialize the class
Arguments:
device: string containing the serial device allocated to SCSGate
logger: instance of logging
"""
self._serial = pyserial.Serial(device, 115200)
logger.info("Clearing buffers")
self._serial.write(b"@b")
ret = self._serial.read(1)
if ret != b"k":
raise RuntimeError("Error while clearing buffers")
# ensure pending operations are terminated (eg: @r, @l)
self._serial.write(b"@c")
ret = self._serial.read()
if ret != b"k":
raise RuntimeError("Error while cancelling pending operations")
logger.info("Enabling ASCII mode")
self._serial.write(b"@MA")
ret = self._serial.read(1)
if ret != b"k":
raise RuntimeError("Error while enabling ASCII mode")
logger.info("Filter Ack messages")
self._serial.write(b"@F2")
ret = self._serial.read(1)
if ret != b"k":
raise RuntimeError("Error while setting filter")
@property
def serial(self):
""" Returns the pyserial.Serial instance """
return self._serial
[docs] def close(self):
""" Closes the connection to the serial port and ensure no pending
operatoin are left """
self._serial.write(b"@c")
self._serial.read()
self._serial.close()