Skip to content

Beamline

CoDI

Source code in dod/codi.py
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
class CoDI: 
    def __init__(self, reload_presets = False):
        """
        Class definition of the DoD codi injector 
        Parameters
        reload_presets : Boolean
            force resetting the presets (overwriting existing hutch presets)
        ----------
        """

        from pcdsdevices.device import ObjectComponent as OCpt
        from pcdsdevices.epics_motor import SmarAct, Motor
        import time

        # CoDI motor PVs loading 
        self.CoDI_rot_left = SmarAct('MFX:MCS2:01:m3', name='CoDI_rot_left')
        self.CoDI_rot_right = SmarAct('MFX:MCS2:01:m1', name='CoDI_rot_right')
        self.CoDI_rot_base = SmarAct('MFX:MCS2:01:m2', name='CoDI_rot_base')
        self.CoDI_trans_z = SmarAct('MFX:MCS2:01:m4', name='CoDI_trans_z')

        #Predefined positions CoDI
        self.CoDI_pos_predefined = dict()

        if reload_presets == True: 
            # self.CoDI_pos_predefined['aspiration'] = (0.0,0.0,0.0,0.0) 
            # self.CoDI_pos_predefined['angled_vert'] = (0.0,45.0,45.0,0.0)
            # self.CoDI_pos_predefined['angled_hor'] = (90.0,45.0,45.0,0.0)

            self.set_CoDI_predefined('aspiration',0.0,0.0,0.0,0.0)
            self.set_CoDI_predefined('angled_vert',0.0,45.0,45.0,0.0)
            self.set_CoDI_predefined('angled_hor',90.0,45.0,45.0,0.0)
        else: 
            all_presets = vars(self.CoDI_rot_left.presets.positions) #Needs to be fixed
            for preset, preset_value in all_presets.items(): 
                try: 
                    # get preset position
                    exec_base = "preset_rot_base = self.CoDI_rot_base.presets.positions." + preset + '.pos'
                    exec_rot_left = "preset_rot_left = self.CoDI_rot_left.presets.positions." + preset + '.pos'
                    exec_rot_right = "preset_rot_right = self.CoDI_rot_right.presets.positions." + preset + '.pos'
                    exec_trans_z = "preset_trans_z = self.CoDI_trans_z.presets.positions." + preset + '.pos'
                    exec(exec_base)
                    exec(exec_rot_left)
                    exec(exec_rot_right)
                    exec(exec_trans_z)

                    # Save to local database
                    print(preset)
                    self.set_CoDI_predefined(preset, preset_rot_base, preset_rot_left, preset_rot_right, preset_trans_z)
                except: 
                    print('skipping preset '+ preset + ', as it is not defined in all motors')    


        # # create config parser handler
        # json_handler = JsonFileHandler(supported_json)
        # # load configs and launch web server
        # json_handler.reload_endpoints()

        # Flag that can be used later on for safety aborts during task execution
        self.safety_abort = False


    def get_CoDI_predefined(self):


        return self.CoDI_pos_predefined


    def update_CoDI_predefined(self): 
        'reloads all the hutch python presets for motors and overwrites local position preset dict'

        #Predefined positions CoDI
        self.CoDI_pos_predefined = dict()

        all_presets = vars(self.CoDI_rot_left.presets.positions) #Needs to be fixed
        for preset in all_presets.keys(): 
            # try: 
            # get preset position
            self.exec_base = "preset_rot_base = self.CoDI_rot_base.presets.positions." + preset + '.pos'
            self.exec_rot_left = "preset_rot_left = self.CoDI_rot_left.presets.positions." + preset + '.pos'
            self.exec_rot_right = "preset_rot_right = self.CoDI_rot_right.presets.positions." + preset + '.pos'
            self.exec_trans_z = "preset_trans_z = self.CoDI_trans_z.presets.positions." + preset + '.pos'
            print(self.exec_base)
            exec(self.exec_base)

            exec(self.exec_rot_left)
            print(self.exec_rot_left)

            exec(self.exec_rot_right)
            print(self.exec_rot_left)

            exec(self.exec_trans_z)
            print(self.exec_trans_z)
            print(preset_trans_z)

            # Save to local database
            print(preset)
            self.set_CoDI_predefined(preset, preset_rot_base, preset_rot_left, preset_rot_right, preset_trans_z)
            # except: 
            #     print('skipping preset '+ preset + ', as it is not defined in all motors')    

    def set_CoDI_predefined(self, name, base, left, right, z):
        """
        defines or updates a predefined combination for CoDI

        Parameters
        name : str
            name of the pre-definition
        base : float
            rotation of base
        left : float
            rotation of left injector
        right : float
            rotation of right injector
        z : float
            translation of right injector
        ----------

        """
        self.CoDI_pos_predefined.update({name: (base,left, right, z)})

        # Presets using MFX presets functionalities 
        self.CoDI_rot_left.presets.add_hutch(name, value = left)
        self.CoDI_rot_right.presets.add_hutch(name, value = right)
        self.CoDI_rot_base.presets.add_hutch(name, value = base)
        self.CoDI_trans_z.presets.add_hutch(name, value = z)



    def get_CoDI_pos(self, precision_digits = 1): 
        """
        gets the colliding droplet injector motor positions as tuple

        Parameters
        precision_digits : int
            precision with which the pre-defined positions are checked 

        ----------
        Return : (tuple, 5)
            motor positions of the CoDI motors. (name of preset, rot_base, rot_left, rot_right, trans_z)
        """
        pos_rot_base  = self.CoDI_rot_base.wm()
        pos_rot_left  = self.CoDI_rot_left.wm()
        pos_rot_right = self.CoDI_rot_right.wm()
        pos_trans_z   = self.CoDI_trans_z.wm()

        # # for testing purposes
        # pos_rot_base  = 0
        # pos_rot_left  = 45
        # pos_rot_right = 45
        # pos_trans_z   = 0

        pos_tuple = (pos_rot_base,pos_rot_left, pos_rot_right,pos_trans_z)

        pos_rounded = tuple([float(round(each_pos,1)) for each_pos in pos_tuple])

        # Test if this is one of the preset positions: 
        pos_name = 'undefined'
        for preset in self.CoDI_pos_predefined:
            preset_rounded = tuple([float(round(each_pos,1)) for each_pos in self.CoDI_pos_predefined[preset]])
            if preset_rounded[:-1] == pos_rounded[:-1]: 
                pos_name = preset

        return pos_name, pos_rot_base, pos_rot_left, pos_rot_right, pos_trans_z


    def set_CoDI_pos(self, pos_name, wait = True): 
        """
        Moves the colliding droplet injector into a pre-defined position. 

        Parameters
        pos_name : string
            name of the pre-defined position
        wait : boolean
            if the robot waits before continuing further steps

        ----------
        Return : 

        """
        # pos_rot_base  = CoDI_base.wm()
        # pos_rot_left  = CoDI_left.wm()
        # pos_rot_right = CoDI_right.wm()
        # pos_trans_z   = CoDI_z.wm()
        import time

        # get target positions
        pos_rot_base, pos_rot_left, pos_rot_right, pos_trans_z = self.CoDI_pos_predefined[pos_name]

        # Move motors

        # Old wayy 
        # self.CoDI_rot_base.mv(pos_rot_base, wait=False)
        # self.CoDI_rot_left.mv(pos_rot_left,  wait=False)
        # self.CoDI_rot_right.mv(pos_rot_right, wait=False)
        # self.CoDI_trans_z.mv(pos_trans_z, wait=False)

        # Move using hutch python presets
        exec_base = "self.CoDI_rot_base.mv_" + pos_name + '()'
        exec(exec_base)
        exec_left = "self.CoDI_rot_left.mv_" + pos_name + '()'
        exec(exec_left)
        exec_right = "self.CoDI_rot_right.mv_" + pos_name + '()'
        exec(exec_right)
        exec_z = "self.CoDI_trans_z.mv_" + pos_name + '()'
        exec(exec_z)

        if wait == True: 
            test_name, test_pos_rot_base, test_pos_rot_left, test_pos_rot_right, test_pos_trans_z = self.get_CoDI_pos()
            i = 0
            while pos_name != test_name: 
                time.sleep(1)
                print('\r waiting for motion to end: %i s' %i, end="\r")
                i = i+1
                test_name, test_pos_rot_base, test_pos_rot_left, test_pos_rot_right, test_pos_trans_z = self.get_CoDI_pos()
            print('Motion ended')


    def set_CoDI_current_pos(self, name):
        """
        defines or updates the current motor combination for CoDI

        Parameters
        name : str
            name of the pre-definition
        ----------

        """
        pos_name, pos_rot_base, pos_rot_left, pos_rot_right, pos_trans_z = self.get_CoDI_pos()
        self.set_CoDI_predefined(name, pos_rot_base, pos_rot_left, pos_rot_right, pos_trans_z)


    def set_CoDI_current_z(self, verbose = True):
        """
        sets the current z position to all preloaded positions
        Useful for global change of z after aligning

        Parameters
        verbose : Boolean
            prints the new positions as a sanity check if True
        ----------

        """
        # get current z position:
        pos_trans_z_new  = self.CoDI_trans_z.wm()

        # get all keys from the positions
        position_keys = self.CoDI_pos_predefined.keys

        #go through all positions and change the z-value to the current z value
        for key in position_keys: 
            pos_name, pos_rot_base, pos_rot_left, pos_rot_right, pos_trans_z = self.CoDI_pos_predefined[key]
            self.set_CoDI_predefined(pos_name,pos_rot_base, pos_rot_left, pos_rot_right, pos_trans_z_new)

        #Print the new predefined positions as a sanity check
        if verbose == True:
            print(self.get_CoDI_predefined)


    def remove_CoDI_pos(self, name):
        """
        removes the defined motor combination for CoDI from local database

        Parameters
        name : str
            name of the pre-definition to be deleted
        ----------

        """
        del self.CoDI_pos_predefined[name]


    def move_z_rel(self, z_rel):
        """
        moves the z position relative to the current position 

        Parameters
        z_rel : float
            relative motion in mm
        ----------

        """
        # get current z position:
        pos_trans_z  = self.CoDI_trans_z.wm()

        # set new position
        self.CoDI_trans_z.umvr(z_rel)

    def move_rot_left_rel(self, rot_rel):
        """
        moves the rotation of the left nozzle relative to the current position in degree

        Parameters
        rot_rel : float
            relative motion in degree
        ----------

        """
        # get current position:
        pos_rot_left  = self.CoDI_rot_left.wm()

        # set new position: 
        self.CoDI_rot_left.umvr(rot_rel)


    def move_rot_right_rel(self, rot_rel):
        """
        moves the rotation of the right nozzle relative to the current position in degree

        Parameters
        rot_rel : float
            relative motion in degree
        ----------

        """
        # get current position:
        pos_rot_right  = self.CoDI_rot_right.wm()

        # set new position: 
        self.CoDI_rot_right.umvr(rot_rel)


    def move_rot_base_rel(self, rot_rel):
        """
        moves the rotation of the base relative to the current position in degree

        Parameters
        rot_rel : float
            relative motion in degree
        ----------

        """
        # get current position:
        pos_rot_base  = self.CoDI_rot_base.wm()

        # set new position: 
        self.CoDI_rot_base.umvr(rot_rel)


    def move_z_abs(self, z_abs):
        """
        moves the z position to the absolute position in mm

        Parameters
        z_abs : float
            absolute motion in mm
        ----------

        """
        # move z position:
        self.CoDI_trans_z.umv(z_abs)


    def move_rot_left_abs(self, rot_abs):
            """
            moves the rotation of the left nozzle to the absolute position in degree

            Parameters
            rot_abs : float
                abs motion in degree
            ----------

            """

            # set new position: 
            self.CoDI_rot_left.umv(rot_abs)


    def move_rot_right_abs(self, rot_abs):
            """
            moves the rotation of the right nozzle to the absolute position in degree

            Parameters
            rot_abs : float
                abs motion in degree
            ----------

            """

            # set new position: 
            self.CoDI_rot_right.umv(rot_abs)


    def move_rot_base_abs(self, rot_abs):
            """
            moves the rotation of the base to the absolute position in degree

            Parameters
            rot_abs : float
                abs motion in degree
            ----------

            """

            # set new position: 
            self.CoDI_rot_base.umv(rot_abs)

__init__(reload_presets=False)

Class definition of the DoD codi injector Parameters reload_presets : Boolean force resetting the presets (overwriting existing hutch presets)


Source code in dod/codi.py
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def __init__(self, reload_presets = False):
    """
    Class definition of the DoD codi injector 
    Parameters
    reload_presets : Boolean
        force resetting the presets (overwriting existing hutch presets)
    ----------
    """

    from pcdsdevices.device import ObjectComponent as OCpt
    from pcdsdevices.epics_motor import SmarAct, Motor
    import time

    # CoDI motor PVs loading 
    self.CoDI_rot_left = SmarAct('MFX:MCS2:01:m3', name='CoDI_rot_left')
    self.CoDI_rot_right = SmarAct('MFX:MCS2:01:m1', name='CoDI_rot_right')
    self.CoDI_rot_base = SmarAct('MFX:MCS2:01:m2', name='CoDI_rot_base')
    self.CoDI_trans_z = SmarAct('MFX:MCS2:01:m4', name='CoDI_trans_z')

    #Predefined positions CoDI
    self.CoDI_pos_predefined = dict()

    if reload_presets == True: 
        # self.CoDI_pos_predefined['aspiration'] = (0.0,0.0,0.0,0.0) 
        # self.CoDI_pos_predefined['angled_vert'] = (0.0,45.0,45.0,0.0)
        # self.CoDI_pos_predefined['angled_hor'] = (90.0,45.0,45.0,0.0)

        self.set_CoDI_predefined('aspiration',0.0,0.0,0.0,0.0)
        self.set_CoDI_predefined('angled_vert',0.0,45.0,45.0,0.0)
        self.set_CoDI_predefined('angled_hor',90.0,45.0,45.0,0.0)
    else: 
        all_presets = vars(self.CoDI_rot_left.presets.positions) #Needs to be fixed
        for preset, preset_value in all_presets.items(): 
            try: 
                # get preset position
                exec_base = "preset_rot_base = self.CoDI_rot_base.presets.positions." + preset + '.pos'
                exec_rot_left = "preset_rot_left = self.CoDI_rot_left.presets.positions." + preset + '.pos'
                exec_rot_right = "preset_rot_right = self.CoDI_rot_right.presets.positions." + preset + '.pos'
                exec_trans_z = "preset_trans_z = self.CoDI_trans_z.presets.positions." + preset + '.pos'
                exec(exec_base)
                exec(exec_rot_left)
                exec(exec_rot_right)
                exec(exec_trans_z)

                # Save to local database
                print(preset)
                self.set_CoDI_predefined(preset, preset_rot_base, preset_rot_left, preset_rot_right, preset_trans_z)
            except: 
                print('skipping preset '+ preset + ', as it is not defined in all motors')    


    # # create config parser handler
    # json_handler = JsonFileHandler(supported_json)
    # # load configs and launch web server
    # json_handler.reload_endpoints()

    # Flag that can be used later on for safety aborts during task execution
    self.safety_abort = False

get_CoDI_pos(precision_digits=1)

gets the colliding droplet injector motor positions as tuple

Parameters precision_digits : int precision with which the pre-defined positions are checked


Return : (tuple, 5) motor positions of the CoDI motors. (name of preset, rot_base, rot_left, rot_right, trans_z)

Source code in dod/codi.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def get_CoDI_pos(self, precision_digits = 1): 
    """
    gets the colliding droplet injector motor positions as tuple

    Parameters
    precision_digits : int
        precision with which the pre-defined positions are checked 

    ----------
    Return : (tuple, 5)
        motor positions of the CoDI motors. (name of preset, rot_base, rot_left, rot_right, trans_z)
    """
    pos_rot_base  = self.CoDI_rot_base.wm()
    pos_rot_left  = self.CoDI_rot_left.wm()
    pos_rot_right = self.CoDI_rot_right.wm()
    pos_trans_z   = self.CoDI_trans_z.wm()

    # # for testing purposes
    # pos_rot_base  = 0
    # pos_rot_left  = 45
    # pos_rot_right = 45
    # pos_trans_z   = 0

    pos_tuple = (pos_rot_base,pos_rot_left, pos_rot_right,pos_trans_z)

    pos_rounded = tuple([float(round(each_pos,1)) for each_pos in pos_tuple])

    # Test if this is one of the preset positions: 
    pos_name = 'undefined'
    for preset in self.CoDI_pos_predefined:
        preset_rounded = tuple([float(round(each_pos,1)) for each_pos in self.CoDI_pos_predefined[preset]])
        if preset_rounded[:-1] == pos_rounded[:-1]: 
            pos_name = preset

    return pos_name, pos_rot_base, pos_rot_left, pos_rot_right, pos_trans_z

move_rot_base_abs(rot_abs)

moves the rotation of the base to the absolute position in degree

Parameters rot_abs : float abs motion in degree


Source code in dod/codi.py
383
384
385
386
387
388
389
390
391
392
393
394
395
def move_rot_base_abs(self, rot_abs):
        """
        moves the rotation of the base to the absolute position in degree

        Parameters
        rot_abs : float
            abs motion in degree
        ----------

        """

        # set new position: 
        self.CoDI_rot_base.umv(rot_abs)

move_rot_base_rel(rot_rel)

moves the rotation of the base relative to the current position in degree

Parameters rot_rel : float relative motion in degree


Source code in dod/codi.py
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
def move_rot_base_rel(self, rot_rel):
    """
    moves the rotation of the base relative to the current position in degree

    Parameters
    rot_rel : float
        relative motion in degree
    ----------

    """
    # get current position:
    pos_rot_base  = self.CoDI_rot_base.wm()

    # set new position: 
    self.CoDI_rot_base.umvr(rot_rel)

move_rot_left_abs(rot_abs)

moves the rotation of the left nozzle to the absolute position in degree

Parameters rot_abs : float abs motion in degree


Source code in dod/codi.py
353
354
355
356
357
358
359
360
361
362
363
364
365
def move_rot_left_abs(self, rot_abs):
        """
        moves the rotation of the left nozzle to the absolute position in degree

        Parameters
        rot_abs : float
            abs motion in degree
        ----------

        """

        # set new position: 
        self.CoDI_rot_left.umv(rot_abs)

move_rot_left_rel(rot_rel)

moves the rotation of the left nozzle relative to the current position in degree

Parameters rot_rel : float relative motion in degree


Source code in dod/codi.py
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
def move_rot_left_rel(self, rot_rel):
    """
    moves the rotation of the left nozzle relative to the current position in degree

    Parameters
    rot_rel : float
        relative motion in degree
    ----------

    """
    # get current position:
    pos_rot_left  = self.CoDI_rot_left.wm()

    # set new position: 
    self.CoDI_rot_left.umvr(rot_rel)

move_rot_right_abs(rot_abs)

moves the rotation of the right nozzle to the absolute position in degree

Parameters rot_abs : float abs motion in degree


Source code in dod/codi.py
368
369
370
371
372
373
374
375
376
377
378
379
380
def move_rot_right_abs(self, rot_abs):
        """
        moves the rotation of the right nozzle to the absolute position in degree

        Parameters
        rot_abs : float
            abs motion in degree
        ----------

        """

        # set new position: 
        self.CoDI_rot_right.umv(rot_abs)

move_rot_right_rel(rot_rel)

moves the rotation of the right nozzle relative to the current position in degree

Parameters rot_rel : float relative motion in degree


Source code in dod/codi.py
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
def move_rot_right_rel(self, rot_rel):
    """
    moves the rotation of the right nozzle relative to the current position in degree

    Parameters
    rot_rel : float
        relative motion in degree
    ----------

    """
    # get current position:
    pos_rot_right  = self.CoDI_rot_right.wm()

    # set new position: 
    self.CoDI_rot_right.umvr(rot_rel)

move_z_abs(z_abs)

moves the z position to the absolute position in mm

Parameters z_abs : float absolute motion in mm


Source code in dod/codi.py
339
340
341
342
343
344
345
346
347
348
349
350
def move_z_abs(self, z_abs):
    """
    moves the z position to the absolute position in mm

    Parameters
    z_abs : float
        absolute motion in mm
    ----------

    """
    # move z position:
    self.CoDI_trans_z.umv(z_abs)

move_z_rel(z_rel)

moves the z position relative to the current position

Parameters z_rel : float relative motion in mm


Source code in dod/codi.py
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
def move_z_rel(self, z_rel):
    """
    moves the z position relative to the current position 

    Parameters
    z_rel : float
        relative motion in mm
    ----------

    """
    # get current z position:
    pos_trans_z  = self.CoDI_trans_z.wm()

    # set new position
    self.CoDI_trans_z.umvr(z_rel)

remove_CoDI_pos(name)

removes the defined motor combination for CoDI from local database

Parameters name : str name of the pre-definition to be deleted


Source code in dod/codi.py
259
260
261
262
263
264
265
266
267
268
269
def remove_CoDI_pos(self, name):
    """
    removes the defined motor combination for CoDI from local database

    Parameters
    name : str
        name of the pre-definition to be deleted
    ----------

    """
    del self.CoDI_pos_predefined[name]

set_CoDI_current_pos(name)

defines or updates the current motor combination for CoDI

Parameters name : str name of the pre-definition


Source code in dod/codi.py
218
219
220
221
222
223
224
225
226
227
228
229
def set_CoDI_current_pos(self, name):
    """
    defines or updates the current motor combination for CoDI

    Parameters
    name : str
        name of the pre-definition
    ----------

    """
    pos_name, pos_rot_base, pos_rot_left, pos_rot_right, pos_trans_z = self.get_CoDI_pos()
    self.set_CoDI_predefined(name, pos_rot_base, pos_rot_left, pos_rot_right, pos_trans_z)

set_CoDI_current_z(verbose=True)

sets the current z position to all preloaded positions Useful for global change of z after aligning

Parameters verbose : Boolean prints the new positions as a sanity check if True


Source code in dod/codi.py
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
def set_CoDI_current_z(self, verbose = True):
    """
    sets the current z position to all preloaded positions
    Useful for global change of z after aligning

    Parameters
    verbose : Boolean
        prints the new positions as a sanity check if True
    ----------

    """
    # get current z position:
    pos_trans_z_new  = self.CoDI_trans_z.wm()

    # get all keys from the positions
    position_keys = self.CoDI_pos_predefined.keys

    #go through all positions and change the z-value to the current z value
    for key in position_keys: 
        pos_name, pos_rot_base, pos_rot_left, pos_rot_right, pos_trans_z = self.CoDI_pos_predefined[key]
        self.set_CoDI_predefined(pos_name,pos_rot_base, pos_rot_left, pos_rot_right, pos_trans_z_new)

    #Print the new predefined positions as a sanity check
    if verbose == True:
        print(self.get_CoDI_predefined)

set_CoDI_pos(pos_name, wait=True)

Moves the colliding droplet injector into a pre-defined position.

Parameters pos_name : string name of the pre-defined position wait : boolean if the robot waits before continuing further steps


Return :

Source code in dod/codi.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def set_CoDI_pos(self, pos_name, wait = True): 
    """
    Moves the colliding droplet injector into a pre-defined position. 

    Parameters
    pos_name : string
        name of the pre-defined position
    wait : boolean
        if the robot waits before continuing further steps

    ----------
    Return : 

    """
    # pos_rot_base  = CoDI_base.wm()
    # pos_rot_left  = CoDI_left.wm()
    # pos_rot_right = CoDI_right.wm()
    # pos_trans_z   = CoDI_z.wm()
    import time

    # get target positions
    pos_rot_base, pos_rot_left, pos_rot_right, pos_trans_z = self.CoDI_pos_predefined[pos_name]

    # Move motors

    # Old wayy 
    # self.CoDI_rot_base.mv(pos_rot_base, wait=False)
    # self.CoDI_rot_left.mv(pos_rot_left,  wait=False)
    # self.CoDI_rot_right.mv(pos_rot_right, wait=False)
    # self.CoDI_trans_z.mv(pos_trans_z, wait=False)

    # Move using hutch python presets
    exec_base = "self.CoDI_rot_base.mv_" + pos_name + '()'
    exec(exec_base)
    exec_left = "self.CoDI_rot_left.mv_" + pos_name + '()'
    exec(exec_left)
    exec_right = "self.CoDI_rot_right.mv_" + pos_name + '()'
    exec(exec_right)
    exec_z = "self.CoDI_trans_z.mv_" + pos_name + '()'
    exec(exec_z)

    if wait == True: 
        test_name, test_pos_rot_base, test_pos_rot_left, test_pos_rot_right, test_pos_trans_z = self.get_CoDI_pos()
        i = 0
        while pos_name != test_name: 
            time.sleep(1)
            print('\r waiting for motion to end: %i s' %i, end="\r")
            i = i+1
            test_name, test_pos_rot_base, test_pos_rot_left, test_pos_rot_right, test_pos_trans_z = self.get_CoDI_pos()
        print('Motion ended')

set_CoDI_predefined(name, base, left, right, z)

defines or updates a predefined combination for CoDI

Parameters name : str name of the pre-definition base : float rotation of base left : float rotation of left injector right : float rotation of right injector z : float translation of right injector


Source code in dod/codi.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def set_CoDI_predefined(self, name, base, left, right, z):
    """
    defines or updates a predefined combination for CoDI

    Parameters
    name : str
        name of the pre-definition
    base : float
        rotation of base
    left : float
        rotation of left injector
    right : float
        rotation of right injector
    z : float
        translation of right injector
    ----------

    """
    self.CoDI_pos_predefined.update({name: (base,left, right, z)})

    # Presets using MFX presets functionalities 
    self.CoDI_rot_left.presets.add_hutch(name, value = left)
    self.CoDI_rot_right.presets.add_hutch(name, value = right)
    self.CoDI_rot_base.presets.add_hutch(name, value = base)
    self.CoDI_trans_z.presets.add_hutch(name, value = z)

update_CoDI_predefined()

reloads all the hutch python presets for motors and overwrites local position preset dict

Source code in dod/codi.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def update_CoDI_predefined(self): 
    'reloads all the hutch python presets for motors and overwrites local position preset dict'

    #Predefined positions CoDI
    self.CoDI_pos_predefined = dict()

    all_presets = vars(self.CoDI_rot_left.presets.positions) #Needs to be fixed
    for preset in all_presets.keys(): 
        # try: 
        # get preset position
        self.exec_base = "preset_rot_base = self.CoDI_rot_base.presets.positions." + preset + '.pos'
        self.exec_rot_left = "preset_rot_left = self.CoDI_rot_left.presets.positions." + preset + '.pos'
        self.exec_rot_right = "preset_rot_right = self.CoDI_rot_right.presets.positions." + preset + '.pos'
        self.exec_trans_z = "preset_trans_z = self.CoDI_trans_z.presets.positions." + preset + '.pos'
        print(self.exec_base)
        exec(self.exec_base)

        exec(self.exec_rot_left)
        print(self.exec_rot_left)

        exec(self.exec_rot_right)
        print(self.exec_rot_left)

        exec(self.exec_trans_z)
        print(self.exec_trans_z)
        print(preset_trans_z)

        # Save to local database
        print(preset)
        self.set_CoDI_predefined(preset, preset_rot_base, preset_rot_left, preset_rot_right, preset_trans_z)

Debug

Source code in mfx/debug.py
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
class Debug:
    def __init__(self):
        import os
        full_ioc_serverlist = os.popen(f"netconfig search ioc-mfx* --brief").read().splitlines()
        full_daq_serverlist = os.popen(f"netconfig search daq-mfx* --brief").read().splitlines()
        self.ioc_serverlist = [ioc for ioc in full_ioc_serverlist if not ioc.endswith(
            "-ipmi") and not ioc.endswith("-fez") and not ioc.endswith("-ics")]
        self.daq_serverlist = [ioc for ioc in full_daq_serverlist if not ioc.endswith(
            "-ipmi") and not ioc.endswith("-fez") and not ioc.endswith("-ana")]


    def awr(self, hutch='mfx'):
        """
        Checks if the beamline is ready to take beam

        Parameters
        ----------
        hutch: str, optional
            Specify the hutch you want to check. Default is MFX because it is the best
        """
        import os
        import logging
        logging.info(f"{hutch} Beamline Check")
        os.system(f"/cds/group/pcds/pyps/apps/hutch-python/mfx/scripts/awr {hutch}")


    def motor_check(self):
        import os
        import logging
        logging.info(f"Powering up all available motors")
        os.system(f"/cds/group/pcds/pyps/apps/hutch-python/mfx/scripts/mfxpowerup.sh")


    def check_server(self, server):
        """
        Checks the status of an individual server

        Parameters
        ----------
        server: str, required
            Specify the server name to check. Use debug.server_list('all') to see all servers
        """
        import os
        import logging
        status = None
        if str(server) in self.ioc_serverlist or str(server) in self.daq_serverlist:
            logging.info(f"Checking the status of: {server}")
            status = os.popen(f"/reg/g/pcds/engineering_tools/latest-released/scripts/serverStat {server} status").read().splitlines()
        else:
            logging.info(f"The server you are looking for does not exist please select one of the following")
            self.server_list('ioc')
            self.server_list('daq')
        return status


    def cycle_server(self, server):
        """
        Cycles an individual server

        Parameters
        ----------
        server: str, required
            Specify the server name to cycle. Use debug.server_list('all') to see all servers
        """
        import os
        import logging
        if str(server) in self.ioc_serverlist or str(server) in self.daq_serverlist:
            logging.info(f"Power cycling: {server}")
            os.system(f"/reg/g/pcds/engineering_tools/latest-released/scripts/serverStat {server} cycle")
        else:
            logging.info(f"The server you are looking for does not exist please select one of the following")
            self.server_list('ioc')
            self.server_list('daq')


    def check_all_servers(self, server_type):
        """
        Checks the status of all servers local to MFX

        Parameters
        ----------
        server_type: str, required
            Specify the server type input either 'all', 'ioc', or 'daq
        """
        import logging
        self.error_servers = []
        if str(server_type) == 'all':
            logging.info(f"You've decided to check all {len(self.ioc_serverlist) + len(self.daq_serverlist)} servers.")
            for server in self.ioc_serverlist:
                status = self.check_server(str(server))
                if status[0].endswith('on') and status[1].split(", ")[0].endswith('1)') and status[2].endswith('up'):
                    logging.info(f"Server {server} has passed all tests")
                else:
                    logging.error(f"Server {server} has failed one or more tests and is added to the broken list")
                    logging.error(status)
                    self.error_servers.append(server)

            for server in self.daq_serverlist:
                status = self.check_server(str(server))
                if status[0].endswith('on') and status[1].split(", ")[0].endswith('1)') and status[2].endswith('up'):
                    logging.info(f"Server {server} has passed all tests")
                else:
                    logging.error(f"Server {server} has failed one or more tests and is added to the broken list")
                    logging.error(status)
                    self.error_servers.append(server)

        elif str(server_type) == 'ioc':
            logging.info(f"You've decided to check all {len(self.ioc_serverlist)} ioc servers.")
            for server in self.ioc_serverlist:
                status = self.check_server(str(server))
                if status[0].endswith('on') and status[1].endswith('1)') and status[2].endswith('up'):
                    logging.info(f"Server {server} has passed all tests")
                else:
                    logging.error(f"Server {server} has failed one or more tests and is added to the broken list")
                    self.error_servers.append(server)

        elif str(server_type) == 'daq':
            logging.info(f"You've decided to check all {len(self.daq_serverlist)} daq servers.")
            for server in self.daq_serverlist:
                status = self.check_server(str(server))
                if status[0].endswith('on') and status[1].endswith('1)') and status[2].endswith('up'):
                    logging.info(f"Server {server} has passed all tests")
                else:
                    logging.error(f"Server {server} has failed one or more tests and is added to the broken list")
                    self.error_servers.append(server)
        else:
            logging.warning(f"There is no server of the type you requested. Please use either ioc or daq or all.")

        if len(self.error_servers) != 0:
            logging.warning(f"There is something wrong with the following servers.")
            for server in self.error_servers:
                print(f'{server}')
            cycle = input("\nWould you like to power cycle all error servers? (y/n)? ")

            if cycle.lower() == "y":
                logging.info(f"You've decided to cycle all {len(self.error_servers)} broken servers.")
                for server in self.error_servers:
                    self.cycle_server(str(server))
            else:
                logging.info(f"You've decided not to cycle {len(self.error_servers)} broken servers.")
        else:
            logging.info(f"All {len(self.error_servers)} servers are ready to rock.")

        return self.error_servers


    def server_list(self, server_type):
        """
        Lists servers local to MFX

        Parameters
        ----------
        server_type: str, required
            Specify the server type input either 'all', 'ioc', or 'daq
        """
        import logging
        if str(server_type) == 'all':
            print('IOC SERVERS\n#########################')
            for server in self.ioc_serverlist:
                print(f'{server}')
            print('\nDAQ SERVERS\n#########################')
            for server in self.daq_serverlist:
                print(f'{server}')
        elif str(server_type) == 'ioc':
            print('IOC SERVERS\n#########################')
            for server in self.ioc_serverlist:
                print(f'{server}')
        elif str(server_type) == 'daq':
            print('DAQ SERVERS\n#########################')
            for server in self.daq_serverlist:
                print(f'{server}')
        else:
            logging.warning(f"There is no server of the type you requested. Please use either ioc or daq.")

awr(hutch='mfx')

Checks if the beamline is ready to take beam

Parameters

hutch: str, optional Specify the hutch you want to check. Default is MFX because it is the best

Source code in mfx/debug.py
12
13
14
15
16
17
18
19
20
21
22
23
24
def awr(self, hutch='mfx'):
    """
    Checks if the beamline is ready to take beam

    Parameters
    ----------
    hutch: str, optional
        Specify the hutch you want to check. Default is MFX because it is the best
    """
    import os
    import logging
    logging.info(f"{hutch} Beamline Check")
    os.system(f"/cds/group/pcds/pyps/apps/hutch-python/mfx/scripts/awr {hutch}")

check_all_servers(server_type)

Checks the status of all servers local to MFX

Parameters

server_type: str, required Specify the server type input either 'all', 'ioc', or 'daq

Source code in mfx/debug.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def check_all_servers(self, server_type):
    """
    Checks the status of all servers local to MFX

    Parameters
    ----------
    server_type: str, required
        Specify the server type input either 'all', 'ioc', or 'daq
    """
    import logging
    self.error_servers = []
    if str(server_type) == 'all':
        logging.info(f"You've decided to check all {len(self.ioc_serverlist) + len(self.daq_serverlist)} servers.")
        for server in self.ioc_serverlist:
            status = self.check_server(str(server))
            if status[0].endswith('on') and status[1].split(", ")[0].endswith('1)') and status[2].endswith('up'):
                logging.info(f"Server {server} has passed all tests")
            else:
                logging.error(f"Server {server} has failed one or more tests and is added to the broken list")
                logging.error(status)
                self.error_servers.append(server)

        for server in self.daq_serverlist:
            status = self.check_server(str(server))
            if status[0].endswith('on') and status[1].split(", ")[0].endswith('1)') and status[2].endswith('up'):
                logging.info(f"Server {server} has passed all tests")
            else:
                logging.error(f"Server {server} has failed one or more tests and is added to the broken list")
                logging.error(status)
                self.error_servers.append(server)

    elif str(server_type) == 'ioc':
        logging.info(f"You've decided to check all {len(self.ioc_serverlist)} ioc servers.")
        for server in self.ioc_serverlist:
            status = self.check_server(str(server))
            if status[0].endswith('on') and status[1].endswith('1)') and status[2].endswith('up'):
                logging.info(f"Server {server} has passed all tests")
            else:
                logging.error(f"Server {server} has failed one or more tests and is added to the broken list")
                self.error_servers.append(server)

    elif str(server_type) == 'daq':
        logging.info(f"You've decided to check all {len(self.daq_serverlist)} daq servers.")
        for server in self.daq_serverlist:
            status = self.check_server(str(server))
            if status[0].endswith('on') and status[1].endswith('1)') and status[2].endswith('up'):
                logging.info(f"Server {server} has passed all tests")
            else:
                logging.error(f"Server {server} has failed one or more tests and is added to the broken list")
                self.error_servers.append(server)
    else:
        logging.warning(f"There is no server of the type you requested. Please use either ioc or daq or all.")

    if len(self.error_servers) != 0:
        logging.warning(f"There is something wrong with the following servers.")
        for server in self.error_servers:
            print(f'{server}')
        cycle = input("\nWould you like to power cycle all error servers? (y/n)? ")

        if cycle.lower() == "y":
            logging.info(f"You've decided to cycle all {len(self.error_servers)} broken servers.")
            for server in self.error_servers:
                self.cycle_server(str(server))
        else:
            logging.info(f"You've decided not to cycle {len(self.error_servers)} broken servers.")
    else:
        logging.info(f"All {len(self.error_servers)} servers are ready to rock.")

    return self.error_servers

check_server(server)

Checks the status of an individual server

Parameters

server: str, required Specify the server name to check. Use debug.server_list('all') to see all servers

Source code in mfx/debug.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def check_server(self, server):
    """
    Checks the status of an individual server

    Parameters
    ----------
    server: str, required
        Specify the server name to check. Use debug.server_list('all') to see all servers
    """
    import os
    import logging
    status = None
    if str(server) in self.ioc_serverlist or str(server) in self.daq_serverlist:
        logging.info(f"Checking the status of: {server}")
        status = os.popen(f"/reg/g/pcds/engineering_tools/latest-released/scripts/serverStat {server} status").read().splitlines()
    else:
        logging.info(f"The server you are looking for does not exist please select one of the following")
        self.server_list('ioc')
        self.server_list('daq')
    return status

cycle_server(server)

Cycles an individual server

Parameters

server: str, required Specify the server name to cycle. Use debug.server_list('all') to see all servers

Source code in mfx/debug.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def cycle_server(self, server):
    """
    Cycles an individual server

    Parameters
    ----------
    server: str, required
        Specify the server name to cycle. Use debug.server_list('all') to see all servers
    """
    import os
    import logging
    if str(server) in self.ioc_serverlist or str(server) in self.daq_serverlist:
        logging.info(f"Power cycling: {server}")
        os.system(f"/reg/g/pcds/engineering_tools/latest-released/scripts/serverStat {server} cycle")
    else:
        logging.info(f"The server you are looking for does not exist please select one of the following")
        self.server_list('ioc')
        self.server_list('daq')

server_list(server_type)

Lists servers local to MFX

Parameters

server_type: str, required Specify the server type input either 'all', 'ioc', or 'daq

Source code in mfx/debug.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
def server_list(self, server_type):
    """
    Lists servers local to MFX

    Parameters
    ----------
    server_type: str, required
        Specify the server type input either 'all', 'ioc', or 'daq
    """
    import logging
    if str(server_type) == 'all':
        print('IOC SERVERS\n#########################')
        for server in self.ioc_serverlist:
            print(f'{server}')
        print('\nDAQ SERVERS\n#########################')
        for server in self.daq_serverlist:
            print(f'{server}')
    elif str(server_type) == 'ioc':
        print('IOC SERVERS\n#########################')
        for server in self.ioc_serverlist:
            print(f'{server}')
    elif str(server_type) == 'daq':
        print('DAQ SERVERS\n#########################')
        for server in self.daq_serverlist:
            print(f'{server}')
    else:
        logging.warning(f"There is no server of the type you requested. Please use either ioc or daq.")

DoD

Source code in dod/dod.py
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
class DoD: 
    def __init__(self, modules = 'None', ip = "172.21.72.187", port = 9999, supported_json = '/cds/group/pcds/pyps/apps/hutch-python/mfx/dod/supported.json'
):
        """
        Class definition of the DoD robot
        Parameters
        ----------
        modules : string
            Defines the optional modules of the robot. 
            Options: 'None', 'codi', , 
        ip = "172.21.72.187" , port = 9999, supported_json = "supported.json"            
        """
        from dod.DropsDriver import myClient
        from dod.JsonFileHandler import JsonFileHandler
        from dod.ServerResponse import ServerResponse

        import time

        # Create object 
        # pytest encourages this pattern, apologies.
        # ip = "172.21.72.187" #"172.21.148.101"
        # port = 9999
        # supported_json = '/cds/group/pcds/pyps/apps/hutch-python/mfx/dod/supported.json'

        # User input parameters: 
        # Safety parameters in hutch coordinate system. 
        # Note: hutch (x,y,z) = robot (x,-z, y) 
        # 
        self.y_min     = 10000 # minimum value in y.
        self.y_safety  = 50000 # value in y, above which the robot can only be in vertical configuration
        self.y_max     = 50000 # maximum value in y

        #Initialize safety regions for horizontal and vertical rotation: 
        self.forbidden_regions_horizontal = []
        self.forbidden_regions_vertical = []
        #minimum region: 
        self.set_forbidden_region(0, 300000, 0, self.y_min,rotation_state='both')
        #maximum region: 
        self.set_forbidden_region(0, 300000, self.y_max, 500000,rotation_state='both')
        #region where horizontal rotation is forbidden: 
        self.set_forbidden_region(0, 300000,  self.y_safety, self.y_max,rotation_state='horizontal')

        # Initializing the robot client that is used for communication
        self.client = myClient(ip=ip, port=port, supported_json=supported_json, reload=False)

        # create config parser handler
        json_handler = JsonFileHandler(supported_json)
        # load configs and launch web server
        json_handler.reload_endpoints()

        # Flag that can be used later on for safety aborts during task execution
        self.safety_abort = False
        if modules == 'codi': 
            from dod.codi import CoDI
            self.codi = CoDI()

        # Timing section
        from pcdsdevices.evr import Trigger
        self.delay = None

        # Trigger objects
        self.trigger_Xray = Trigger('MFX:LAS:EVR:01:TRIG7', name='trigger_X-ray_simulator')
        self.trigger_nozzle_1 = Trigger('MFX:LAS:EVR:01:TRIG2', name='trigger_nozzle_1')
        self.trigger_nozzle_2 = Trigger('MFX:LAS:EVR:01:TRIG3', name='trigger_nozzle_2')
        self.trigger_LED = Trigger('MFX:LAS:EVR:01:TRIG1', name='trigger_LED_array')

        # Timing parameter
        self.timing_Xray = self.trigger_Xray.ns_delay.get()
        self.timing_nozzle_1 = self.trigger_nozzle_1.ns_delay.get()
        self.timing_nozzle_2 = self.trigger_nozzle_2.ns_delay.get()
        self.timing_LED = self.trigger_LED.ns_delay.get()
        self.timing_delay_sciPulse = 60600
        self.timing_delay_LED = 1000 #delay of LED relative to X-ray timing
        self.timing_delay_reaction = 0
        self.timing_delay_nozzle_1 = self.timing_Xray - self.timing_nozzle_1 - self.timing_delay_sciPulse - self.timing_delay_reaction
        self.timing_delay_nozzle_2 = self.timing_Xray - self.timing_nozzle_2 - self.timing_delay_sciPulse - self.timing_delay_reaction


    def stop_task(self, verbose = True):
        """
        Stop task while running
        ** ISSUES **
        -  When stop task  is called, the Robot stays in "BUSY" Status.
        Parameters
        ----------
        verbose : boolean
            Defines whether the function returns the full output, or only the results
        Returns: 
        r : 
            status readback when aborted
        """
        r = self.client.connect("Test")
        self.safety_abort = False
        r = self.client.stop_task()
        r = self.client.disconnect()
        if verbose == True: 
            return r


    def clear_abort(self, verbose = True):
        """
        clear abort flag
        Parameters
        verbose : boolean
           Defines whether the function returns the full output, or only the results
        ----------
        Returns: 
        r : 
            status readback after error cleared
        '''
        """
        r = self.client.connect("Test")
        r = self.client.get_status()
        self.safety_abort = False
        rr = self.client.disconnect()

        if verbose == True: 
            return r


    # def clear_popup_window(self, verbose = True):
    #     """
    #     clears a popup window that might pop up on the robot gui
    #     Parameters
    #     verbose : boolean
    #        Defines whether the function returns the full output, or only the results
    #     ----------
    #     Returns: 
    #     r : 
    #         status readback after error cleared
    #     '''
    #     """
    #     r = self.client.connect("Test")
    #     r = self.client.get_status()

    #     self.client.close_dialog(reference, selection)

    #     if verbose == True: 
    #         return r


    def get_status(self, verbose = False):
        """
        returns the robot state

        Parameters
        verbose : boolean
           Defines whether the function returns the full output, or only the results
        ----------
        Returns: 
        r : dict
            different states of the robot
        """
        rr = self.client.connect("Test")
        r = self.client.get_status()
        # expected_keys = [
        #     'Position',
        #     'RunningTask',
        #     'Dialog',
        #     'LastProbe',
        #     'Humidity',
        #     'Temperature',
        #     'BathTemp',
        #     ]
        rr = self.client.disconnect()
        if verbose == True: 
            return r
        else: 
            return r.RESULTS


    def busy_wait(self, timeout):
        '''
            Busy wait untill timeout value is reached,
            timeout : sec
            returns true if timeout occured
        '''
        import time
        start = time.time()
        r = self.client.get_status()
        delta = 0

        while(r.STATUS['Status'] == "Busy"):
            if delta > timeout:
                return True

            time.sleep(0.1) #Wait a ms to stop spamming robot
            r = self.client.get_status()
            delta = time.time() - start    
        return False


    # def __del__(self):
    #     # close network connection
    #     self.client.conn.close()


    def get_task_details(self, task_name, verbose = False):
        """
            This gets the details of a task from the robot to see the scripted routines
            Parameters
            task_name : string
                Name of the task that we want to get
            verbose : boolean
                Defines whether the function returns the full output, or only the results
            ----------
            Returns: 
            r : 
                returns the robot tasks
        """
        rr = self.client.connect("Test")
        r = self.client.get_task_details(task_name)
        rr = self.client.disconnect()
        if verbose == True: 
            return r
        else: 
            return r.RESULTS


    def get_task_names(self, verbose = False):
        """
            This gets the names of available tasks from the robot
            Parameters
            task_name : string
                Name of the task that we want to get
            verbose : boolean
                Defines whether the function returns the full output, or only the results
            ----------
            Returns: 
            r : dict
                returns the robot tasks
        """
        # Check if reponse is not an empty array or any errors occured
        rr = self.client.connect("Test")
        r = self.client.get_task_names()
        rr = self.client.disconnect()
        if verbose == True: 
            return r
        else: 
            return r.RESULTS


    def get_current_position(self, verbose = False):
        '''
        Returns current robot position
        name and properties of the last selected position, together with the real current position coordinates
        Parameters

        verbose : boolean
                Defines whether the function returns the full output, or only the results
            ----------
        Returns: 
        r : 
            returns the current position.         
        # expected_keys = [
        #         'CurrentPosition',
        #         'Position',
        #         'PositionReal',
        #         ]
        '''
        rr = self.client.connect("Test")
        r = self.client.get_current_positions()
        rr = self.client.disconnect()
        if verbose == True: 
            return r
        else: 
            return r.RESULTS


    def get_nozzle_status(self, verbose = False):
        '''
        Returns current nozzle parameters position
        Parameters

        verbose : boolean
                Defines whether the function returns the full output, or only the results
        ----------
        Returns: 
        r : 
            returns the current nozzle parameters.         
        #         expected_keys = [
                "Activated Nozzles",
                "Selected Nozzles",
                "ID,Volt,Pulse,Freq,Volume",  # Intreseting? all one key
                "Dispensing",
                ]
        '''
        rr = self.client.connect("Test")
        r = self.client.get_nozzle_status()
        rr = self.client.disconnect()
        if verbose == True: 
            return r
        else: 
            return r.RESULTS


    def set_nozzle_dispensing(self, mode = "Off", verbose = False):
        '''
        Sets the dispensing, either "Free", "Triggered", or "Off"
        Parameters
        mode : string 
            either "free", "triggered", or "off"
        verbose : boolean
                Defines whether the function returns the full output, or only the results
        ----------
        Returns: 
        r : 
        '''
        rr = self.client.connect("Test")
        if mode == 'Free': 
            r = self.client.dispensing('Free')
        elif mode == 'Triggered':
            r = self.client.dispensing('Triggered')
        else: 
            #turns active nozzles off. Safer if all nozzles would be turned off
            r = self.client.dispensing('Off')
            for i in [1,2,3,4]: 
                r = self.client.select_nozzle(i)
                r = self.client.dispensing('Off')

        rr = self.client.disconnect()
        if verbose == True: 
            return r
        else: 
            return r.RESULTS


    def do_move(self, position, safety_test = False, verbose = False):
        '''
            Moves robot to new position

        Parameters
        position : string
            Position name to which the robot is supposed to move
        safety_test : Boolean
            question whether a safety test is to be performed or not
            True: Test will be performed
            False: Not performed
        verbose : boolean
                Defines whether the function returns the full output, or only the results
            ----------
        Returns: 
        r : current position
        '''

        r = self.client.connect("Test")
        r = self.client.get_current_positions()
        current_real_position = r.RESULTS['PositionReal']

        if safety_test == False:  
            r = self.client.move(position)
        else: 
            print('safety test of move has yet to be implemented')

        # WAIT FOR MOVEMENT TO BE DONE
        self.busy_wait(15)

        r = self.client.get_current_positions()
        new_real_position = r.RESULTS['PositionReal']

        rr = self.client.disconnect()
        if verbose == True: 
            return r
        else: 
            return r.RESULTS


    def move_x_abs(self, position_x, safety_test = False, verbose = False):
        '''
            Robot coordinate system!!!: 
            Moves robot to new absolute x position. 
            No safety test. 

        Parameters
        position : int
            Absolute position in um(?) to which the robot is supposed to move
        safety_test : Boolean
            question whether a safety test is to be performed or not
            True: Test will be performed
            False: Not performed
        verbose : boolean
                Defines whether the function returns the full output, or only the results
            ----------
        Returns: 
        r : current position
        '''

        r = self.client.connect("Test")
        r = self.client.get_current_positions()
        current_real_position = r.RESULTS['PositionReal']
        x_current, y_current, z_current = current_real_position

        if safety_test == False:  
            r = self.client.move_x(position_x)
        else: 
            print('safety test of move has yet to be implemented')
            if self.test_forbidden_region(position_x, y_current): 
                r = self.client.move_x(position_x)


        # WAIT FOR MOVEMENT TO BE DONE
        self.busy_wait(25)

        # r = self.client.get_current_positions()
        # new_real_position = r.RESULTS['PositionReal']

        rr = self.client.disconnect()
        if verbose == True: 
            return r
        else: 
            return r.RESULTS


    def move_y_abs(self, position_y, safety_test = False, verbose = False):
        '''
            Robot coordinate system!!!: 
            Moves robot to new absolute x position. 
            No safety test. 

        Parameters
        position : int
            Absolute position in um(?) to which the robot is supposed to move
        safety_test : Boolean
            question whether a safety test is to be performed or not
            True: Test will be performed
            False: Not performed
        verbose : boolean
                Defines whether the function returns the full output, or only the results
            ----------
        Returns: 
        r : current position
        '''

        r = self.client.connect("Test")
        r = self.client.get_current_positions()
        current_real_position = r.RESULTS['PositionReal']
        x_current, y_current, z_current = current_real_position

        if safety_test == False:  
            r = self.client.move_y(position_y)
        else: 
            print('safety test of move has yet to be implemented')
            if self.test_forbidden_region(x_current, position_y): 
                r = self.client.move_y(position_y)


        # WAIT FOR MOVEMENT TO BE DONE
        self.busy_wait(25)

        # r = self.client.get_current_positions()
        # new_real_position = r.RESULTS['PositionReal']

        rr = self.client.disconnect()
        if verbose == True: 
            return r
        else: 
            return r.RESULTS


    def move_z_abs(self, position_z, safety_test = False, verbose = False):
        '''
            Robot coordinate system!!!: 
            Moves robot to new absolute Z position. 
            No safety test. 

        Parameters
        position : int
            Absolute position in um(?) to which the robot is supposed to move
        safety_test : Boolean
            question whether a safety test is to be performed or not
            True: Test will be performed
            False: Not performed
        verbose : boolean
                Defines whether the function returns the full output, or only the results
            ----------
        Returns: 
        r : current position
        '''

        r = self.client.connect("Test")
        r = self.client.get_current_positions()
        current_real_position = r.RESULTS['PositionReal']
        x_current, y_current, z_current = current_real_position

        if safety_test == False:  
            r = self.client.move_z(position_z)
        else: 
            print('safety test of move has yet to be implemented')
            if self.test_forbidden_region(x_current, y_current): 
                r = self.client.move_z(position_z)


        # WAIT FOR MOVEMENT TO BE DONE
        self.busy_wait(25)

        # r = self.client.get_current_positions()
        # new_real_position = r.RESULTS['PositionReal']

        rr = self.client.disconnect()
        if verbose == True: 
            return r
        else: 
            return r.RESULTS


    def do_task(self, task_name, safety_check = False, verbose = False):
        '''
        Executes a task of the robot

        Parameters
        task_name : string
            task name which robot is supposed to perform
        safety_test : Boolean
            question whether a safety test is to be performed or not
            True: Test will be performed
            False: Not performed
        verbose : boolean
                Defines whether the function returns the full output, or only the results
            ----------
        Returns: 
        r : 
        '''
        import time

        rr = self.client.connect("Test")
        if safety_check == False: 
            r = self.client.execute_task(task_name)
        else: 
            print('safety check needs to be implemented')

        ## Wait for task to be done
        while(r.STATUS['Status'] == "Busy"):
            #Possible if loop is not enterd?
            time.sleep(0.5)
            r = self.client.get_status()
            if self.safety_abort == True: 
                r = self.client.stop_task()
                print('User aborted task execution')
                return r

        r = self.client.get_status()

        #Check if any error occured
        if r.ERROR_CODE == 0:
            print('no error')
        else: 
            print('error while performing task!')

        if verbose == True: 
            return r
        else: 
            return r.RESULTS


    def get_forbidden_region(self, rotation_state = 'both'): 
        """
        returns forbidden regions in the robot coordinate x y plane for end-point testing 
        defined via a rectangle in the x-y-plane. x_start<x_stop, y_start<y_stop
        returns the region depending on the rotation state, as some regions are forbidden only in one configuration

        Parameters
        rotation_state: string
            options are "horizontal" (nozzle sideways, base 90 degrees), "vertical" (nozzle vertical, base 0 degree), "both"
        ----------

        """

        #combine the lists depending on the rotation 
        if rotation_state == "vertical": 
            test_list_safety = self.forbidden_regions_vertical
        elif rotation_state == "horizontal": 
            test_list_safety = self.forbidden_regions_horizontal
        else:
            test_list_safety = self.forbidden_regions_horizontal + self.forbidden_regions_vertical

        return test_list_safety


    def set_forbidden_region(self, x_start, x_stop, y_start, y_stop, rotation_state = 'both'): 
        """
        set a forbidden region in the robot coordinate x y plane for end-point testing 
        defined via a rectangle in the x-y-plane. x_start<x_stop, y_start<y_stop
        set the region depending on the rotation state, as some regions are forbidden only in one configuration

        Parameters
        x_start : float
            x-position start 
        x_stop : float
            x-position stop
        y_start : float
            y-position start 
        y_stop : float
            y-position stop
        rotation_state: string
            options are "horizontal" (nozzle sideways, base 90 degrees), "vertical" (nozzle vertical, base 0 degree), "both"
        ----------

        """
        region_tuple = (min(x_start,x_stop), max(x_start,x_stop), min(y_start,y_stop), max(y_start,y_stop))
        if rotation_state == "horizontal": 
            self.forbidden_regions_horizontal.append(region_tuple)
        elif rotation_state == "vertical": 
            self.forbidden_regions_vertical.append(region_tuple)
        elif rotation_state == "both": 
            self.forbidden_regions_vertical.append(region_tuple)
            self.forbidden_regions_horizontal.append(region_tuple)
        else:
            print('invalid input of rotation state')


    def test_forbidden_region(self, x_test, y_test): 
        """
        tests if the end point of a motion is inside a forbidden region
        No testing of the path of a motion included!

        Parameters
        x_test : float
            x-position to test 
        y_test : float 
            y-position to test 

        ----------
        Returns: 
        safe_motion : bool
            boolean flag if endpoint of motion is safe or not
        """
        from dod.codi import CoDI_base
        # Get current rotation state
        pos_rot_base  = round(CoDI_base.wm(),0)

        #Initialize safe flag (True = safe)
        flag_safe_endpoint = True

        #combine the lists depending on the rotation 
        if pos_rot_base == 90: 
            test_list_safety = self.forbidden_regions_vertical
        elif pos_rot_base == 0:
            test_list_safety = self.forbidden_regions_horizontal
        else:
            test_list_safety = self.forbidden_regions_horizontal + self.forbidden_regions_vertical

        #Test for all regions if the end point is in the forbidden region
        for tuple_current in test_list_safety: 
            x_start, x_stop, y_start, y_stop = tuple_current
            if ((x_start < x_test) and (x_stop > x_test) and (y_start < y_test) and (y_stop > y_test)): 
                flag_safe_endpoint = flag_safe_endpoint and False
            else:
                flag_safe_endpoint = flag_safe_endpoint and True

        return flag_safe_endpoint


    def set_timing_update(self): 
        """
        updating the timing triggers according to the set relative and absolute timing values

        Parameters
               ----------
        Returns: 
        """
        # Nozzle 1
        self.timing_nozzle_1 = self.timing_Xray - self.timing_delay_nozzle_1 - self.timing_delay_sciPulse - self.timing_delay_reaction
        if self.timing_nozzle_1 < 0: 
            self.timing_nozzle_1 = self.timing_nozzle_1 + 1/120*1000000000
        self.trigger_nozzle_1.ns_delay.put(self.timing_nozzle_1)

        # Nozzle 2
        self.timing_nozzle_2 = self.timing_Xray - self.timing_delay_nozzle_2 - self.timing_delay_sciPulse - self.timing_delay_reaction
        if self.timing_nozzle_2 < 0: 
            self.timing_nozzle_2 = self.timing_nozzle_2 + 1/120*1000000000
        self.trigger_nozzle_2.ns_delay.put(self.timing_nozzle_2)

        # LED
        self.timing_LED = self.timing_Xray + self.timing_delay_LED
        self.trigger_LED.ns_delay.put(self.timing_LED)


    def set_timing_zero_nozzle(self, nozzle, timing_rel): 
        """
        Setting the time zero for the nozzles from the LED alignment in robot

        Parameters
        nozle : int
            nozzle number
        timing_rel : float
            relative delay in ns from robot alignment
        ----------
        Returns: 
        """
        if nozzle == 1: 
            self.timing_delay_nozzle_1 = timing_rel
        elif nozzle == 2: 
            self.timing_delay_nozzle_2 = timing_rel
        else: 
            print('no valid nozzle selected.')
            return

        # update timings
        self.set_timing_update()


    def set_timing_rel_LED(self, timing_rel): 
        """
        Setting the relative timing of the LED relative to the X-rays

        Parameters
        timing_rel : float
            relative delay in ns from robot alignment
        ----------
        Returns: 
        """
        self.timing_delay_LED = timing_rel #delay of LED relative to X-ray timing

        # update timings
        self.set_timing_update()


    def set_timing_rel_reaction(self, timing_rel): 
        """
        Setting the relative timing of the reaction relative to the X-rays

        Parameters
        timing_rel : float
            relative delay in ns from robot alignment
        ----------
        Returns: 
        """
        self.timing_delay_reaction = timing_rel #delay of LED relative to X-ray timing

        # update timings
        self.set_timing_update()


    def set_timing_abs_Xray(self, timing_abs): 
        """
        Setting the absolute timing of the X-rays for claculation purposes

        Parameters
        timing_abs : float
            abs timing in ns from robot alignment
        ----------
        Returns: 
        """
        self.timing_Xray = timing_abs #delay of LED relative to X-ray timing
        #self.trigger_Xray.ns_delay.put(self.timing_Xray)

        # update timings
        self.set_timing_update()


    def set_timing_relative_nozzle(self, nozzle, timing_rel): 
        """
        Changing the timing of the selected nozzle by a relative amount 

        Parameters
        nozle : int
            nozzle number
        timing_rel : float
            relative change in ns from robot alignment
        ----------
        Returns: 
        """
        if nozzle == 1: 
            current_timing = self.timing_delay_nozzle_1
            self.timing_delay_nozzle_1 = self.timing_delay_nozzle_1 + timing_rel
        elif nozzle == 2: 
            current_timing = self.timing_delay_nozzle_2
            self.timing_delay_nozzle_2 = self.timing_delay_nozzle_2 + timing_rel
        else: 
            print('no valid nozzle selected.')
            return

        # update timings
        self.set_timing_update()


    def logging_string(self):
        """
        Creating the string to post to the e-log. 

        Parameters
        ----------
        Returns: 
        post : string
            String with useful logging information for posting into the e-log 
        """
        post_str = ''

        # Info to be posted: 

        # Nozzle angles: 
        position = self.codi.get_CoDI_pos()
        position_str = 'Codi Information: \n CoDI data: name: ' + str(position[0]) + '\n rot_base: '+ str(position[1])+ '\n rot_left: '+ str(position[2]) + '\n rot_right: '+ str(position[3])+ '\n z-transl: '+ str(position[4]) 
        post_str = post_str + position_str +' \n '

        #Timings:
        post_str = post_str + 'Timing:' + ' \n '
        post_str = post_str + 'timing_Xray:' + str(self.timing_Xray) +' \n '
        post_str = post_str + 'timing_nozzle_1:' + str(self.timing_nozzle_1) +' \n '
        post_str = post_str + 'timing_nozzle_2:' + str(self.timing_nozzle_2) +' \n '
        post_str = post_str + 'timing_LED:' + str(self.timing_LED) +' \n '
        post_str = post_str + 'timing_delay_LED:' + str(self.timing_delay_LED) +' \n '
        post_str = post_str + 'timing_delay_reaction:' + str(self.timing_delay_reaction) +' \n '        
        post_str = post_str + 'timing_delay_nozzle_1:' + str(self.timing_delay_nozzle_1) +' \n '   
        post_str = post_str + 'timing_delay_nozzle_2:' + str(self.timing_delay_nozzle_2) +' \n '   

        return post_str

__init__(modules='None', ip='172.21.72.187', port=9999, supported_json='/cds/group/pcds/pyps/apps/hutch-python/mfx/dod/supported.json')

Class definition of the DoD robot Parameters


modules : string Defines the optional modules of the robot. Options: 'None', 'codi', , ip = "172.21.72.187" , port = 9999, supported_json = "supported.json"

Source code in dod/dod.py
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
    def __init__(self, modules = 'None', ip = "172.21.72.187", port = 9999, supported_json = '/cds/group/pcds/pyps/apps/hutch-python/mfx/dod/supported.json'
):
        """
        Class definition of the DoD robot
        Parameters
        ----------
        modules : string
            Defines the optional modules of the robot. 
            Options: 'None', 'codi', , 
        ip = "172.21.72.187" , port = 9999, supported_json = "supported.json"            
        """
        from dod.DropsDriver import myClient
        from dod.JsonFileHandler import JsonFileHandler
        from dod.ServerResponse import ServerResponse

        import time

        # Create object 
        # pytest encourages this pattern, apologies.
        # ip = "172.21.72.187" #"172.21.148.101"
        # port = 9999
        # supported_json = '/cds/group/pcds/pyps/apps/hutch-python/mfx/dod/supported.json'

        # User input parameters: 
        # Safety parameters in hutch coordinate system. 
        # Note: hutch (x,y,z) = robot (x,-z, y) 
        # 
        self.y_min     = 10000 # minimum value in y.
        self.y_safety  = 50000 # value in y, above which the robot can only be in vertical configuration
        self.y_max     = 50000 # maximum value in y

        #Initialize safety regions for horizontal and vertical rotation: 
        self.forbidden_regions_horizontal = []
        self.forbidden_regions_vertical = []
        #minimum region: 
        self.set_forbidden_region(0, 300000, 0, self.y_min,rotation_state='both')
        #maximum region: 
        self.set_forbidden_region(0, 300000, self.y_max, 500000,rotation_state='both')
        #region where horizontal rotation is forbidden: 
        self.set_forbidden_region(0, 300000,  self.y_safety, self.y_max,rotation_state='horizontal')

        # Initializing the robot client that is used for communication
        self.client = myClient(ip=ip, port=port, supported_json=supported_json, reload=False)

        # create config parser handler
        json_handler = JsonFileHandler(supported_json)
        # load configs and launch web server
        json_handler.reload_endpoints()

        # Flag that can be used later on for safety aborts during task execution
        self.safety_abort = False
        if modules == 'codi': 
            from dod.codi import CoDI
            self.codi = CoDI()

        # Timing section
        from pcdsdevices.evr import Trigger
        self.delay = None

        # Trigger objects
        self.trigger_Xray = Trigger('MFX:LAS:EVR:01:TRIG7', name='trigger_X-ray_simulator')
        self.trigger_nozzle_1 = Trigger('MFX:LAS:EVR:01:TRIG2', name='trigger_nozzle_1')
        self.trigger_nozzle_2 = Trigger('MFX:LAS:EVR:01:TRIG3', name='trigger_nozzle_2')
        self.trigger_LED = Trigger('MFX:LAS:EVR:01:TRIG1', name='trigger_LED_array')

        # Timing parameter
        self.timing_Xray = self.trigger_Xray.ns_delay.get()
        self.timing_nozzle_1 = self.trigger_nozzle_1.ns_delay.get()
        self.timing_nozzle_2 = self.trigger_nozzle_2.ns_delay.get()
        self.timing_LED = self.trigger_LED.ns_delay.get()
        self.timing_delay_sciPulse = 60600
        self.timing_delay_LED = 1000 #delay of LED relative to X-ray timing
        self.timing_delay_reaction = 0
        self.timing_delay_nozzle_1 = self.timing_Xray - self.timing_nozzle_1 - self.timing_delay_sciPulse - self.timing_delay_reaction
        self.timing_delay_nozzle_2 = self.timing_Xray - self.timing_nozzle_2 - self.timing_delay_sciPulse - self.timing_delay_reaction

busy_wait(timeout)

Busy wait untill timeout value is reached, timeout : sec returns true if timeout occured

Source code in dod/dod.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
def busy_wait(self, timeout):
    '''
        Busy wait untill timeout value is reached,
        timeout : sec
        returns true if timeout occured
    '''
    import time
    start = time.time()
    r = self.client.get_status()
    delta = 0

    while(r.STATUS['Status'] == "Busy"):
        if delta > timeout:
            return True

        time.sleep(0.1) #Wait a ms to stop spamming robot
        r = self.client.get_status()
        delta = time.time() - start    
    return False

clear_abort(verbose=True)

clear abort flag Parameters verbose : boolean Defines whether the function returns the full output, or only the results


Returns: r : status readback after error cleared '''

Source code in dod/dod.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def clear_abort(self, verbose = True):
    """
    clear abort flag
    Parameters
    verbose : boolean
       Defines whether the function returns the full output, or only the results
    ----------
    Returns: 
    r : 
        status readback after error cleared
    '''
    """
    r = self.client.connect("Test")
    r = self.client.get_status()
    self.safety_abort = False
    rr = self.client.disconnect()

    if verbose == True: 
        return r

do_move(position, safety_test=False, verbose=False)

Moves robot to new position

Parameters position : string Position name to which the robot is supposed to move safety_test : Boolean question whether a safety test is to be performed or not True: Test will be performed False: Not performed verbose : boolean Defines whether the function returns the full output, or only the results ---------- Returns: r : current position

Source code in dod/dod.py
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
def do_move(self, position, safety_test = False, verbose = False):
    '''
        Moves robot to new position

    Parameters
    position : string
        Position name to which the robot is supposed to move
    safety_test : Boolean
        question whether a safety test is to be performed or not
        True: Test will be performed
        False: Not performed
    verbose : boolean
            Defines whether the function returns the full output, or only the results
        ----------
    Returns: 
    r : current position
    '''

    r = self.client.connect("Test")
    r = self.client.get_current_positions()
    current_real_position = r.RESULTS['PositionReal']

    if safety_test == False:  
        r = self.client.move(position)
    else: 
        print('safety test of move has yet to be implemented')

    # WAIT FOR MOVEMENT TO BE DONE
    self.busy_wait(15)

    r = self.client.get_current_positions()
    new_real_position = r.RESULTS['PositionReal']

    rr = self.client.disconnect()
    if verbose == True: 
        return r
    else: 
        return r.RESULTS

do_task(task_name, safety_check=False, verbose=False)

Executes a task of the robot

Parameters task_name : string task name which robot is supposed to perform safety_test : Boolean question whether a safety test is to be performed or not True: Test will be performed False: Not performed verbose : boolean Defines whether the function returns the full output, or only the results ---------- Returns: r :

Source code in dod/dod.py
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
def do_task(self, task_name, safety_check = False, verbose = False):
    '''
    Executes a task of the robot

    Parameters
    task_name : string
        task name which robot is supposed to perform
    safety_test : Boolean
        question whether a safety test is to be performed or not
        True: Test will be performed
        False: Not performed
    verbose : boolean
            Defines whether the function returns the full output, or only the results
        ----------
    Returns: 
    r : 
    '''
    import time

    rr = self.client.connect("Test")
    if safety_check == False: 
        r = self.client.execute_task(task_name)
    else: 
        print('safety check needs to be implemented')

    ## Wait for task to be done
    while(r.STATUS['Status'] == "Busy"):
        #Possible if loop is not enterd?
        time.sleep(0.5)
        r = self.client.get_status()
        if self.safety_abort == True: 
            r = self.client.stop_task()
            print('User aborted task execution')
            return r

    r = self.client.get_status()

    #Check if any error occured
    if r.ERROR_CODE == 0:
        print('no error')
    else: 
        print('error while performing task!')

    if verbose == True: 
        return r
    else: 
        return r.RESULTS

get_current_position(verbose=False)

Returns current robot position name and properties of the last selected position, together with the real current position coordinates Parameters

boolean

Defines whether the function returns the full output, or only the results

----------

Returns: r : returns the current position.

expected_keys = [

'CurrentPosition',

'Position',

'PositionReal',

]

Source code in dod/dod.py
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
def get_current_position(self, verbose = False):
    '''
    Returns current robot position
    name and properties of the last selected position, together with the real current position coordinates
    Parameters

    verbose : boolean
            Defines whether the function returns the full output, or only the results
        ----------
    Returns: 
    r : 
        returns the current position.         
    # expected_keys = [
    #         'CurrentPosition',
    #         'Position',
    #         'PositionReal',
    #         ]
    '''
    rr = self.client.connect("Test")
    r = self.client.get_current_positions()
    rr = self.client.disconnect()
    if verbose == True: 
        return r
    else: 
        return r.RESULTS

get_forbidden_region(rotation_state='both')

returns forbidden regions in the robot coordinate x y plane for end-point testing defined via a rectangle in the x-y-plane. x_start<x_stop, y_start<y_stop returns the region depending on the rotation state, as some regions are forbidden only in one configuration

Parameters rotation_state: string options are "horizontal" (nozzle sideways, base 90 degrees), "vertical" (nozzle vertical, base 0 degree), "both"


Source code in dod/dod.py
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
def get_forbidden_region(self, rotation_state = 'both'): 
    """
    returns forbidden regions in the robot coordinate x y plane for end-point testing 
    defined via a rectangle in the x-y-plane. x_start<x_stop, y_start<y_stop
    returns the region depending on the rotation state, as some regions are forbidden only in one configuration

    Parameters
    rotation_state: string
        options are "horizontal" (nozzle sideways, base 90 degrees), "vertical" (nozzle vertical, base 0 degree), "both"
    ----------

    """

    #combine the lists depending on the rotation 
    if rotation_state == "vertical": 
        test_list_safety = self.forbidden_regions_vertical
    elif rotation_state == "horizontal": 
        test_list_safety = self.forbidden_regions_horizontal
    else:
        test_list_safety = self.forbidden_regions_horizontal + self.forbidden_regions_vertical

    return test_list_safety

get_nozzle_status(verbose=False)

Returns current nozzle parameters position Parameters

boolean

Defines whether the function returns the full output, or only the results


Returns: r : returns the current nozzle parameters.

expected_keys = [

    "Activated Nozzles",
    "Selected Nozzles",
    "ID,Volt,Pulse,Freq,Volume",  # Intreseting? all one key
    "Dispensing",
    ]
Source code in dod/dod.py
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
def get_nozzle_status(self, verbose = False):
    '''
    Returns current nozzle parameters position
    Parameters

    verbose : boolean
            Defines whether the function returns the full output, or only the results
    ----------
    Returns: 
    r : 
        returns the current nozzle parameters.         
    #         expected_keys = [
            "Activated Nozzles",
            "Selected Nozzles",
            "ID,Volt,Pulse,Freq,Volume",  # Intreseting? all one key
            "Dispensing",
            ]
    '''
    rr = self.client.connect("Test")
    r = self.client.get_nozzle_status()
    rr = self.client.disconnect()
    if verbose == True: 
        return r
    else: 
        return r.RESULTS

get_status(verbose=False)

returns the robot state

Parameters verbose : boolean Defines whether the function returns the full output, or only the results


Returns: r : dict different states of the robot

Source code in dod/dod.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def get_status(self, verbose = False):
    """
    returns the robot state

    Parameters
    verbose : boolean
       Defines whether the function returns the full output, or only the results
    ----------
    Returns: 
    r : dict
        different states of the robot
    """
    rr = self.client.connect("Test")
    r = self.client.get_status()
    # expected_keys = [
    #     'Position',
    #     'RunningTask',
    #     'Dialog',
    #     'LastProbe',
    #     'Humidity',
    #     'Temperature',
    #     'BathTemp',
    #     ]
    rr = self.client.disconnect()
    if verbose == True: 
        return r
    else: 
        return r.RESULTS

get_task_details(task_name, verbose=False)

This gets the details of a task from the robot to see the scripted routines Parameters task_name : string Name of the task that we want to get verbose : boolean Defines whether the function returns the full output, or only the results


Returns: r : returns the robot tasks

Source code in dod/dod.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def get_task_details(self, task_name, verbose = False):
    """
        This gets the details of a task from the robot to see the scripted routines
        Parameters
        task_name : string
            Name of the task that we want to get
        verbose : boolean
            Defines whether the function returns the full output, or only the results
        ----------
        Returns: 
        r : 
            returns the robot tasks
    """
    rr = self.client.connect("Test")
    r = self.client.get_task_details(task_name)
    rr = self.client.disconnect()
    if verbose == True: 
        return r
    else: 
        return r.RESULTS

get_task_names(verbose=False)

This gets the names of available tasks from the robot Parameters task_name : string Name of the task that we want to get verbose : boolean Defines whether the function returns the full output, or only the results


Returns: r : dict returns the robot tasks

Source code in dod/dod.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
def get_task_names(self, verbose = False):
    """
        This gets the names of available tasks from the robot
        Parameters
        task_name : string
            Name of the task that we want to get
        verbose : boolean
            Defines whether the function returns the full output, or only the results
        ----------
        Returns: 
        r : dict
            returns the robot tasks
    """
    # Check if reponse is not an empty array or any errors occured
    rr = self.client.connect("Test")
    r = self.client.get_task_names()
    rr = self.client.disconnect()
    if verbose == True: 
        return r
    else: 
        return r.RESULTS

logging_string()

Creating the string to post to the e-log.

Parameters

Returns: post : string String with useful logging information for posting into the e-log

Source code in dod/dod.py
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
def logging_string(self):
    """
    Creating the string to post to the e-log. 

    Parameters
    ----------
    Returns: 
    post : string
        String with useful logging information for posting into the e-log 
    """
    post_str = ''

    # Info to be posted: 

    # Nozzle angles: 
    position = self.codi.get_CoDI_pos()
    position_str = 'Codi Information: \n CoDI data: name: ' + str(position[0]) + '\n rot_base: '+ str(position[1])+ '\n rot_left: '+ str(position[2]) + '\n rot_right: '+ str(position[3])+ '\n z-transl: '+ str(position[4]) 
    post_str = post_str + position_str +' \n '

    #Timings:
    post_str = post_str + 'Timing:' + ' \n '
    post_str = post_str + 'timing_Xray:' + str(self.timing_Xray) +' \n '
    post_str = post_str + 'timing_nozzle_1:' + str(self.timing_nozzle_1) +' \n '
    post_str = post_str + 'timing_nozzle_2:' + str(self.timing_nozzle_2) +' \n '
    post_str = post_str + 'timing_LED:' + str(self.timing_LED) +' \n '
    post_str = post_str + 'timing_delay_LED:' + str(self.timing_delay_LED) +' \n '
    post_str = post_str + 'timing_delay_reaction:' + str(self.timing_delay_reaction) +' \n '        
    post_str = post_str + 'timing_delay_nozzle_1:' + str(self.timing_delay_nozzle_1) +' \n '   
    post_str = post_str + 'timing_delay_nozzle_2:' + str(self.timing_delay_nozzle_2) +' \n '   

    return post_str

move_x_abs(position_x, safety_test=False, verbose=False)

Robot coordinate system!!!: 
Moves robot to new absolute x position. 
No safety test.

Parameters position : int Absolute position in um(?) to which the robot is supposed to move safety_test : Boolean question whether a safety test is to be performed or not True: Test will be performed False: Not performed verbose : boolean Defines whether the function returns the full output, or only the results ---------- Returns: r : current position

Source code in dod/dod.py
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
def move_x_abs(self, position_x, safety_test = False, verbose = False):
    '''
        Robot coordinate system!!!: 
        Moves robot to new absolute x position. 
        No safety test. 

    Parameters
    position : int
        Absolute position in um(?) to which the robot is supposed to move
    safety_test : Boolean
        question whether a safety test is to be performed or not
        True: Test will be performed
        False: Not performed
    verbose : boolean
            Defines whether the function returns the full output, or only the results
        ----------
    Returns: 
    r : current position
    '''

    r = self.client.connect("Test")
    r = self.client.get_current_positions()
    current_real_position = r.RESULTS['PositionReal']
    x_current, y_current, z_current = current_real_position

    if safety_test == False:  
        r = self.client.move_x(position_x)
    else: 
        print('safety test of move has yet to be implemented')
        if self.test_forbidden_region(position_x, y_current): 
            r = self.client.move_x(position_x)


    # WAIT FOR MOVEMENT TO BE DONE
    self.busy_wait(25)

    # r = self.client.get_current_positions()
    # new_real_position = r.RESULTS['PositionReal']

    rr = self.client.disconnect()
    if verbose == True: 
        return r
    else: 
        return r.RESULTS

move_y_abs(position_y, safety_test=False, verbose=False)

Robot coordinate system!!!: 
Moves robot to new absolute x position. 
No safety test.

Parameters position : int Absolute position in um(?) to which the robot is supposed to move safety_test : Boolean question whether a safety test is to be performed or not True: Test will be performed False: Not performed verbose : boolean Defines whether the function returns the full output, or only the results ---------- Returns: r : current position

Source code in dod/dod.py
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
def move_y_abs(self, position_y, safety_test = False, verbose = False):
    '''
        Robot coordinate system!!!: 
        Moves robot to new absolute x position. 
        No safety test. 

    Parameters
    position : int
        Absolute position in um(?) to which the robot is supposed to move
    safety_test : Boolean
        question whether a safety test is to be performed or not
        True: Test will be performed
        False: Not performed
    verbose : boolean
            Defines whether the function returns the full output, or only the results
        ----------
    Returns: 
    r : current position
    '''

    r = self.client.connect("Test")
    r = self.client.get_current_positions()
    current_real_position = r.RESULTS['PositionReal']
    x_current, y_current, z_current = current_real_position

    if safety_test == False:  
        r = self.client.move_y(position_y)
    else: 
        print('safety test of move has yet to be implemented')
        if self.test_forbidden_region(x_current, position_y): 
            r = self.client.move_y(position_y)


    # WAIT FOR MOVEMENT TO BE DONE
    self.busy_wait(25)

    # r = self.client.get_current_positions()
    # new_real_position = r.RESULTS['PositionReal']

    rr = self.client.disconnect()
    if verbose == True: 
        return r
    else: 
        return r.RESULTS

move_z_abs(position_z, safety_test=False, verbose=False)

Robot coordinate system!!!: 
Moves robot to new absolute Z position. 
No safety test.

Parameters position : int Absolute position in um(?) to which the robot is supposed to move safety_test : Boolean question whether a safety test is to be performed or not True: Test will be performed False: Not performed verbose : boolean Defines whether the function returns the full output, or only the results ---------- Returns: r : current position

Source code in dod/dod.py
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
def move_z_abs(self, position_z, safety_test = False, verbose = False):
    '''
        Robot coordinate system!!!: 
        Moves robot to new absolute Z position. 
        No safety test. 

    Parameters
    position : int
        Absolute position in um(?) to which the robot is supposed to move
    safety_test : Boolean
        question whether a safety test is to be performed or not
        True: Test will be performed
        False: Not performed
    verbose : boolean
            Defines whether the function returns the full output, or only the results
        ----------
    Returns: 
    r : current position
    '''

    r = self.client.connect("Test")
    r = self.client.get_current_positions()
    current_real_position = r.RESULTS['PositionReal']
    x_current, y_current, z_current = current_real_position

    if safety_test == False:  
        r = self.client.move_z(position_z)
    else: 
        print('safety test of move has yet to be implemented')
        if self.test_forbidden_region(x_current, y_current): 
            r = self.client.move_z(position_z)


    # WAIT FOR MOVEMENT TO BE DONE
    self.busy_wait(25)

    # r = self.client.get_current_positions()
    # new_real_position = r.RESULTS['PositionReal']

    rr = self.client.disconnect()
    if verbose == True: 
        return r
    else: 
        return r.RESULTS

set_forbidden_region(x_start, x_stop, y_start, y_stop, rotation_state='both')

set a forbidden region in the robot coordinate x y plane for end-point testing defined via a rectangle in the x-y-plane. x_start<x_stop, y_start<y_stop set the region depending on the rotation state, as some regions are forbidden only in one configuration

Parameters x_start : float x-position start x_stop : float x-position stop y_start : float y-position start y_stop : float y-position stop rotation_state: string options are "horizontal" (nozzle sideways, base 90 degrees), "vertical" (nozzle vertical, base 0 degree), "both"


Source code in dod/dod.py
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
def set_forbidden_region(self, x_start, x_stop, y_start, y_stop, rotation_state = 'both'): 
    """
    set a forbidden region in the robot coordinate x y plane for end-point testing 
    defined via a rectangle in the x-y-plane. x_start<x_stop, y_start<y_stop
    set the region depending on the rotation state, as some regions are forbidden only in one configuration

    Parameters
    x_start : float
        x-position start 
    x_stop : float
        x-position stop
    y_start : float
        y-position start 
    y_stop : float
        y-position stop
    rotation_state: string
        options are "horizontal" (nozzle sideways, base 90 degrees), "vertical" (nozzle vertical, base 0 degree), "both"
    ----------

    """
    region_tuple = (min(x_start,x_stop), max(x_start,x_stop), min(y_start,y_stop), max(y_start,y_stop))
    if rotation_state == "horizontal": 
        self.forbidden_regions_horizontal.append(region_tuple)
    elif rotation_state == "vertical": 
        self.forbidden_regions_vertical.append(region_tuple)
    elif rotation_state == "both": 
        self.forbidden_regions_vertical.append(region_tuple)
        self.forbidden_regions_horizontal.append(region_tuple)
    else:
        print('invalid input of rotation state')

set_nozzle_dispensing(mode='Off', verbose=False)

Sets the dispensing, either "Free", "Triggered", or "Off" Parameters mode : string either "free", "triggered", or "off" verbose : boolean Defines whether the function returns the full output, or only the results


Returns: r :

Source code in dod/dod.py
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
def set_nozzle_dispensing(self, mode = "Off", verbose = False):
    '''
    Sets the dispensing, either "Free", "Triggered", or "Off"
    Parameters
    mode : string 
        either "free", "triggered", or "off"
    verbose : boolean
            Defines whether the function returns the full output, or only the results
    ----------
    Returns: 
    r : 
    '''
    rr = self.client.connect("Test")
    if mode == 'Free': 
        r = self.client.dispensing('Free')
    elif mode == 'Triggered':
        r = self.client.dispensing('Triggered')
    else: 
        #turns active nozzles off. Safer if all nozzles would be turned off
        r = self.client.dispensing('Off')
        for i in [1,2,3,4]: 
            r = self.client.select_nozzle(i)
            r = self.client.dispensing('Off')

    rr = self.client.disconnect()
    if verbose == True: 
        return r
    else: 
        return r.RESULTS

set_timing_abs_Xray(timing_abs)

Setting the absolute timing of the X-rays for claculation purposes

Parameters timing_abs : float abs timing in ns from robot alignment


Returns:

Source code in dod/dod.py
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
def set_timing_abs_Xray(self, timing_abs): 
    """
    Setting the absolute timing of the X-rays for claculation purposes

    Parameters
    timing_abs : float
        abs timing in ns from robot alignment
    ----------
    Returns: 
    """
    self.timing_Xray = timing_abs #delay of LED relative to X-ray timing
    #self.trigger_Xray.ns_delay.put(self.timing_Xray)

    # update timings
    self.set_timing_update()

set_timing_rel_LED(timing_rel)

Setting the relative timing of the LED relative to the X-rays

Parameters timing_rel : float relative delay in ns from robot alignment


Returns:

Source code in dod/dod.py
702
703
704
705
706
707
708
709
710
711
712
713
714
715
def set_timing_rel_LED(self, timing_rel): 
    """
    Setting the relative timing of the LED relative to the X-rays

    Parameters
    timing_rel : float
        relative delay in ns from robot alignment
    ----------
    Returns: 
    """
    self.timing_delay_LED = timing_rel #delay of LED relative to X-ray timing

    # update timings
    self.set_timing_update()

set_timing_rel_reaction(timing_rel)

Setting the relative timing of the reaction relative to the X-rays

Parameters timing_rel : float relative delay in ns from robot alignment


Returns:

Source code in dod/dod.py
718
719
720
721
722
723
724
725
726
727
728
729
730
731
def set_timing_rel_reaction(self, timing_rel): 
    """
    Setting the relative timing of the reaction relative to the X-rays

    Parameters
    timing_rel : float
        relative delay in ns from robot alignment
    ----------
    Returns: 
    """
    self.timing_delay_reaction = timing_rel #delay of LED relative to X-ray timing

    # update timings
    self.set_timing_update()

set_timing_relative_nozzle(nozzle, timing_rel)

Changing the timing of the selected nozzle by a relative amount

Parameters nozle : int nozzle number timing_rel : float relative change in ns from robot alignment


Returns:

Source code in dod/dod.py
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
def set_timing_relative_nozzle(self, nozzle, timing_rel): 
    """
    Changing the timing of the selected nozzle by a relative amount 

    Parameters
    nozle : int
        nozzle number
    timing_rel : float
        relative change in ns from robot alignment
    ----------
    Returns: 
    """
    if nozzle == 1: 
        current_timing = self.timing_delay_nozzle_1
        self.timing_delay_nozzle_1 = self.timing_delay_nozzle_1 + timing_rel
    elif nozzle == 2: 
        current_timing = self.timing_delay_nozzle_2
        self.timing_delay_nozzle_2 = self.timing_delay_nozzle_2 + timing_rel
    else: 
        print('no valid nozzle selected.')
        return

    # update timings
    self.set_timing_update()

set_timing_update()

updating the timing triggers according to the set relative and absolute timing values

Parameters ---------- Returns:

Source code in dod/dod.py
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
def set_timing_update(self): 
    """
    updating the timing triggers according to the set relative and absolute timing values

    Parameters
           ----------
    Returns: 
    """
    # Nozzle 1
    self.timing_nozzle_1 = self.timing_Xray - self.timing_delay_nozzle_1 - self.timing_delay_sciPulse - self.timing_delay_reaction
    if self.timing_nozzle_1 < 0: 
        self.timing_nozzle_1 = self.timing_nozzle_1 + 1/120*1000000000
    self.trigger_nozzle_1.ns_delay.put(self.timing_nozzle_1)

    # Nozzle 2
    self.timing_nozzle_2 = self.timing_Xray - self.timing_delay_nozzle_2 - self.timing_delay_sciPulse - self.timing_delay_reaction
    if self.timing_nozzle_2 < 0: 
        self.timing_nozzle_2 = self.timing_nozzle_2 + 1/120*1000000000
    self.trigger_nozzle_2.ns_delay.put(self.timing_nozzle_2)

    # LED
    self.timing_LED = self.timing_Xray + self.timing_delay_LED
    self.trigger_LED.ns_delay.put(self.timing_LED)

set_timing_zero_nozzle(nozzle, timing_rel)

Setting the time zero for the nozzles from the LED alignment in robot

Parameters nozle : int nozzle number timing_rel : float relative delay in ns from robot alignment


Returns:

Source code in dod/dod.py
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
def set_timing_zero_nozzle(self, nozzle, timing_rel): 
    """
    Setting the time zero for the nozzles from the LED alignment in robot

    Parameters
    nozle : int
        nozzle number
    timing_rel : float
        relative delay in ns from robot alignment
    ----------
    Returns: 
    """
    if nozzle == 1: 
        self.timing_delay_nozzle_1 = timing_rel
    elif nozzle == 2: 
        self.timing_delay_nozzle_2 = timing_rel
    else: 
        print('no valid nozzle selected.')
        return

    # update timings
    self.set_timing_update()

stop_task(verbose=True)

Stop task while running ** ISSUES ** - When stop task is called, the Robot stays in "BUSY" Status. Parameters


verbose : boolean Defines whether the function returns the full output, or only the results Returns: r : status readback when aborted

Source code in dod/dod.py
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def stop_task(self, verbose = True):
    """
    Stop task while running
    ** ISSUES **
    -  When stop task  is called, the Robot stays in "BUSY" Status.
    Parameters
    ----------
    verbose : boolean
        Defines whether the function returns the full output, or only the results
    Returns: 
    r : 
        status readback when aborted
    """
    r = self.client.connect("Test")
    self.safety_abort = False
    r = self.client.stop_task()
    r = self.client.disconnect()
    if verbose == True: 
        return r

test_forbidden_region(x_test, y_test)

tests if the end point of a motion is inside a forbidden region No testing of the path of a motion included!

Parameters x_test : float x-position to test y_test : float y-position to test


Returns: safe_motion : bool boolean flag if endpoint of motion is safe or not

Source code in dod/dod.py
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
def test_forbidden_region(self, x_test, y_test): 
    """
    tests if the end point of a motion is inside a forbidden region
    No testing of the path of a motion included!

    Parameters
    x_test : float
        x-position to test 
    y_test : float 
        y-position to test 

    ----------
    Returns: 
    safe_motion : bool
        boolean flag if endpoint of motion is safe or not
    """
    from dod.codi import CoDI_base
    # Get current rotation state
    pos_rot_base  = round(CoDI_base.wm(),0)

    #Initialize safe flag (True = safe)
    flag_safe_endpoint = True

    #combine the lists depending on the rotation 
    if pos_rot_base == 90: 
        test_list_safety = self.forbidden_regions_vertical
    elif pos_rot_base == 0:
        test_list_safety = self.forbidden_regions_horizontal
    else:
        test_list_safety = self.forbidden_regions_horizontal + self.forbidden_regions_vertical

    #Test for all regions if the end point is in the forbidden region
    for tuple_current in test_list_safety: 
        x_start, x_stop, y_start, y_stop = tuple_current
        if ((x_start < x_test) and (x_stop > x_test) and (y_start < y_test) and (y_stop > y_test)): 
            flag_safe_endpoint = flag_safe_endpoint and False
        else:
            flag_safe_endpoint = flag_safe_endpoint and True

    return flag_safe_endpoint

MFX_Timing

Source code in mfx/mfx_timing.py
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
class MFX_Timing:
    def __init__(self,sequencer=None):
        from pcdsdevices.sequencer import EventSequencer
        self.seq1 = EventSequencer('ECS:SYS0:7', name='mfx_sequencer')
        self.seq2 = EventSequencer('ECS:SYS0:12', name='mfx_sequencer_spare')

        self.evt_code = {
            'wait':0,
            'pp_trig':197,
            'daq_readout':198,
            'laser_on':203,
            'laser_off':204,
            'ray_readout':210,
            'ray1':211,
            'ray2':212,
            'ray3':213,
        }
        self.sync_markers = {0.5:0, 1:1, 5:2, 10:3, 30:4, 60:5, 120:6, 360:7}
        self.sequence = []


    def _seq_step(self, evt_code_name=None, delta_beam=0):
        try:
            return [self.evt_code[evt_code_name], delta_beam, 0, 0]
        except:
            print('Error: event sequencer step not recognized.')


    def _seq_init(self, sync_mark=30):
        from time import sleep
        self.seq.sync_marker.put(self.sync_markers[sync_mark])
        self.sequence = []
        sequence = []
        for ii in range(15):
            sequence.append(self._seq_step('wait', 0))
        self.seq.sequence.put_seq(sequence)
        sleep(1)


    def _seq_put(self, steps):
        for step in steps:
            self.sequence.append(self._seq_step(step[0], step[1]))
        self.seq.sequence.put_seq(self.sequence)


    def _seq_120hz(self):
        steps = [['ray_readout', 1],
                 ['daq_readout',0],
                 ['ray1',1],
                 ['daq_readout',0],
                 ['ray2',1],
                 ['daq_readout',0],
                 ['ray3',1],
                 ['daq_readout', 0]]
        return steps


    def _seq_120hz_trucated(self):
        steps = [['ray_readout', 1],
                 ['daq_readout',0]]
        return steps


    def _seq_60hz(self):
        steps = [['ray_readout', 1],
                 ['pp_trig', 0],
                 ['ray1', 1],
                 ['daq_readout', 0],
                 ['ray2', 1],
                 ['pp_trig', 0],
                 ['ray3', 1],
                 ['daq_readout', 0]]
        return steps


    def _seq_60hz_trucated(self):
        steps = [['ray_readout', 1],
                 ['ray1', 0],
                 ['ray2', 1],
                 ['daq_readout', 0]]
        return steps


    def _seq_30hz(self):
        steps = [['ray_readout', 1],
                 ['pp_trig', 0],
                 ['ray1', 1],
                 ['ray2', 1],
                 ['daq_readout', 0],
                 ['ray3', 1]]
        return steps


    def _seq_20hz(self):
        steps = [['ray_readout', 1],
                 ['pp_trig', 0],
                 ['ray1', 1],
                 ['ray2', 1],
                 ['daq_readout', 0],
                 ['ray3', 1],
                 ['ray3', 1],
                 ['ray3', 1]]
        return steps


    def set_seq(self, rep=None, sequencer=None, laser=None):
        """
    Set your event sequencer

    Parameters
    ----------
    rep: int, optional
        Set repitition rate only 120, 60, 30, and 20 Hz are currently available

    sequencer: str, optional
        default is event sequencer 7 and use 'spare' to run sequencer 12

    laser: list, optional
        sets laser sequence list in format [['laser_on',0],['laser_off',0],...]

    Operations
    ----------

        """
        self.seq1.stop()
        self.seq2.stop()
        if str(sequencer).lower() == 'spare':
            self.seq = self.seq2
        else:
            self.seq = self.seq1
        if laser:
            if rep is None or rep == 120:
                self._seq_init(sync_mark=120)
                for laser_evt in laser:
                    sequence = self._seq_120hz_trucated()
                    block = sequence[:-1]
                    block.append(laser_evt)
                    block.append(sequence[-1])
                    self._seq_put(block)
            elif rep == 60:
                self._seq_init(sync_mark=120)
                for laser_evt in laser:
                    sequence = self._seq_60hz_trucated()
                    block = sequence[:-1]
                    block.append(laser_evt)
                    block.append(sequence[-1])
                    self._seq_put(block)
            elif rep == 30:
                self._seq_init(sync_mark=30)
                for laser_evt in laser:
                    sequence = self._seq_30hz()
                    block = sequence[:-1]
                    block.append(laser_evt)
                    block.append(sequence[-1])
                    self._seq_put(block)
            elif rep == 20:
                self._seq_init(sync_mark=60)
                for laser_evt in laser:
                    sequence = self._seq_20hz()
                    block = sequence[:-1]
                    block.append(laser_evt)
                    block.append(sequence[-1])
                    self._seq_put(block)
        else:
            if rep is None or rep == 120:
                self._seq_init(sync_mark=120)
                self._seq_put(self._seq_120hz())
            elif rep == 60:
                self._seq_init(sync_mark=60)
                self._seq_put(self._seq_60hz())
            elif rep == 30:
                self._seq_init(sync_mark=30)
                self._seq_put(self._seq_30hz())
            elif rep == 20:
                self._seq_init(sync_mark=20)
                self._seq_put(self._seq_20hz())

        self.seq.start()
        return self.sequence

    def check_seq(self):
        for line in self.sequence:
            print(line)

set_seq(rep=None, sequencer=None, laser=None)

Set your event sequencer

Parameters

rep: int, optional Set repitition rate only 120, 60, 30, and 20 Hz are currently available

str, optional

default is event sequencer 7 and use 'spare' to run sequencer 12

list, optional

sets laser sequence list in format [['laser_on',0],['laser_off',0],...]

Operations
Source code in mfx/mfx_timing.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
def set_seq(self, rep=None, sequencer=None, laser=None):
    """
Set your event sequencer

Parameters
----------
rep: int, optional
    Set repitition rate only 120, 60, 30, and 20 Hz are currently available

sequencer: str, optional
    default is event sequencer 7 and use 'spare' to run sequencer 12

laser: list, optional
    sets laser sequence list in format [['laser_on',0],['laser_off',0],...]

Operations
----------

    """
    self.seq1.stop()
    self.seq2.stop()
    if str(sequencer).lower() == 'spare':
        self.seq = self.seq2
    else:
        self.seq = self.seq1
    if laser:
        if rep is None or rep == 120:
            self._seq_init(sync_mark=120)
            for laser_evt in laser:
                sequence = self._seq_120hz_trucated()
                block = sequence[:-1]
                block.append(laser_evt)
                block.append(sequence[-1])
                self._seq_put(block)
        elif rep == 60:
            self._seq_init(sync_mark=120)
            for laser_evt in laser:
                sequence = self._seq_60hz_trucated()
                block = sequence[:-1]
                block.append(laser_evt)
                block.append(sequence[-1])
                self._seq_put(block)
        elif rep == 30:
            self._seq_init(sync_mark=30)
            for laser_evt in laser:
                sequence = self._seq_30hz()
                block = sequence[:-1]
                block.append(laser_evt)
                block.append(sequence[-1])
                self._seq_put(block)
        elif rep == 20:
            self._seq_init(sync_mark=60)
            for laser_evt in laser:
                sequence = self._seq_20hz()
                block = sequence[:-1]
                block.append(laser_evt)
                block.append(sequence[-1])
                self._seq_put(block)
    else:
        if rep is None or rep == 120:
            self._seq_init(sync_mark=120)
            self._seq_put(self._seq_120hz())
        elif rep == 60:
            self._seq_init(sync_mark=60)
            self._seq_put(self._seq_60hz())
        elif rep == 30:
            self._seq_init(sync_mark=30)
            self._seq_put(self._seq_30hz())
        elif rep == 20:
            self._seq_init(sync_mark=20)
            self._seq_put(self._seq_20hz())

    self.seq.start()
    return self.sequence

OM

Source code in mfx/om.py
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
class OM:
    def __init__(self):
        from mfx.macros import get_exp
        self.experiment = str(get_exp())
        self.cwd = f'/cds/home/opr/mfxopr/OM-GUI'
        self.pwd = f'{self.cwd}/{self.experiment}'


    def fix_run_om(self, path: str):
        """
        Fixes the run_om.sh file to have current experiment number

        Parameters:
            path (str): run_om.sh file path.

        Operations:

        """
        from subprocess import check_output
        import logging
        logging.info(f"Updating run_om.sh file: {path}")
        wherepsana = check_output("wherepsana",shell=True).decode().strip('\n')

        with open(path, "r") as file:
            lines = file.readlines()
            for ind, line in enumerate(lines):
                if '--host' in lines[ind]:
                    newline = f'     --host {wherepsana} $(pwd)/monitor_wrapper.sh\n'
                    logging.info(f'Changing line {ind} to {newline}')
                    lines[ind] = newline

        with open(path, 'w') as file:
            file.writelines(lines)


    def fix_yaml(self, yaml: str, mask='', geom=''):
        """
        Fixes the yaml file to have current experiment number

        Parameters:
            yaml (str): monitor.yaml file path.

            mask (str): mask file path.

            geom (str): geom file path.

        Operations:

        """
        import logging
        logging.info(f"Updating yaml file: {yaml}")
        with open(yaml, "r") as file:
            lines = file.readlines()
            for ind, line in enumerate(lines):
                if 'psana_calibration_directory' in lines[ind]:
                    newline = f'  psana_calibration_directory: /sdf/data/lcls/ds/mfx/{self.experiment}/calib\n'
                    logging.info(f'Changing line {ind} to {newline}')
                    lines[ind] = newline
            if mask != '':
                for ind, line in enumerate(lines):
                    if 'bad_pixel_map_filename' in lines[ind]:
                        newline = f'  bad_pixel_map_filename: {mask}\n'
                        logging.info(f'Changing line {ind} to {newline}')
                        lines[ind] = newline

            if geom != '':
                for ind, line in enumerate(lines):
                    if 'geometry_file' in lines[ind]:
                        newline = f'  geometry_file: {geom}\n'
                        logging.info(f'Changing line {ind} to {newline}')
                        lines[ind] = newline

        with open(yaml, 'w') as file:
            file.writelines(lines)


    def check_settings(self):
        """
        Checks OM's file system and sets it up if needed

        Parameters
        ----------

        Operations
        ----------
        """
        import os
        import sys
        import logging
        from shutil import copy2
        logging.info("Checking OM Files")
        if not os.path.exists(self.pwd) or len(os.listdir(self.pwd)) == 0:
            logging.warning(f"No Directory Exists for Experiment: {self.experiment}. Would you like to create it?")
            mkdir = input("(y/n)? ")

            if mkdir.lower() == "y":
                logging.info(f"Creating Directory for Experiment: {self.experiment}.")
                pre_pwd = max(
                    [os.path.join(self.cwd, d) for d in os.listdir(
                        self.cwd) if os.path.isdir(os.path.join(self.cwd, d))], key=os.path.getmtime)

                os.makedirs(self.pwd)
                det_dirs=[f'{self.pwd}/om_workspace', f'{self.pwd}/om_workspace_rayonix', f'{self.pwd}/om_workspace_xes']
                logging.info(f"Creating Directories for Epix10k2M.")
                os.makedirs(det_dirs[0])
                logging.info(f"Creating Directories for Rayonix.")
                os.makedirs(det_dirs[1])
                logging.info(f"Creating Directories for XES.")
                os.makedirs(det_dirs[2])

                pre_det_dirs=[f'{pre_pwd}/om_workspace', f'{pre_pwd}/om_workspace_rayonix', f'{pre_pwd}/om_workspace_xes']

                logging.info(f"Copying Key Files from Previous Experiment: {pre_pwd}")

                for ind, det in enumerate(det_dirs):
                    logging.info(f"Copying Key Files for: {pre_det_dirs[ind]}")
                    shell_list = [os.path.join(pre_det_dirs[ind], file) for file in os.listdir(
                        pre_det_dirs[ind]) if file.endswith(".sh")]
                    if len(shell_list) != 0:
                        for sh in shell_list:
                            copy2(sh, det_dirs[ind])
                    else:
                        logging.error(f'No shell scripts found in: {det}')

                    if os.path.isfile(os.path.join(det, 'run_om.sh')):
                        self.fix_run_om(os.path.join(det, 'run_om.sh'))

                    geom_list = [os.path.join(
                        pre_det_dirs[ind], file) for file in os.listdir(
                            pre_det_dirs[ind]) if file.endswith(".geom")]
                    if len(geom_list) != 0:
                        geom = max(geom_list, key=os.path.getmtime)
                        copy2(geom, det_dirs[ind])
                    else:
                        logging.error(f'No geom found in: {det}')

                    mask_list = [os.path.join(
                        pre_det_dirs[ind], file) for file in os.listdir(
                            pre_det_dirs[ind]) if 'mask' in file]
                    if len(mask_list) != 0:
                        mask = max(mask_list, key=os.path.getmtime)
                        copy2(mask, det_dirs[ind])
                    else:
                        logging.error(f'No mask found in: {det}')

                    yaml_list = [os.path.join(
                        pre_det_dirs[ind], file) for file in os.listdir(
                            pre_det_dirs[ind]) if file.endswith(".yaml")]
                    if len(yaml_list) != 0:
                        yaml = max(yaml_list, key=os.path.getmtime)
                        copy2(yaml, det_dirs[ind])
                        self.fix_yaml(
                            os.path.join(
                                det_dirs[ind], os.path.basename(yaml)), mask=os.path.basename(mask), geom=os.path.basename(geom))
                    else:
                        logging.error(f'No yaml found in: {det}')

                logging.info(f"Copying om_reset_plots.py")
                copy2(os.path.join(pre_pwd, 'om_reset_plots.py'), self.pwd)
            else:
                logging.info(f"You've decided not to continue. Program will exit")
                sys.exit()


    def gui(
        self,
        user: str,
        facility: str = "S3DF",
        debug: bool = False,
    ):
        """Launch OM GUI.

        Parameters:

            user (str): username for computer account at facility.

            facility (str): Default: "S3DF". Options: "S3DF, NERSC".

            debug (bool): Default: False.
        """
        import logging
        import os
        import subprocess

        proc = [
            f"ssh -YAC {user}@s3dflogin "
            f"/sdf/group/lcls/ds/tools/mfx/scripts/unknown "
            f"{user} {self.experiment} {facility} 1 {str(debug)}"
            ]

        logging.info(proc)

        if debug:
            os.system(proc[0])
        else:
            subprocess.Popen(
                proc, shell=True,
                stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)


    def kill(self):
        import logging
        import subprocess

        proc = [
            "ssh -YAC mfx-monitor "
            "kill -9  `ps ux | grep monitor_wrapper.sh | grep -v grep | awk '{ print $2 }'`"
            ]

        logging.info(proc)

        subprocess.Popen(
            proc, shell=True,
            stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)


    def om(self, det=None, start='all', calibdir=None, debug: bool = False):
        """
        Launches OM

        Parameters
        ----------
        det: str, optional
            Detector name. Example 'Rayonix', 'Epix10k2M', or 'XES' for spectroscopy

        start: str, optional
            Which process to start. All by default

        calibdir: str, optional
            path to calib directory

        debug (bool): Default: False.

        Operations
        ----------

        """
        import logging
        import subprocess
        import os
        self.check_settings()
        monitor_wrapper = ''
        monitor_wrapper = subprocess.check_output(
            "ssh -YAC mfx-monitor ps aux | grep monitor_wrapper.sh | grep -v grep | awk '{ print $2 }'",shell=True).decode()

        det_dir = None

        if str(det).lower() == 'rayonix':
                det_dir = f'{self.pwd}/om_workspace_rayonix'
        elif str(det).lower() == 'epix10k2m':
                det_dir = f'{self.pwd}/om_workspace'
        elif str(det).lower() == 'xes':
                det_dir = f'{self.pwd}/om_workspace_xes'
        else:
            logging.error(
                "No proper detector seclected. Please use either"
                "'Rayonix', 'Epix10k2M', or 'XES' for spectroscopy")

        if monitor_wrapper == '':
            proc = [
                f'{det_dir}/run_gui.sh',
                f'{det_dir}/run_frame_viewer.sh',
                f'ssh -YAC mfx-monitor "cd {det_dir}; ./run_om.sh"'
                ]
        else:
            proc = [
                f'{det_dir}/run_gui.sh',
                f'{det_dir}/run_frame_viewer.sh'
                ]

        logging.info(proc)

        if debug:
            subprocess.Popen(
                proc[0], shell=True, 
                    stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
            subprocess.Popen(
                proc[1], shell=True, 
                    stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
            os.system(proc[-1])
        else:
            for cmd in proc:
                subprocess.Popen(
                    cmd, shell=True, 
                    stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)

    def reset(self):
        """
        Resets OM

        Parameters
        ----------

        Operations
        ----------

        """
        import logging
        import subprocess

        proc = [
            f'ssh -YAC mfx-monitor "source /reg/g/pcds/engineering_tools/mfx/scripts/pcds_conda; '
            f'conda deactivate; source /cds/sw/ds/ana/conda1/manage/bin/psconda.sh; '
            f'python {self.pwd}/om_reset_plots.py daq-mfx-mon10"',
            ]

        logging.info(proc)

        subprocess.Popen(
            proc, shell=True, 
            stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)

check_settings()

Checks OM's file system and sets it up if needed

Parameters
Operations
Source code in mfx/om.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def check_settings(self):
    """
    Checks OM's file system and sets it up if needed

    Parameters
    ----------

    Operations
    ----------
    """
    import os
    import sys
    import logging
    from shutil import copy2
    logging.info("Checking OM Files")
    if not os.path.exists(self.pwd) or len(os.listdir(self.pwd)) == 0:
        logging.warning(f"No Directory Exists for Experiment: {self.experiment}. Would you like to create it?")
        mkdir = input("(y/n)? ")

        if mkdir.lower() == "y":
            logging.info(f"Creating Directory for Experiment: {self.experiment}.")
            pre_pwd = max(
                [os.path.join(self.cwd, d) for d in os.listdir(
                    self.cwd) if os.path.isdir(os.path.join(self.cwd, d))], key=os.path.getmtime)

            os.makedirs(self.pwd)
            det_dirs=[f'{self.pwd}/om_workspace', f'{self.pwd}/om_workspace_rayonix', f'{self.pwd}/om_workspace_xes']
            logging.info(f"Creating Directories for Epix10k2M.")
            os.makedirs(det_dirs[0])
            logging.info(f"Creating Directories for Rayonix.")
            os.makedirs(det_dirs[1])
            logging.info(f"Creating Directories for XES.")
            os.makedirs(det_dirs[2])

            pre_det_dirs=[f'{pre_pwd}/om_workspace', f'{pre_pwd}/om_workspace_rayonix', f'{pre_pwd}/om_workspace_xes']

            logging.info(f"Copying Key Files from Previous Experiment: {pre_pwd}")

            for ind, det in enumerate(det_dirs):
                logging.info(f"Copying Key Files for: {pre_det_dirs[ind]}")
                shell_list = [os.path.join(pre_det_dirs[ind], file) for file in os.listdir(
                    pre_det_dirs[ind]) if file.endswith(".sh")]
                if len(shell_list) != 0:
                    for sh in shell_list:
                        copy2(sh, det_dirs[ind])
                else:
                    logging.error(f'No shell scripts found in: {det}')

                if os.path.isfile(os.path.join(det, 'run_om.sh')):
                    self.fix_run_om(os.path.join(det, 'run_om.sh'))

                geom_list = [os.path.join(
                    pre_det_dirs[ind], file) for file in os.listdir(
                        pre_det_dirs[ind]) if file.endswith(".geom")]
                if len(geom_list) != 0:
                    geom = max(geom_list, key=os.path.getmtime)
                    copy2(geom, det_dirs[ind])
                else:
                    logging.error(f'No geom found in: {det}')

                mask_list = [os.path.join(
                    pre_det_dirs[ind], file) for file in os.listdir(
                        pre_det_dirs[ind]) if 'mask' in file]
                if len(mask_list) != 0:
                    mask = max(mask_list, key=os.path.getmtime)
                    copy2(mask, det_dirs[ind])
                else:
                    logging.error(f'No mask found in: {det}')

                yaml_list = [os.path.join(
                    pre_det_dirs[ind], file) for file in os.listdir(
                        pre_det_dirs[ind]) if file.endswith(".yaml")]
                if len(yaml_list) != 0:
                    yaml = max(yaml_list, key=os.path.getmtime)
                    copy2(yaml, det_dirs[ind])
                    self.fix_yaml(
                        os.path.join(
                            det_dirs[ind], os.path.basename(yaml)), mask=os.path.basename(mask), geom=os.path.basename(geom))
                else:
                    logging.error(f'No yaml found in: {det}')

            logging.info(f"Copying om_reset_plots.py")
            copy2(os.path.join(pre_pwd, 'om_reset_plots.py'), self.pwd)
        else:
            logging.info(f"You've decided not to continue. Program will exit")
            sys.exit()

fix_run_om(path)

Fixes the run_om.sh file to have current experiment number

Parameters:

Name Type Description Default
path str

run_om.sh file path.

required

Operations:

Source code in mfx/om.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def fix_run_om(self, path: str):
    """
    Fixes the run_om.sh file to have current experiment number

    Parameters:
        path (str): run_om.sh file path.

    Operations:

    """
    from subprocess import check_output
    import logging
    logging.info(f"Updating run_om.sh file: {path}")
    wherepsana = check_output("wherepsana",shell=True).decode().strip('\n')

    with open(path, "r") as file:
        lines = file.readlines()
        for ind, line in enumerate(lines):
            if '--host' in lines[ind]:
                newline = f'     --host {wherepsana} $(pwd)/monitor_wrapper.sh\n'
                logging.info(f'Changing line {ind} to {newline}')
                lines[ind] = newline

    with open(path, 'w') as file:
        file.writelines(lines)

fix_yaml(yaml, mask='', geom='')

Fixes the yaml file to have current experiment number

Parameters:

Name Type Description Default
yaml str

monitor.yaml file path.

required
mask str

mask file path.

''
geom str

geom file path.

''

Operations:

Source code in mfx/om.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def fix_yaml(self, yaml: str, mask='', geom=''):
    """
    Fixes the yaml file to have current experiment number

    Parameters:
        yaml (str): monitor.yaml file path.

        mask (str): mask file path.

        geom (str): geom file path.

    Operations:

    """
    import logging
    logging.info(f"Updating yaml file: {yaml}")
    with open(yaml, "r") as file:
        lines = file.readlines()
        for ind, line in enumerate(lines):
            if 'psana_calibration_directory' in lines[ind]:
                newline = f'  psana_calibration_directory: /sdf/data/lcls/ds/mfx/{self.experiment}/calib\n'
                logging.info(f'Changing line {ind} to {newline}')
                lines[ind] = newline
        if mask != '':
            for ind, line in enumerate(lines):
                if 'bad_pixel_map_filename' in lines[ind]:
                    newline = f'  bad_pixel_map_filename: {mask}\n'
                    logging.info(f'Changing line {ind} to {newline}')
                    lines[ind] = newline

        if geom != '':
            for ind, line in enumerate(lines):
                if 'geometry_file' in lines[ind]:
                    newline = f'  geometry_file: {geom}\n'
                    logging.info(f'Changing line {ind} to {newline}')
                    lines[ind] = newline

    with open(yaml, 'w') as file:
        file.writelines(lines)

gui(user, facility='S3DF', debug=False)

Launch OM GUI.

Parameters:

user (str): username for computer account at facility.

facility (str): Default: "S3DF". Options: "S3DF, NERSC".

debug (bool): Default: False.
Source code in mfx/om.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
def gui(
    self,
    user: str,
    facility: str = "S3DF",
    debug: bool = False,
):
    """Launch OM GUI.

    Parameters:

        user (str): username for computer account at facility.

        facility (str): Default: "S3DF". Options: "S3DF, NERSC".

        debug (bool): Default: False.
    """
    import logging
    import os
    import subprocess

    proc = [
        f"ssh -YAC {user}@s3dflogin "
        f"/sdf/group/lcls/ds/tools/mfx/scripts/unknown "
        f"{user} {self.experiment} {facility} 1 {str(debug)}"
        ]

    logging.info(proc)

    if debug:
        os.system(proc[0])
    else:
        subprocess.Popen(
            proc, shell=True,
            stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)

om(det=None, start='all', calibdir=None, debug=False)

Launches OM

Parameters

det: str, optional Detector name. Example 'Rayonix', 'Epix10k2M', or 'XES' for spectroscopy

str, optional

Which process to start. All by default

str, optional

path to calib directory

debug (bool): Default: False.

Operations
Source code in mfx/om.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
def om(self, det=None, start='all', calibdir=None, debug: bool = False):
    """
    Launches OM

    Parameters
    ----------
    det: str, optional
        Detector name. Example 'Rayonix', 'Epix10k2M', or 'XES' for spectroscopy

    start: str, optional
        Which process to start. All by default

    calibdir: str, optional
        path to calib directory

    debug (bool): Default: False.

    Operations
    ----------

    """
    import logging
    import subprocess
    import os
    self.check_settings()
    monitor_wrapper = ''
    monitor_wrapper = subprocess.check_output(
        "ssh -YAC mfx-monitor ps aux | grep monitor_wrapper.sh | grep -v grep | awk '{ print $2 }'",shell=True).decode()

    det_dir = None

    if str(det).lower() == 'rayonix':
            det_dir = f'{self.pwd}/om_workspace_rayonix'
    elif str(det).lower() == 'epix10k2m':
            det_dir = f'{self.pwd}/om_workspace'
    elif str(det).lower() == 'xes':
            det_dir = f'{self.pwd}/om_workspace_xes'
    else:
        logging.error(
            "No proper detector seclected. Please use either"
            "'Rayonix', 'Epix10k2M', or 'XES' for spectroscopy")

    if monitor_wrapper == '':
        proc = [
            f'{det_dir}/run_gui.sh',
            f'{det_dir}/run_frame_viewer.sh',
            f'ssh -YAC mfx-monitor "cd {det_dir}; ./run_om.sh"'
            ]
    else:
        proc = [
            f'{det_dir}/run_gui.sh',
            f'{det_dir}/run_frame_viewer.sh'
            ]

    logging.info(proc)

    if debug:
        subprocess.Popen(
            proc[0], shell=True, 
                stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
        subprocess.Popen(
            proc[1], shell=True, 
                stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
        os.system(proc[-1])
    else:
        for cmd in proc:
            subprocess.Popen(
                cmd, shell=True, 
                stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)

reset()

Resets OM

Parameters
Operations
Source code in mfx/om.py
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
def reset(self):
    """
    Resets OM

    Parameters
    ----------

    Operations
    ----------

    """
    import logging
    import subprocess

    proc = [
        f'ssh -YAC mfx-monitor "source /reg/g/pcds/engineering_tools/mfx/scripts/pcds_conda; '
        f'conda deactivate; source /cds/sw/ds/ana/conda1/manage/bin/psconda.sh; '
        f'python {self.pwd}/om_reset_plots.py daq-mfx-mon10"',
        ]

    logging.info(proc)

    subprocess.Popen(
        proc, shell=True, 
        stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)

attenuator_scan_separate_runs(duration=None, record=False, transmissions=[0.01, 0.02, 0.03], use_daq=True, **kwargs)

Runs through attenuator conditions and records each as an individual run

Parameters

duration: int, optional When using the DAQ this corresponds to the number of events. If not using the DAQ, it corresponds to the number of seconds to wait at ech attenuator step. Default is 240 events (with DAQ), or 3 seconds (no DAQ).

bool, optional

set True to record

list of floats, optional

list of transmissions to run through. default [0.01,0.02,0.03]

bool, optional

Whether to include the DAQ or not. Default: True. If False can run the scans while using the DAQ elsewhere.

**kwargs - Additional optional keyword arguments events: int Provided for backwards compatibility. When using the DAQ, if this keyword argument is passed, and duration is not, it will be used as the number of events.

Operations

Source code in mfx/attenuator_scan.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def attenuator_scan_separate_runs(
    duration: int = None,
    record: bool = False,
    transmissions: list = [0.01, 0.02, 0.03],
    use_daq: bool = True,
    **kwargs
) -> None:
    """
    Runs through attenuator conditions and records each as an individual run

    Parameters
    ----------
    duration: int, optional
        When using the DAQ this corresponds to the number of events. If not
        using the DAQ, it corresponds to the number of seconds to wait at ech
        attenuator step. Default is 240 events (with DAQ), or 3 seconds (no DAQ).

    record: bool, optional
        set True to record

    transmissions: list of floats, optional
        list of transmissions to run through. default [0.01,0.02,0.03]

    use_daq: bool, optional
        Whether to include the DAQ or not. Default: True. If False can run the
        scans while using the DAQ elsewhere.

    **kwargs - Additional optional keyword arguments
        events: int
            Provided for backwards compatibility. When using the DAQ, if this
            keyword argument is passed, and `duration` is not, it will be used
            as the number of events.

    Operations
    ----------

    """
    from time import sleep
    from mfx.db import att, pp

    if use_daq:
        from mfx.db import daq

    evts = kwargs.get("events")
    if duration is None:
        if use_daq:
            duration = evts if evts else 240
        else:
            duration = 3
            if evts is not None:
                print("`events` parameter ignored when not using DAQ! Use `duration`!")

    pp.open()
    for i in transmissions:
        att(i)
        if use_daq:
            sleep(3)
            daq.begin(events=duration, record=record, wait=True, use_l3t=False)
            daq.end_run()
        else:
            sleep(duration)
    pp.close()
    if use_daq:
        daq.disconnect()

attenuator_scan_single_run(duration=None, record=False, transmissions=[0.01, 0.02, 0.03], use_daq=True, **kwargs)

Runs through attenuator conditions and records them all as one continuous run

Parameters

duration: int, optional When using the DAQ this corresponds to the number of events. If not using the DAQ, it corresponds to the number of seconds to wait at ech attenuator step. Default is 240 events (with DAQ), or 3 seconds (no DAQ).

bool, optional

set True to record

list of floats, optional

list of transmissions to run through. default [0.01,0.02,0.03]

bool, optional

Whether to include the DAQ or not. Default: True. If False can run the scans while using the DAQ elsewhere.

**kwargs - Additional optional keyword arguments events: int Provided for backwards compatibility. When using the DAQ, if this keyword argument is passed, and duration is not, it will be used as the number of events. It is ignored when not using the DAQ.

Operations

Source code in mfx/attenuator_scan.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def attenuator_scan_single_run(
    duration: int = None,
    record: bool = False,
    transmissions: list = [0.01, 0.02, 0.03],
    use_daq: bool = True,
    **kwargs
) -> None:
    """
    Runs through attenuator conditions and records them all as one continuous run

    Parameters
    ----------
    duration: int, optional
        When using the DAQ this corresponds to the number of events. If not
        using the DAQ, it corresponds to the number of seconds to wait at ech
        attenuator step. Default is 240 events (with DAQ), or 3 seconds (no DAQ).

    record: bool, optional
        set True to record

    transmissions: list of floats, optional
        list of transmissions to run through. default [0.01,0.02,0.03]

    use_daq: bool, optional
        Whether to include the DAQ or not. Default: True. If False can run the
        scans while using the DAQ elsewhere.

    **kwargs - Additional optional keyword arguments
        events: int
            Provided for backwards compatibility. When using the DAQ, if this
            keyword argument is passed, and `duration` is not, it will be used
            as the number of events. It is ignored when not using the DAQ.

    Operations
    ----------

    """
    from time import sleep
    from mfx.db import att, pp

    if use_daq:
        from mfx.db import daq

        #daq.end_run()
        #daq.disconnect()

    evts = kwargs.get("events")
    if duration is None:
        if use_daq:
            duration = evts if evts else 240
        else:
            duration = 3
            if evts is not None:
                print("`events` parameter ignored when not using DAQ! Use `duration`!")

    try:
        pp.open()
        if use_daq:
            daq.configure(record=record)
            sleep(3)
        for i in transmissions:
            att(i, wait=True)
            if use_daq:
                sleep(3)
                daq.begin(events=duration, record=record, wait=True, use_l3t=False)
            else:
                sleep(duration)
    finally:
        if use_daq:
            daq.end_run()
            daq.disconnect()
        pp.close()

autorun(sample='?', tag=None, run_length=300, record=True, runs=5, inspire=False, daq_delay=5, picker=None, cam=None)

Automate runs.... With optional quotes

Parameters

sample: str, optional Sample Name

str, optional

Run group tag

int, optional

number of seconds for run 300 is default

bool, optional

set True to record

int, optional

number of runs 5 is default

bool, optional

Set false by default because it makes Sandra sad. Set True to inspire

int, optional

delay time between runs. Default is 5 second but increase is the DAQ is being slow.

str, optional

If 'open' it opens pp before run starts. If 'flip' it flipflops before run starts

Operations

Source code in mfx/autorun.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
def autorun(sample='?', tag=None, run_length=300, record=True,
            runs=5, inspire=False, daq_delay=5, picker=None, cam=None):
    """
    Automate runs.... With optional quotes

    Parameters
    ----------
    sample: str, optional
        Sample Name

    tag: str, optional
        Run group tag

    run_length: int, optional
        number of seconds for run 300 is default

    record: bool, optional
        set True to record

    runs: int, optional
        number of runs 5 is default

    inspire: bool, optional
        Set false by default because it makes Sandra sad. Set True to inspire

    daq_delay: int, optional
        delay time between runs. Default is 5 second but increase is the DAQ is being slow.

    picker: str, optional
        If 'open' it opens pp before run starts. If 'flip' it flipflops before run starts

    Operations
    ----------

    """
    import logging
    from time import sleep
    from mfx.db import daq, pp
    logger = logging.getLogger(__name__)

    if sample.lower()=='water' or sample.lower()=='h2o':
        inspire=True
    if picker=='open':
        pp.open()
    if picker=='flip':
        pp.flipflop()

    if tag is None:
        tag = sample

    for i in range(runs):
        logger.info(f"Run Number {daq.run_number() + 1} Running {sample}......{quote()['quote']}")
        run_number = daq.run_number() + 1
        status = begin(duration = run_length, record = record, wait = True, end_run = True)
        if cam is not None:
            ioc_cam_recorder(cam, run_length, tag)
        if status is False:
            pp.close()
            post(sample, run_number, record, inspire, 'Run ended prematurely. Probably sample delivery problem')
            logger.warning("[*] Stopping Run and exiting???...")
            sleep(5)
            daq.stop()
            daq.disconnect()
            logger.warning('Run ended prematurely. Probably sample delivery problem')
            break

        post(sample, tag, run_number, record, inspire)
        try:
            sleep(daq_delay)
        except KeyboardInterrupt:
            pp.close()
            logger.warning("[*] Stopping Run and exiting???...")
            sleep(5)
            daq.disconnect()
            status = False
            if status is False:
                logger.warning('Run ended prematurely. Probably sample delivery problem')
                break
    if status:
        pp.close()
        daq.end_run()
        daq.disconnect()
        logger.warning('Finished with all runs thank you for choosing the MFX beamline!\n')

begin(events=None, duration=300, record=False, use_l3t=None, controls=None, wait=False, end_run=False)

Start the daq and block until the daq has begun acquiring data.

Optionally block with wait=True until the daq has finished aquiring data. If blocking, a ctrl+c will end the run and clean up.

If omitted, any argument that is shared with configure will fall back to the configured value.

Internally, this calls kickoff and manages its Status object.

Parameters

events: int, optional Number events to take in the daq.

int, optional

Time to run the daq in seconds, if events was not provided.

bool, optional

If True, we'll configure the daq to record data before this run.

bool, optional

If True, we'll run with the level 3 trigger. This means that if we specified a number of events, we will wait for that many "good" events as determined by the daq.

dict{name: device} or list[device...], optional

If provided, values from these will make it into the DAQ data stream as variables. We will check device.position and device.value for quantities to use and we will update these values each time begin is called. To provide a list, all devices must have a name attribute.

bool, optional

If True, wait for the daq to finish aquiring data. A KeyboardInterrupt (ctrl+c) during this wait will end the run and clean up.

bool, optional

If True, we'll end the run after the daq has stopped.

Source code in mfx/autorun.py
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def begin(events=None, duration=300,
          record=False, use_l3t=None, controls=None,
          wait=False, end_run=False):
    """
    Start the daq and block until the daq has begun acquiring data.

    Optionally block with ``wait=True`` until the daq has finished aquiring
    data. If blocking, a ``ctrl+c`` will end the run and clean up.

    If omitted, any argument that is shared with `configure`
    will fall back to the configured value.

    Internally, this calls `kickoff` and manages its ``Status`` object.

    Parameters
    ----------
    events: ``int``, optional
        Number events to take in the daq.

    duration: ``int``, optional
        Time to run the daq in seconds, if ``events`` was not provided.

    record: ``bool``, optional
        If ``True``, we'll configure the daq to record data before this
        run.

    use_l3t: ``bool``, optional
        If ``True``, we'll run with the level 3 trigger. This means that
        if we specified a number of events, we will wait for that many
        "good" events as determined by the daq.

    controls: ``dict{name: device}`` or ``list[device...]``, optional
        If provided, values from these will make it into the DAQ data
        stream as variables. We will check ``device.position`` and
        ``device.value`` for quantities to use and we will update these
        values each time begin is called. To provide a list, all devices
        must have a ``name`` attribute.

    wait: ``bool``, optional
        If ``True``, wait for the daq to finish aquiring data. A
        ``KeyboardInterrupt`` (``ctrl+c``) during this wait will end the
        run and clean up.

    end_run: ``bool``, optional
        If ``True``, we'll end the run after the daq has stopped.
    """
    import logging
    from time import sleep
    from mfx.db import daq
    from ophyd.utils import StatusTimeoutError, WaitTimeoutError
    logger = logging.getLogger(__name__)

    logger.debug(('Daq.begin(events=%s, duration=%s, record=%s, '
                    'use_l3t=%s, controls=%s, wait=%s)'),
                    events, duration, record, use_l3t, controls, wait)
    status = True
    try:
        if record is not None and record != daq.record:
            old_record = daq.record
            daq.preconfig(record=record, show_queued_cfg=False)
        begin_status = daq.kickoff(events=events, duration=duration,
                                    use_l3t=use_l3t, controls=controls)
        try:
            begin_status.wait(timeout=daq._begin_timeout)
        except (StatusTimeoutError, WaitTimeoutError) as e:
            msg = (f'Timeout after {daq._begin_timeout} seconds waiting '
                    'for daq to begin. Exception: {type(e).__name__}')
            logger.info(msg)
            #raise DaqTimeoutError(msg) from None

        # In some daq configurations the begin status returns very early,
        # so we allow the user to configure an emperically derived extra
        # sleep.
        sleep(daq.config['begin_sleep'])
        if wait:
            daq.wait()
            if end_run:
                daq.end_run()
        if end_run and not wait:
            threading.Thread(target=daq._ender_thread, args=()).start()
        return status
    except KeyboardInterrupt:
            status = False
            return status

correct_timing_drift(amplitude_thresh=0.02, ipm_thresh=500.0, drift_adjustment_thresh=0.05, fwhm_threshs=(30, 130), num_events=61, will_log=True)

Automate the correction of timing drift. Will adjust the stages to center the timetool edge on the camera and compensate the laser delay to maintain the desired nominal time point. Runs in an infinite loop.

Parameters

amplitude_thresh : float, optional The minimum amplitude of the fitted timetool peak to include the data point in the rolling average used for drift correction. Default: 0.02. ipm_thresh : float, optional The minimum ipm DG2 value to perform drift correction. Setting a reasonable value prevents attempts at drift correction when X-rays are very weak or down. Default: 500. drift_adjustment_thresh : float, optional The minimum drift value to correct for in picoseconds. E.g. a value of 0.05 means any time the rolling average finds that the timetool center is off by 50 fs in either direction it will compensate. Default: 0.05 ps. fwhm_threshs : Tuple[float, float], optional Minimum and maximum FWHM from the processed timetool signal to consider a measurement to be "good." num_events : int, optional The number of "good" timetool edge measurements to include in the rolling average. Ideally a prime number to remove effects from sytematic errors. Default 61 measurements. will_log : bool, optional Log timing corrections to a file.

Source code in mfx/timetool.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
def correct_timing_drift(
    amplitude_thresh: float = 0.02,
    ipm_thresh: float = 500.0,
    drift_adjustment_thresh: float = 0.05,
    fwhm_threshs: Tuple[float, float] = (30, 130),
    num_events: int = 61,
    will_log: bool = True,
) -> None:
    """
    Automate the correction of timing drift. Will adjust the stages to
    center the timetool edge on the camera and compensate the laser delay to
    maintain the desired nominal time point. Runs in an infinite loop.

    Parameters
    ----------
    amplitude_thresh : float, optional
        The minimum amplitude of the fitted timetool peak to include the
        data point in the rolling average used for drift correction.
        Default: 0.02.
    ipm_thresh : float, optional
        The minimum ipm DG2 value to perform drift correction. Setting a
        reasonable value prevents attempts at drift correction when X-rays
        are very weak or down. Default: 500.
    drift_adjustment_thresh : float, optional
        The minimum drift value to correct for in picoseconds. E.g. a value
        of 0.05 means any time the rolling average finds that the timetool
        center is off by 50 fs in either direction it will compensate. Default:
        0.05 ps.
    fwhm_threshs : Tuple[float, float], optional
        Minimum and maximum FWHM from the processed timetool signal to consider
        a measurement to be "good."
    num_events : int, optional
        The number of "good" timetool edge measurements to include in the
        rolling average. Ideally a prime number to remove effects from
        sytematic errors. Default 61 measurements.
    will_log : bool, optional
        Log timing corrections to a file.
    """
    from mfx.db import lxt, txt

    logfile: str = ""
    if will_log:
        logfile = input("Please enter a file to log correction info to: ")

    timetool_edges: np.ndarray = np.zeros([num_events])

    write_log(f"Entering timetool drift correction loop", logfile)
    while True:
        try:
            num_curr_edges: int = 0
            time_last_good_val: float = time.time()
            while num_curr_edges < num_events:
                try:
                    # EVENTBUILD PV contains 10 fields. TTALL makes up the last 8.
                    # (indices 0-7), and IPM DG1 and DG2 makeup the first 2.
                    # See `is_good_measurement` function for more accesses.
                    timetool: EpicsSignal = EpicsSignal("MFX:TT:01:EVENTBUILD.VALA")
                    tt_data: np.ndarray = timetool.get()

                    timetool_edge_ps: float = tt_data[3]

                    if is_good_measurement(
                        tt_data, amplitude_thresh, ipm_thresh, fwhm_threshs
                    ):
                        timetool_edges[num_curr_edges] = timetool_edge_ps
                        num_curr_edges += 1
                        time_last_good_val = time.time()
                    elif time.time() - time_last_good_val > 60:
                        write_log(
                            f"No good measurement over one minute. Check thresholds?",
                            logfile,
                        )
                        time_last_good_val = time.time()

                    time.sleep(0.01)
                except KeyboardInterrupt as e:
                    raise KeyboardInterrupt

            tt_edge_average_ps: float = np.mean(timetool_edges)
            write_log(f"Current average: {tt_edge_average_ps}", logfile)

            if np.abs(tt_edge_average_ps) > drift_adjustment_thresh:
                tt_average_seconds: float = -(tt_edge_average_ps * 1e-12)
                write_log(f"Making adjustment to {tt_average_seconds}!", logfile)
                lxt.mvr(tt_average_seconds)
                lxt.set_current_position(-float(txt.position))

        except KeyboardInterrupt as e:
            write_log(f"Breaking out of timetool drift correction loop", logfile)
            break

delay_scan(daq, time_motor, time_points, sweep_time, duration=None, record=None, use_l3t=False, controls=None)

Bluesky plan that sets up and executes the delay scan.

Parameters

daq: Daq The daq

DelayNewport

The movable device in seconds

list of float

The times in second to move between

float

The duration we take to move from one end of the range to the other.

bool, optional

Whether or not to record in the daq

float, optional

If provided, the time to run in seconds. If omitted, we'll run forever.

bool, optional

If True, events argument will be interpreted to only count events that pass the level 3 trigger

dict or list of devices, optional

If provided, values will make it to DAQ data stream as variables

Source code in mfx/delay_scan.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def delay_scan(daq, time_motor, time_points, sweep_time, duration=None, 
               record=None, use_l3t=False, controls=None):
    """
    Bluesky plan that sets up and executes the delay scan.

    Parameters
    ----------
    daq: Daq
        The daq

    time_motor: DelayNewport
        The movable device in seconds

    time_points: list of float
        The times in second to move between

    sweep_time: float
        The duration we take to move from one end of the range to the other.

    record: bool, optional
        Whether or not to record in the daq

    duration: float, optional
        If provided, the time to run in seconds. If omitted, we'll run forever.

    use_l3t: bool, optional
        If True, events argument will be interpreted to only count events that
        pass the level 3 trigger

    controls: dict or list of devices, optional
        If provided, values will make it to DAQ data stream as variables
    """

    spatial_pts = []
    for time_pt in time_points:
        pseudo_tuple = time_motor.PseudoPosition(delay=time_pt)
        real_tuple = time_motor.forward(pseudo_tuple)
        spatial_pts.append(real_tuple.motor)

    space_delta = abs(spatial_pts[0] - spatial_pts[1])
    velo = space_delta/sweep_time

    yield from bps.abs_set(time_motor.motor.velocity, velo)

    scan = infinite_scan([], time_motor, time_points, duration=duration)

    if daq is not None:
        yield from daq_during_wrapper(scan, record=record, use_l3t=use_l3t,
                                      controls=controls)
    else:
        yield from scan

detector_image(node=5, det='Rayonix', calibdir=None, ave=1)

Launches detector monitor

Parameters

node: int, optional Node to run detector monitor 1-9 only

str, optional

Detector name. Example 'Rayonix'

str, optional

path to calib directory

int, optional

Average over this number of events

Operations

Source code in mfx/detector_image.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def detector_image(node=5, det='Rayonix', calibdir=None, ave=1):
    """
    Launches detector monitor

    Parameters
    ----------
    node: int, optional
        Node to run detector monitor 1-9 only

    det: str, optional
        Detector name. Example 'Rayonix'

    calibdir: str, optional
        path to calib directory

    ave: int, optional
        Average over this number of events

    Operations
    ----------

    """
    import logging
    import subprocess
    from mfx.macros import get_exp

    experiment = str(get_exp())

    proc = [
        f"ssh -YAC mfxopr@daq-mfx-mon0{str(node)} "
        f"/cds/group/pcds/pyps/apps/hutch-python/mfx/scripts/"
        f"detector_image.sh {experiment} {str(det)} {str(calibdir)} {str(ave)}"
        ]

    logging.info(proc)

    subprocess.Popen(
        proc, shell=True, 
        stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)

detector_image_kill(node=5)

Kills all detector monitors made with this script

Parameters

node: int, optional Node to run detector monitor 1-9 only

str, optional

Detector name. Example 'Rayonix'

str, optional

path to calib directory

int, optional

Average over this number of events

Operations

Source code in mfx/detector_image.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def detector_image_kill(node=5):
    """
    Kills all detector monitors made with this script

    Parameters
    ----------
    node: int, optional
        Node to run detector monitor 1-9 only

    det: str, optional
        Detector name. Example 'Rayonix'

    calibdir: str, optional
        path to calib directory

    ave: int, optional
        Average over this number of events

    Operations
    ----------

    """
    import logging
    import subprocess

    proc = [
        f"ssh -YAC mfxopr@daq-mfx-mon0{str(node)} " +
        f"/cds/group/pcds/pyps/apps/hutch-python/mfx/scripts/"
        f"detector_image_kill.sh"
        ]

    logging.info(proc)

    subprocess.Popen(
        proc, shell=True)

focus_scan(camera, start=1, end=299, step=1)

Runs through transfocator Z to find the best focus

Parameters

camera: str, required camera where you want to focus

int, optional

step size of transfocator movements

int, optional

starting transfocator position

int, optional

final transfocator position

Examples: mfx dg1 yag is MFX:DG1:P6740 mfx dg2 yag is MFX:DG2:P6740 mfx dg3 yag is MFX:GIGE:02:IMAGE1

Operations

Source code in mfx/focus_scan.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def focus_scan(camera, start=1, end=299, step=1):
    """
    Runs through transfocator Z to find the best focus

    Parameters
    ----------
    camera: str, required
        camera where you want to focus

    step: int, optional
	step size of transfocator movements

    start: int, optional
	starting transfocator position

    end: int, optional
	final transfocator position

    Examples:
    mfx dg1 yag is MFX:DG1:P6740
    mfx dg2 yag is MFX:DG2:P6740
    mfx dg3 yag is MFX:GIGE:02:IMAGE1

    Operations
    ----------

    """
    # cd /reg/g/pcds/pyps/apps/hutch-python/mfx/mfx
    # from mfx.transfocator_scan import *
    from mfx.transfocator_scan import transfocator_aligner
    import numpy as np
    from mfx.db import tfs

    trf_align = transfocator_aligner(camera)
    trf_pos = np.arange(start, end, step)
    trf_align.scan_transfocator(tfs.translation,trf_pos,1)

infinite_scan(detectors, motor, points, duration=None, per_step=None, md=None)

Bluesky plan that moves a motor among points until interrupted.

Parameters

detectors: list of readables Objects to read into Python in the scan.

settable

Object to move in the scan.

list of floats

Positions to move between in the scan.

float

If provided, the time to run in seconds. If omitted, we'll run forever.

Source code in mfx/delay_scan.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def infinite_scan(detectors, motor, points, duration=None,
                  per_step=None, md=None):
    """
    Bluesky plan that moves a motor among points until interrupted.

    Parameters
    ----------
    detectors: list of readables
        Objects to read into Python in the scan.

    motor: settable
        Object to move in the scan.

    points: list of floats
        Positions to move between in the scan.

    duration: float
        If provided, the time to run in seconds. If omitted, we'll run forever.
    """
    if per_step is None:
        per_step = bps.one_nd_step

    if md is None:
        md = {}

    md.update(motors=[motor.name])
    start = time.time()

    #@bpp.stage_decorator(list(detectors) + [motor])
    @bpp.reset_positions_decorator()
    @bpp.run_decorator(md=md)
    def inner():
        # Where last position is stored
        pos_cache = defaultdict(lambda: None)
        while duration is None or time.time() - start < duration:
            for pt in points:
                step = {motor: pt}
                yield from per_step(detectors, step, pos_cache)

    return (yield from inner())

ioc_cam_recorder(cam='camera name', run_length=10, tag='?')

Record IOC Cameras

Parameters

cam: str, required Select camera PV you'd like to record

int, required

number of seconds for recording. 10 is default

str, required

Run group tag

Operations

Source code in mfx/autorun.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
def ioc_cam_recorder(cam='camera name', run_length=10, tag='?'):
    """
    Record IOC Cameras

    Parameters
    ----------
    cam: str, required
        Select camera PV you'd like to record

    run_length: int, required
        number of seconds for recording. 10 is default

    tag: str, required
        Run group tag

    Operations
    ----------

    """
    import subprocess
    from epics import caget
    import logging
    from mfx.bash_utilities import bs
    bs = bs()
    camera_names = bs.camera_list_out()
    if cam not in [pv[1] for pv in camera_names]:
            logging.info("Desired Camera not in List. Please choose from the above list:.")
    else:
        rate = caget(f'{cam}:ArrayRate_RBV')
        n_images = int(run_length * rate)
        logging.info(f"Recording Camera {cam} for {run_length} sec")
        logging.info(
            f"/reg/g/pcds/engineering_tools/latest-released/scripts/image_saver -c {cam} -n {n_images} -f {tag} -p /cds/data/iocData")

        subprocess.Popen(
            [f"source /cds/group/pcds/pyps/conda/pcds_conda; /reg/g/pcds/engineering_tools/latest-released/scripts/image_saver -c {cam} -n {n_images} -f {tag} -p /cds/data/iocData"],
            shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)

laser_in(wait=False, timeout=10)

Insert the Reference Laser and clear the beampath

Parameters

wait: bool, optional Wait and check that motion is properly completed

float, optional

Time to wait for motion completion if requested to do so

Operations

  • Insert the Reference Laser
  • Set the Wave8 out (35 mm)
  • Set the DG1 slits to 6 mm x 6 mm
  • Set the DG2 upstream slits to 6 mm x 6 mm
  • Set the DG2 midstream slits to 1 mm x 1 mm
  • Set the DG2 downstream slits to 1 mm x 1 mm
Source code in mfx/macros.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def laser_in(wait=False, timeout=10):
    """
    Insert the Reference Laser and clear the beampath

    Parameters
    ----------
    wait: bool, optional
        Wait and check that motion is properly completed

    timeout: float, optional
        Time to wait for motion completion if requested to do so

    Operations
    ----------
    * Insert the Reference Laser
    * Set the Wave8 out (35 mm)
    * Set the DG1 slits to 6 mm x 6 mm
    * Set the DG2 upstream slits to 6 mm x 6 mm
    * Set the DG2 midstream slits to 1 mm x 1 mm
    * Set the DG2 downstream slits to 1 mm x 1 mm
    """
    # Command motion and collect status objects
    ref = mfx_reflaser.insert(wait=False)
    tfs = mfx_tfs.remove_all()
    dg1_ipm=mfx_dg1_ipm.target.remove()
    dg2_ipm=mfx_dg2_ipm.target.remove()
    dg1 = mfx_dg1_slits.move(6., wait=False)
    dg2_us = mfx_dg2_upstream_slits.move(6., wait=False)
    dg2_ms = mfx_dg2_midstream_slits.move(1., wait=False)
#    dg2_ds = mfx_dg2_downstream_slits.move(1., wait=False)
    # Combine status and wait for completion
    if wait:
        status_wait(ref & dg1 & dg2_us & dg2_ms,
                    timeout=timeout)

laser_out(wait=False, timeout=10)

Remove the Reference Laser and configure the beamline

Parameters

wait: bool, optional Wait and check that motion is properly completed

float, optional

Time to wait for motion completion if requested to do so

Operations

  • Remove the Reference Laser
  • Set the Wave8 Target 3 In (5.5 mm)
  • Set the DG1 slits to 0.7 mm x 0.7 mm
  • Set the DG2 upstream slits to 0.7 mm x 0.7 mm
  • Set the DG2 midstream slits to 0.7 mm x 0.7 mm
  • Set the DG2 downstream slits to 0.7 mm x 0.7 mm
Source code in mfx/macros.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def laser_out(wait=False, timeout=10):
    """
    Remove the Reference Laser and configure the beamline

    Parameters
    ----------
    wait: bool, optional
        Wait and check that motion is properly completed

    timeout: float, optional
        Time to wait for motion completion if requested to do so

    Operations
    ----------
    * Remove the Reference Laser
    * Set the Wave8 Target 3 In (5.5 mm)
    * Set the DG1 slits to 0.7 mm x 0.7 mm
    * Set the DG2 upstream slits to 0.7 mm x 0.7 mm
    * Set the DG2 midstream slits to 0.7 mm x 0.7 mm
    * Set the DG2 downstream slits to 0.7 mm x 0.7 mm
    """
    # Command motion and collect status objects
    ref = mfx_reflaser.remove(wait=False)
# Removing dg1 wave8 movement for now, until wave8 target positions have been fixed
#    w8 = mfx_dg1_wave8_motor.move(5.5, wait=False)
    dg1 = mfx_dg1_slits.move(0.7, wait=False)
    dg2_us = mfx_dg2_upstream_slits.move(0.7, wait=False)
    dg2_ms = mfx_dg2_midstream_slits.move(0.7, wait=False)
#    dg2_ds = mfx_dg2_downstream_slits.move(0.7, wait=False)
    # Combine status and wait for completion
    if wait:
        status_wait(ref & w8 & dg1 & dg2_us & dg2_ms ,
                    timeout=timeout)

mfxslits(pos)

Set all the slits to specific position

Source code in mfx/macros.py
86
87
88
89
90
91
def mfxslits(pos):
    """Set all the slits to specific position"""
    dg1 = mfx_dg1_slits.move(pos, wait=False)
    dg2_us = mfx_dg2_upstream_slits.move(pos, wait=False)
    dg2_ms = mfx_dg2_midstream_slits.move(pos, wait=False)
    dg2_ds = mfx_dg2_downstream_slits.move(pos, wait=False)

post(sample='?', tag=None, run_number=None, post=False, inspire=False, add_note='')

Posts a message to the elog

Parameters

sample: str, optional Sample Name

int, optional

Run Number. By default this is read off of the DAQ

bool, optional

set True to record/post message to elog

bool, optional

Set false by default because it makes Sandra sad. Set True to inspire

string, optional

adds additional note to elog message

Source code in mfx/autorun.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def post(sample='?', tag=None, run_number=None, post=False, inspire=False, add_note=''):
    """
    Posts a message to the elog

    Parameters
    ----------
    sample: str, optional
        Sample Name

    run_number: int, optional
        Run Number. By default this is read off of the DAQ

    post: bool, optional
        set True to record/post message to elog

    inspire: bool, optional
        Set false by default because it makes Sandra sad. Set True to inspire

    add_note: string, optional
        adds additional note to elog message 
    """
    from mfx.db import daq, elog
    if add_note!='':
        add_note = '\n' + add_note
    if tag is None:
        tag = sample
    if inspire:
        comment = f"Running {sample}\n{quote()['quote']}{add_note}"
    else:
        comment = f"Running {sample}{add_note}"
    if run_number is None:
        run_number = daq.run_number()
    info = [run_number, comment]
    post_msg = post_template.format(*info)
    print('\n' + post_msg + '\n')
    if post:
        elog.post(msg=post_msg, tags=tag, run=(run_number))
    return post_msg

xlj_fast_xyz(orientation='horizontal', scale=0.1)

Parameters

orientation: str, optional set orientation to change the x and y axis. horizontal by default use only 'horizontal' of 'vertical'

float, optional

starting scale for the step size

Operations

Base function to control motors with the arrow keys.

With three motors, you can use the right and left arrow keys to move right and left, up and down arrow keys to move up and down, and Shift+up and shift+down to move z in and out

The scale for the tweak can be doubled by pressing + and halved by pressing -. Shift+right and shift+left can also be used, and the up and down keys will also adjust the scaling in one motor mode. The starting scale can be set with the keyword argument scale.

Ctrl+c will stop an ongoing move during a tweak without exiting the tweak. Both q and ctrl+c will quit the tweak between moves.

Source code in mfx/xlj_fast.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def xlj_fast_xyz(orientation='horizontal', scale=0.1):
    """
    Parameters
    ----------
    orientation: str, optional
        set orientation to change the x and y axis. horizontal by default
        use only 'horizontal' of 'vertical'

    scale: float, optional
        starting scale for the step size

    Operations
    ----------
    Base function to control motors with the arrow keys.

    With three motors, you can use the right and left arrow keys to move right 
    and left, up and down arrow keys to move up and down, and Shift+up and 
    shift+down to move z in and out

    The scale for the tweak can be doubled by pressing + and halved by pressing
    -. Shift+right and shift+left can also be used, and the up and down keys will
    also adjust the scaling in one motor mode. The starting scale can be set
    with the keyword argument `scale`.

    Ctrl+c will stop an ongoing move during a tweak without exiting the tweak.
    Both q and ctrl+c will quit the tweak between moves.
    """

    if orientation == str('horizontal').lower():
        xlj_fast_x = BypassPositionCheck("MFX:LJH:JET:X", name="xlj_fast_x")
        xlj_fast_y = BypassPositionCheck("MFX:LJH:JET:Y", name="xlj_fast_y")

    if orientation == str('vertical').lower():
        xlj_fast_y = BypassPositionCheck("MFX:LJH:JET:X", name="xlj_fast_x")
        xlj_fast_x = BypassPositionCheck("MFX:LJH:JET:Y", name="xlj_fast_y")

    xlj_fast_z = BypassPositionCheck("MFX:LJH:JET:Z", name="xlj_fast_z")

    xlj = BeckhoffJet('MFX:LJH', name='xlj')

    up = "\x1b[A"
    down = "\x1b[B"
    right = "\x1b[C"
    left = "\x1b[D"
    shift_up = "\x1b[1;2A"
    shift_down = "\x1b[1;2B"
    shift_right = "\x1b[1;2C"
    shift_left = "\x1b[1;2D"
    alt_up = "\x1b[1;3A"
    alt_down = "\x1b[1;3B"
    alt_right = "\x1b[1;3C"
    alt_left = "\x1b[1;3D"
    ctrl_up = "\x1b[1;5A"
    ctrl_down = "\x1b[1;5B"
    ctrl_right = "\x1b[1;5C"
    ctrl_left = "\x1b[1;5D"
    plus = "+"
    equal = "="
    minus = "-"
    under = "_"

    abs_status = '{}: {:.4f}'
    exp_status = '{}: {:.4e}'

    move_keys = (left, right, up, down, shift_up, shift_down)
    scale_keys = (plus, minus, equal, under, shift_right, shift_left)
    motors = [xlj_fast_x, xlj_fast_y, xlj_fast_z]


    def show_status():
        if scale >= 0.0001:
            template = abs_status
        else:
            template = exp_status
        text = [template.format(mot.name, mot.wm()) for mot in motors]
        text.append(f'scale: {scale}')
        print('\x1b[2K\r' + ', '.join(text), end='')


    def usage():
        print()  # Newline
        print(" Left: move x motor left")
        print(" Right: move x motor right")
        print(" Down: move y motor down")
        print(" Up: move y motor up")
        print(" shift+up: Z upstream")
        print(" shift+down: Z downstream")
        print(" + or shift+right: scale*2")
        print(" - or shift+left: scale/2")
        print(" Press q to quit."
              " Press any other key to display this message.")
        print()  # Newline


    def edit_scale(scale, direction):
        """Function used to change the scale."""
        if direction in (up, shift_right, plus, equal):
            scale = scale*2
        elif direction in (down, shift_left, minus, under):
            scale = scale/2
        return scale


    def movement(scale, direction):
        """Function used to know when and the direction to move the motor."""
        try:
            if direction == left:
                if round(xlj.jet.x(), 2) != round(xlj_fast_x(), 2):
                    logger.error(f'xlj.jet.x = {xlj.jet.x()}, xlj_fast_x = {xlj_fast_x()}')
                    xlj_fast_x.umv(xlj.jet.x())
                xlj_fast_x.umvr(-scale, log=False, newline=False)
            elif direction == right:
                if round(xlj.jet.x(), 2) != round(xlj_fast_x(), 2):
                    logger.error(f'xlj.jet.x = {xlj.jet.x()}, xlj_fast_x = {xlj_fast_x()}')
                    xlj_fast_x.umv(xlj.jet.x())
                xlj_fast_x.umvr(scale, log=False, newline=False)
            elif direction == up:
                if round(xlj.jet.y(), 2) != round(xlj_fast_y(), 2):
                    logger.error(f'xlj.jet.y = {xlj.jet.y()}, xlj_fast_y = {xlj_fast_y()}')
                    xlj_fast_y.umv(xlj.jet.y())
                xlj_fast_y.umvr(scale, log=False, newline=False)
            elif direction == down:
                if round(xlj.jet.y(), 2) != round(xlj_fast_y(), 2):
                    logger.error(f'xlj.jet.y = {xlj.jet.y()}, xlj_fast_y = {xlj_fast_y()}')
                    xlj_fast_y.umv(xlj.jet.y())
                xlj_fast_y.umvr(-scale, log=False, newline=False)
            elif direction == shift_up:
                if round(xlj.jet.z(), 2) != round(xlj_fast_z(), 2):
                    logger.error(f'xlj.jet.z = {xlj.jet.z()}, xlj_fast_z = {xlj_fast_z()}')
                    xlj_fast_z.umv(xlj.jet.z())
                xlj_fast_z.umvr(-scale, log=False, newline=False)
            elif direction == shift_down:
                if round(xlj.jet.z(), 2) != round(xlj_fast_z(), 2):
                    logger.error(f'xlj.jet.z = {xlj.jet.z()}, xlj_fast_z = {xlj_fast_z()}')
                    xlj_fast_z.umv(xlj.jet.z())
                xlj_fast_z.umvr(scale, log=False, newline=False)
        except Exception as exc:
            logger.error('Error in tweak move: %s', exc)
            logger.debug('', exc_info=True)

    start_text = [f'{mot.name} at {mot.wm():.4f}' for mot in motors]
    logger.info('Started tweak of ' + ', '.join(start_text))
    usage()

    # Loop takes in user key input and stops when 'q' is pressed
    is_input = True
    while is_input is True:
        show_status()
        inp = utils.get_input()
        if inp in ('q'):
            is_input = False
        elif inp in move_keys:
            movement(scale, inp)
        elif inp in scale_keys:
            scale = edit_scale(scale, inp)
        elif inp in ('h'):
            usage()
        else:
            logger.error('Not the way to use this. Press "h" to see how.')
    print()
    logger.info('Tweak complete')