Class that handles the supported-endpoints JSON file.
Reloads endpoints and keeps track of API arguments and the possible 'do' actions.
TODO: Write updates to JSON file?
Source code in dod/SupporEndsHandler.py
def __init__(self, file: str, conn: HTTPConnection):
    """Initialise handler state and build the transceiver.

    Args:
        file: Path of the endpoints JSON file; opened later by reload_all.
        conn: Connection object passed on to the HTTPTransceiver.
    """
    request_queue = Queue()
    # Starts at zero; it is handed to the transceiver together with the queue.
    ready_signal = Semaphore(value=0)
    # Endpoint registry: 'get' and 'conn' are lists, 'do' is a mapping.
    empty_registry = dict(get=[], do={}, conn=[])

    self.file = file
    self.__queue__ = request_queue
    self.__queue_ready__ = ready_signal
    self.__conn__ = conn
    self.supported_ends = empty_registry
    self.transceiver = HTTPTransceiver(
        self.__conn__,
        self.__queue__,
        self.__queue_ready__,
    )
Attributes
__conn__
instance-attribute
__queue__
instance-attribute
__queue_ready__
instance-attribute
__queue_ready__ = Semaphore(value=0)
supported_ends
instance-attribute
supported_ends = {'get': [], 'do': {}, 'conn': []}
transceiver
instance-attribute
transceiver = HTTPTransceiver(__conn__, __queue__, __queue_ready__)
Functions
get_endpoints
Source code in dod/SupporEndsHandler.py
def get_endpoints(self):
    """Return the supported-endpoints registry (the same dict object, not a copy)."""
    registry = self.supported_ends
    return registry
reload_all
Reloads ALL endpoints from the JSON file, then refreshes every 'do' endpoint by asking the server
Source code in dod/SupporEndsHandler.py
def reload_all(self):
    """
    Reload ALL endpoints from the JSON file, then refresh every 'do' endpoint.

    Reads the "endpoints" list from the file at ``self.file`` and sorts each
    entry's 'API' string into ``self.supported_ends``:

    * 'get'  -- APIs whose characters [5:8] are 'get'
    * 'do'   -- APIs whose characters [5:7] are 'do' (values start as None)
    * 'conn' -- APIs containing 'connect' (case-insensitive) after index 5

    Each 'do' endpoint is then passed to ``reload_endpoint``.

    If the file does not exist, an error is logged and the method returns
    without touching ``self.supported_ends``.
    """
    try:
        f = open(self.file, encoding="utf-8")
    except FileNotFoundError:
        logger.error("File %s not found", self.file)
        # Nothing to load; bail out instead of falling through to an
        # unbound file handle.
        return
    with f:
        json_data = json.load(f)["endpoints"]

    apis = [entry['API'] for entry in json_data]
    # The slices skip a 5-character prefix (presumably "/DoD/" -- see the
    # URL built in reload_endpoint).
    self.supported_ends['get'] = [api for api in apis if api[5:8] == 'get']
    self.supported_ends['do'] = {api: None for api in apis if api[5:7] == 'do'}
    self.supported_ends['conn'] = [api for api in apis if 'connect' in api[5:].lower()]

    for ent in self.supported_ends['do']:
        self.reload_endpoint(ent)
reload_endpoint
reload_endpoint(endpoint: str)
Reloads a single 'do' endpoint by asking the server
Source code in dod/SupporEndsHandler.py
def reload_endpoint(self, endpoint: str):
    """
    Refresh one 'do' endpoint by querying the server.

    Does nothing unless ``endpoint`` is a known 'do' endpoint that carries a
    query argument ('?') and is not a MoveX/MoveY/MoveZ endpoint. Otherwise
    it sends ``/DoD/get/<arg>s`` (where ``<arg>`` is the name before '=' in
    the query string) and stores the response's ``RESULTS`` under the
    endpoint in ``self.supported_ends['do']``.
    """
    if endpoint not in self.supported_ends['do']:
        return
    # Endpoints without an argument have nothing to look up.
    if '?' not in endpoint:
        return
    # Any float is acceptable for pure moves
    if any(axis in endpoint for axis in ('MoveX', 'MoveY', 'MoveZ')):
        return

    query = endpoint.split('?')[1]
    arg_name = query.split('=')[0]
    lookup_url = f"/DoD/get/{arg_name}s"
    self.transceiver.send(lookup_url)
    results = self.transceiver.get_response().RESULTS
    self.supported_ends['do'][endpoint] = results