Class meant to handle the supported endpoints JSON file.
Reloads endpoints, keeps track of API args and possible 'do' actions.
TODO: Write updates to JSON file?
Source code in dod/SupporEndsHandler.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class SupportedEndsHandler:
    """
    Class meant to handle the supported endpoints JSON file.

    Reloads endpoints, keeps track of API args, and possible 'do' actions.

    ``supported_ends`` groups the API strings read from the file under
    three keys: 'get' and 'conn' are lists of API strings, while 'do' maps
    each API string to the RESULTS last fetched from the server for it
    (None until something has been fetched).

    TODO: Write updates to JSON file?
    """

    def __init__(self, file : str, conn : HTTPConnection):
        """
        Stores the file path and connection and builds the transceiver.

        file: path of the JSON file listing the supported endpoints
        conn: connection handed to the HTTPTransceiver
        """
        self.file = file
        self.__queue__ = Queue()
        self.__queue_ready__ = Semaphore(value=0)
        self.__conn__ = conn
        self.supported_ends = {
            'get' : [],
            'do' : {},
            'conn' : []
        }
        self.transceiver = HTTPTransceiver(self.__conn__, self.__queue__, self.__queue_ready__)

    def get_endpoints(self):
        """
        Returns the supported_ends dictionary itself (not a copy)
        """
        return self.supported_ends

    def reload_endpoint(self, endpoint : str):
        """
        Reloads endpoints by asking server

        Only endpoints already present in supported_ends['do'] whose API
        string contains a '?' are queried. MoveX/MoveY/MoveZ endpoints are
        skipped. For the others, '/DoD/get/<arg>s' is sent, where <arg> is
        the text between '?' and the first '=', and the RESULTS of the
        response is stored under the endpoint.
        """
        if endpoint not in self.supported_ends['do']:
            return
        # Endpoints without a query string take no argument: nothing to ask
        if '?' not in endpoint:
            return
        # Any float is acceptable for pure moves
        if 'MoveX' in endpoint or 'MoveY' in endpoint or "MoveZ" in endpoint:
            return
        # The getter path is the argument name with an 's' appended,
        # e.g. '...?Foo=1' is looked up through '/DoD/get/Foos'
        arg_name = endpoint.split('?')[1].split('=')[0]
        self.transceiver.send(f"/DoD/get/{arg_name}s")
        self.supported_ends['do'][endpoint] = self.transceiver.get_response().RESULTS

    def reload_all(self):
        """
        Reloads ALL endpoints by asking server

        Reads the "endpoints" list from self.file, sorts every entry's
        'API' string into the 'get', 'do' and 'conn' groups, then calls
        reload_endpoint for each 'do' entry. If the file does not exist
        the error is logged and supported_ends is left untouched.
        """
        try:
            f = open(self.file)
        except FileNotFoundError:
            logger.error("File supported.json not found")
            # Without this return the code below would fail on the
            # never-assigned file handle instead of degrading gracefully
            return
        with f:
            json_data = json.load(f)["endpoints"]
        # The slices skip the first five characters of each API string
        # (presumably the '/DoD/' prefix - confirm against the JSON file)
        self.supported_ends['get'] = [x['API'] for x in json_data if x['API'][5:8] == 'get']
        self.supported_ends['do'] = {x['API'] : None for x in json_data if x['API'][5:7] == 'do'}
        self.supported_ends['conn'] = [x['API'] for x in json_data if 'connect' in x['API'][5:].lower()]
        # reload_endpoint only overwrites existing keys, so iterating the
        # dict directly is safe
        for ent in self.supported_ends['do']:
            self.reload_endpoint(ent)
|
reload_all()
Reloads ALL endpoints by asking server
Source code in dod/SupporEndsHandler.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def reload_all(self):
    """
    Reloads ALL endpoints by asking server

    Reads the "endpoints" list from self.file, sorts every entry's 'API'
    string into the 'get', 'do' and 'conn' groups of supported_ends, then
    calls reload_endpoint for each 'do' entry. If the file does not exist
    the error is logged and supported_ends is left untouched.
    """
    try:
        f = open(self.file)
    except FileNotFoundError:
        logger.error("File supported.json not found")
        # Without this return the code below would fail on the
        # never-assigned file handle instead of degrading gracefully
        return
    with f:
        json_data = json.load(f)["endpoints"]
    # The slices skip the first five characters of each API string
    # (presumably the '/DoD/' prefix - confirm against the JSON file)
    self.supported_ends['get'] = [x['API'] for x in json_data if x['API'][5:8] == 'get']
    self.supported_ends['do'] = {x['API'] : None for x in json_data if x['API'][5:7] == 'do'}
    self.supported_ends['conn'] = [x['API'] for x in json_data if 'connect' in x['API'][5:].lower()]
    for ent in self.supported_ends['do']:
        self.reload_endpoint(ent)
|
reload_endpoint(endpoint)
Reloads endpoints by asking server
Source code in dod/SupporEndsHandler.py
37
38
39
40
41
42
43
44
45
46
47
48
def reload_endpoint(self, endpoint : str):
    """
    Refreshes one 'do' endpoint by asking the server

    Nothing happens unless the endpoint is already listed in
    supported_ends['do'] and its API string carries a '?'. MoveX, MoveY
    and MoveZ endpoints are skipped as well. Otherwise '/DoD/get/<arg>s'
    is sent, <arg> being the text between '?' and the first '=', and the
    RESULTS of the response is stored under the endpoint.
    """
    if endpoint not in self.supported_ends['do'].keys():
        return
    # No query string means the action takes no argument
    if '?' not in endpoint:
        return
    # Pure moves accept any float, so there is nothing to look up
    if any(axis in endpoint for axis in ('MoveX', 'MoveY', 'MoveZ')):
        return
    query_string = endpoint.split('?')[1]
    arg_name = query_string.split('=')[0]
    getter_path = f"/DoD/get/{arg_name}s"
    self.transceiver.send(getter_path)
    response = self.transceiver.get_response()
    self.supported_ends['do'][endpoint] = response.RESULTS
|