Skip to content

Door watcher

Watchman

Source code in scripts/door_watcher/door_watcher.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class Watchman:
    cbid = None
    retries = 3

    def __init__(self, door_state, voltage_read, voltage_write):
        self.door_state = EpicsSignalRO(door_state)
        self.mesh_voltage = EpicsSignal(voltage_read,
                                        write_pv=voltage_write)
        print('Using door pv {}'.format(door_state))

    def start(self):
        """Sets up self.callback to be run when door state changes"""
        # Make sure we don't start 2 processes
        self.stop()
        print('Starting door watcher')
        self.door_state.subscribe(self.callback)

    def callback(self, old_value=None, value=None, **kwargs):
        """To be run every time the door state changes"""
        print('Door PV changed from {} to {}'.format(old_value, value))
        if old_value == 1 and value == 0:
            print('Zeroing the mesh voltage because door was opened')
            # Let's retry a few times in case of a network error
            for i in range(self.retries):
                try:
                    self.mesh_voltage.put(0)
                except Exception:
                    pass

    def stop(self):
        """Shutdown the watcher"""
        if self.cbid is not None:
            print('Stopping door watcher')
            self.door_state.unsubscribe(self.cbid)
            self.cbid = None

callback(old_value=None, value=None, **kwargs)

To be run every time the door state changes

Source code in scripts/door_watcher/door_watcher.py
23
24
25
26
27
28
29
30
31
32
33
def callback(self, old_value=None, value=None, **kwargs):
    """To be run every time the door state changes"""
    print('Door PV changed from {} to {}'.format(old_value, value))
    if old_value == 1 and value == 0:
        print('Zeroing the mesh voltage because door was opened')
        # Let's retry a few times in case of a network error
        for i in range(self.retries):
            try:
                self.mesh_voltage.put(0)
            except Exception:
                pass

start()

Sets up self.callback to be run when door state changes

Source code in scripts/door_watcher/door_watcher.py
16
17
18
19
20
21
def start(self):
    """Sets up self.callback to be run when door state changes"""
    # Make sure we don't start 2 processes
    self.stop()
    print('Starting door watcher')
    self.door_state.subscribe(self.callback)

stop()

Shutdown the watcher

Source code in scripts/door_watcher/door_watcher.py
35
36
37
38
39
40
def stop(self):
    """Shutdown the watcher"""
    if self.cbid is not None:
        print('Stopping door watcher')
        self.door_state.unsubscribe(self.cbid)
        self.cbid = None