Skip to content

SupporEndsHandler

SupportedEndsHandler

Class meant to handle the supported-endpoints JSON file. Reloads endpoints and keeps track of API arguments and the possible 'do' actions.

TODO: Write updates to JSON file?
Source code in dod/SupporEndsHandler.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
class SupportedEndsHandler:
    """
        Class meant to handle the supported endpoints JSON file.
            Reloads endpoints, keeps track of API args, and possible 'do' actions

            TODO: Write updates to JSON file?
    """
    def __init__(self, file : str, conn : HTTPConnection):
        """
            Store the JSON file path and connection, and build the transceiver.

            file: path to the supported-endpoints JSON file read by reload_all
            conn: connection object handed to the HTTPTransceiver
        """
        self.file = file
        self.__queue__ = Queue()
        self.__queue_ready__ = Semaphore(value=0)
        self.__conn__ = conn
        # 'get'  : list of API paths whose characters [5:8] are 'get'
        # 'do'   : dict of API path -> RESULTS fetched from the server
        #          (None until reload_endpoint fills it in)
        # 'conn' : list of API paths containing 'connect' (case-insensitive)
        self.supported_ends = {
            'get' : [],
            'do' : {},
            'conn' : []
        }
        self.transceiver = HTTPTransceiver(self.__conn__, self.__queue__, self.__queue_ready__)

    def get_endpoints(self):
        """
            Return the supported_ends dictionary (the live object, not a copy)
        """
        return self.supported_ends

    def reload_endpoint(self, endpoint : str):
        """
            Reloads a single 'do' endpoint by asking the server.

            Only acts on endpoints that are already keys of
            supported_ends['do'] and that carry a '?argument' part. The
            argument name is pluralised into a '/DoD/get/<name>s' request and
            the RESULTS of the response are stored for that endpoint.
        """
        if endpoint not in self.supported_ends['do']:
            return
        # check this do endpoint takes an argument
        if '?' not in endpoint:
            return
        # Any float is acceptable for pure moves
        if 'MoveX' in endpoint or 'MoveY' in endpoint or 'MoveZ' in endpoint:
            return
        # '<path>?<Name>=<value>' -> '/DoD/get/<Name>s'
        arg_name = endpoint.split('?')[1].split('=')[0]
        self.transceiver.send(f"/DoD/get/{arg_name}s")
        self.supported_ends['do'][endpoint] = self.transceiver.get_response().RESULTS

    def reload_all(self):
        """
            Reloads ALL endpoints: reads the JSON file, then asks the server
            about every 'do' endpoint.

            If the file does not exist an error is logged and the current
            endpoints are left untouched.
        """
        try:
            f = open(self.file)
        except FileNotFoundError:
            logger.error("File supported.json not found")
            # Nothing to load; without this return the code below would
            # reference the unbound name 'f' and crash.
            return

        with f:
            json_data = json.load(f)["endpoints"]

        # Classification is positional: the slices assume every API path
        # starts with a 5-character prefix such as '/DoD/'.
        self.supported_ends['get'] = [x['API'] for x in json_data if x['API'][5:8] == 'get']
        self.supported_ends['do'] = {x['API'] : None for x in json_data if x['API'][5:7] == 'do'}
        self.supported_ends['conn'] = [x['API'] for x in json_data if 'connect' in x['API'][5:].lower()]

        # reload_endpoint only assigns to existing keys, so iterating the
        # dict while it runs is safe.
        for ent in self.supported_ends['do']:
            self.reload_endpoint(ent)

reload_all()

Reloads ALL endpoints: reads the supported-endpoints JSON file, then queries the server for every 'do' endpoint.

Source code in dod/SupporEndsHandler.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def reload_all(self):
    """
        Reloads ALL endpoints: reads the JSON file at self.file, then asks
        the server about every 'do' endpoint via self.reload_endpoint.

        If the file does not exist an error is logged and the current
        endpoints are left untouched.
    """
    try:
        f = open(self.file)
    except FileNotFoundError:
        logger.error("File supported.json not found")
        # Nothing to load; without this return the code below would
        # reference the unbound name 'f' and crash.
        return

    with f:
        json_data = json.load(f)["endpoints"]

    # Classification is positional: the slices assume every API path starts
    # with a 5-character prefix such as '/DoD/'.
    self.supported_ends['get'] = [x['API'] for x in json_data if x['API'][5:8] == 'get']
    self.supported_ends['do'] = {x['API'] : None for x in json_data if x['API'][5:7] == 'do'}
    self.supported_ends['conn'] = [x['API'] for x in json_data if 'connect' in x['API'][5:].lower()]

    for ent in self.supported_ends['do']:
        self.reload_endpoint(ent)

reload_endpoint(endpoint)

Reloads a single 'do' endpoint by asking the server; endpoints without an argument, and the MoveX/MoveY/MoveZ endpoints, are skipped.

Source code in dod/SupporEndsHandler.py
37
38
39
40
41
42
43
44
45
46
47
48
49
def reload_endpoint(self, endpoint : str):
    """
        Refresh one 'do' endpoint from the server.

        Does nothing unless the endpoint is a key of supported_ends['do']
        and contains a '?argument' part. MoveX/MoveY/MoveZ endpoints are
        skipped as well. Otherwise the argument name is turned into a
        '/DoD/get/<name>s' request and the RESULTS of the response are stored
        under the endpoint.
    """
    # Guard: only endpoints already registered as 'do' actions
    if endpoint not in self.supported_ends['do'].keys():
        return
    # Guard: endpoint must take an argument
    if '?' not in endpoint:
        return
    # Guard: pure moves accept any float, nothing to look up
    if any(axis in endpoint for axis in ('MoveX', 'MoveY', 'MoveZ')):
        return

    query_part = endpoint.split('?')[1]
    arg_name = query_part.split('=')[0]
    request_path = f"/DoD/get/{arg_name}s"
    self.transceiver.send(request_path)
    results = self.transceiver.get_response().RESULTS
    self.supported_ends['do'][endpoint] = results