Skip to content

Supporendshandler

SupporEndsHandler

Attributes

logger module-attribute

logger = getLogger(__name__)

Classes

SupportedEndsHandler

SupportedEndsHandler(file: str, conn: HTTPConnection)

Class meant to handle the supported-endpoints JSON file. Reloads endpoints and keeps track of API arguments and possible 'do' actions.

TODO: Write updates to JSON file?
Source code in dod/SupporEndsHandler.py
def __init__(self, file: str, conn: HTTPConnection):
    """Store the endpoints file path and connection, and build the transceiver.

    Args:
        file: Path of the supported-endpoints JSON file (opened by reload_all).
        conn: Connection object handed on to the HTTPTransceiver.
    """
    queue = Queue()
    # Semaphore starts at 0; it is shared with the transceiver together
    # with the queue.
    queue_ready = Semaphore(value=0)

    self.file = file
    self.__queue__ = queue
    self.__queue_ready__ = queue_ready
    self.__conn__ = conn
    # 'get' and 'conn' hold lists of API strings; 'do' maps an API string
    # to whatever reload_endpoint stores for it.
    self.supported_ends = {'get': [], 'do': {}, 'conn': []}
    self.transceiver = HTTPTransceiver(conn, queue, queue_ready)
Attributes
__conn__ instance-attribute
__conn__ = conn
__queue__ instance-attribute
__queue__ = Queue()
__queue_ready__ instance-attribute
__queue_ready__ = Semaphore(value=0)
file instance-attribute
file = file
supported_ends instance-attribute
supported_ends = {'get': [], 'do': {}, 'conn': []}
transceiver instance-attribute
transceiver = HTTPTransceiver(__conn__, __queue__, __queue_ready__)
Functions
get_endpoints
get_endpoints()
Source code in dod/SupporEndsHandler.py
def get_endpoints(self):
    """Return the ``supported_ends`` mapping itself (not a copy)."""
    endpoints = self.supported_ends
    return endpoints
reload_all
reload_all()

Reloads ALL endpoints from the supported-endpoints JSON file, then reloads each 'do' endpoint by asking the server

Source code in dod/SupporEndsHandler.py
def reload_all(self):
    """
        Reloads ALL endpoints from the supported-endpoints JSON file.

        Reads ``self.file``, takes the list stored under its "endpoints"
        key and sorts every entry's 'API' string into
        ``self.supported_ends``:

        * 'get'  - APIs with 'get' at offset 5
        * 'do'   - APIs with 'do' at offset 5 (values reset to None)
        * 'conn' - APIs containing 'connect' (case-insensitive) after
                   offset 5

        Afterwards ``reload_endpoint`` is called for every 'do' API.

        If the file does not exist, an error is logged and the current
        ``supported_ends`` is left untouched. Malformed JSON or entries
        lacking the expected keys still raise.
    """
    try:
        # JSON text is UTF-8 by specification; the context manager
        # guarantees the handle is closed.
        with open(self.file, encoding="utf-8") as f:
            json_data = json.load(f)["endpoints"]
    except FileNotFoundError:
        logger.error("File %s not found", self.file)
        # Nothing to load: bail out instead of touching an unbound handle.
        return

    apis = [x['API'] for x in json_data]

    # Offset 5 skips the leading path prefix - presumably "/DoD/";
    # confirm against the JSON file.
    self.supported_ends['get'] = [api for api in apis if api.startswith('get', 5)]
    self.supported_ends['do'] = {api: None for api in apis if api.startswith('do', 5)}
    self.supported_ends['conn'] = [api for api in apis if 'connect' in api[5:].lower()]

    # reload_endpoint only assigns to keys that already exist, so iterating
    # the dict directly is safe.
    for ent in self.supported_ends['do']:
        self.reload_endpoint(ent)
reload_endpoint
reload_endpoint(endpoint: str)

Reloads a single 'do' endpoint by asking the server about the endpoint's argument and storing the results it returns

Source code in dod/SupporEndsHandler.py
def reload_endpoint(self, endpoint : str):
    """
        Refresh the stored data for one 'do' endpoint by querying the server.

        Nothing happens unless ``endpoint`` is a known 'do' API that carries
        a '?' argument and is not one of the MoveX/MoveY/MoveZ moves.
        Otherwise the argument name is taken from the text between '?' and
        '=', "/DoD/get/<name>s" is sent through the transceiver, and the
        response's RESULTS is stored under ``supported_ends['do'][endpoint]``.
    """
    do_actions = self.supported_ends['do']
    if endpoint not in do_actions:
        return

    # Endpoints without '?' take no argument, so there is nothing to look up.
    if '?' not in endpoint:
        return

    # Any float is acceptable for pure moves
    if any(move in endpoint for move in ('MoveX', 'MoveY', 'MoveZ')):
        return

    query = endpoint.split('?')[1]
    arg_name = query.partition('=')[0]
    # The getter path is the argument name with a trailing "s".
    getter = f"/DoD/get/{arg_name}s"

    self.transceiver.send(getter)
    response = self.transceiver.get_response()
    do_actions[endpoint] = response.RESULTS