1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336 | class User:
from subprocess import check_output
import logging
import json
import sys
import time
import os
import numpy as np
import elog
from hutch_python.utils import safe_load
from ophyd import EpicsSignalRO
from ophyd import EpicsSignal
from bluesky import RunEngine
from bluesky.plans import scan
from bluesky.plans import list_scan
from ophyd import Component as Cpt
from ophyd import Device
from pcdsdevices.interface import BaseInterface
from pcdsdevices.areadetector import plugins
from mfx.db import daq
from mfx.db import camviewer
from mfx.db import RE
#from mfx.db import bec
#from mfx.db import mfx_ccm as ccm
from mfx.delay_scan import delay_scan
from mfx.db import mfx_lxt_fast1 as lxt_fast
from pcdsdevices.device_types import Newport, IMS
from pcdsdevices.evr import Trigger
from epics import caget
#from macros import *
import time
logger = logging.getLogger(__name__)
#from xpp.db import daq, seq, elog
#from ophyd.status import wait as status_wait
from pcdsdevices.evr import Trigger
#from xpp.db import cp
#from xpp.db import lp
#from xpp.db import xpp_pulsepicker as pp
#from xpp.db import xpp_ccm as ccm
#from pcdsdevices.device_types import Newport, IMS
# WAIT A WHILE FOR THE DAQ TO START
#import pcdsdaq.daq
#pcdsdaq.daq.BEGIN_TIMEOUT = 5
#######################
"""Generic User Object"""
with safe_load('Dummy'):
print('Import dummy')
def __init__(self):
# with safe_load ('VH_motors'):
# self.vh_y = IMS('XCS:USR:MMS:02', name='vh_y')
# self.vh_x = IMS('XCS:USR:MMS:03', name='vh_x')
# with safe_load('Von Hamos Beckhoff'):
# from pcdsdevices.epics_motor import BeckhoffAxis
# class VonHamos_Spec(Device):
# cr1_move = Cpt(BeckhoffAxis, ':01', name='cr1_move')
# cr2_move = Cpt(BeckhoffAxis, ':02', name='cr2_move')
# cr3_move = Cpt(BeckhoffAxis, ':03', name='cr3_move')
# cr4_move = Cpt(BeckhoffAxis, ':04', name='cr4_move')
# cr1_rot = Cpt(BeckhoffAxis, ':01', name='cr1_rot')
# cr2_rot = Cpt(BeckhoffAxis, ':02', name='cr2_rot')
# cr3_rot = Cpt(BeckhoffAxis, ':03', name='cr3_rot')
# cr4_rot = Cpt(BeckhoffAxis, ':04', name='cr4_rot')
# self.vhs = VonHamos_Spec('MFX:VHS:MMB', name='vhs')
with safe_load ('LED'):
#self.led = Trigger('XCS:R42:EVR:01:TRIG6', name='led_delay')
self.led = Trigger('MFX:LAS:EVR:01:TRIG3', name='led_delay')
#self.led = Trigger('XCS:R44:EVR:44:TRIG6', name='led_delay')
self.led_uS = MicroToNano()
# with safe_load('Laser Phase Motor'):
# from ophyd.epics_motor import EpicsMotor
# self.las_phase = EpicsMotor('LAS:FS4:MMS:PH', name='las_phase')
def set_current_position(self, motor, value):
motor.set_use_switch.put(1)
motor.set_use_switch.wait_for_connection()
motor.user_setpoint.put(value, force=True)
motor.user_setpoint.wait_for_connection()
motor.set_use_switch.put(0)
def led_scan(self, start, end, nsteps, duration):
"""
Function to scan led delays and dwell at each delay. Goes back to original delay after scanning.
Parameters
---------
start : float
Start delay in microseconds.
end : float
End delay in microseconds.
nsteps : integer
number of steps for delay.
duration : float
Dwell time at each delay. In seconds.
"""
old_led = self.led.ns_delay.get()
print('Scanning LED delays...')
steps = np.linspace(start * 1000.0, end * 1000.0, nsteps)
for step in steps:
self.led.ns_delay.put(step)
time.sleep(duration)
self.led.ns_delay.put(old_led)
print('Scan complete setting delay back to old value: ' +str(old_led))
def lxt_fast_set_absolute_zero(self):
currentpos = lxt_fast()
lxt_fast1_enc = UsDigitalUsbEncoder('MFX:USDUSB4:01:CH0', name='lxt_fast_enc1', linked_axis=lxt_fast)
currentenc = lxt_fast_enc.get()
#elog.post('Set current stage position {}, encoder value {} to 0'.format(currentpos,currentenc.pos))
print('Set current stage position {}, encoder value {} to 0'.format(currentpos,currentenc.pos))
lxt_fast.set_current_position(0)
lxt_fast_enc.set_zero()
return
def takeRun(self, nEvents=None, duration=None, record=True, use_l3t=False):
daq.configure(events=120, record=record, use_l3t=use_l3t)
daq.begin(events=nEvents, duration=duration)
daq.wait()
daq.end_run()
def pvascan(self, motor, start, end, nsteps, nEvents, record=None):
currPos = motor.get()
daq.configure(nEvents, record=record, controls=[motor])
RE(scan([daq], motor, start, end, nsteps))
motor.put(currPos)
def pvdscan(self, motor, start, end, nsteps, nEvents, record=None):
daq.configure(nEvents, record=record, controls=[motor])
currPos = motor.get()
RE(scan([daq], motor, currPos + start, currPos + end, nsteps))
motor.put(currPos)
#def ascan(self, motor, start, end, nsteps, nEvents, record=True, use_l3t=False):
# self.cleanup_RE()
# currPos = motor.wm()
# daq.configure(nEvents, record=record, controls=[motor], use_l3t=use_l3t)
# try:
# RE(scan([daq], motor, start, end, nsteps))
# except Exception:
# logger.debug('RE Exit', exc_info=True)
# finally:
# self.cleanup_RE()
# motor.mv(currPos)
def listscan(self, motor, posList, nEvents, record=True, use_l3t=False):
self.cleanup_RE()
currPos = motor.wm()
daq.configure(nEvents, record=record, controls=[motor], use_l3t=use_l3t)
try:
RE(list_scan([daq], motor, posList))
except Exception:
logger.debug('RE Exit', exc_info=True)
finally:
self.cleanup_RE()
motor.mv(currPos)
#def dscan(self, motor, start, end, nsteps, nEvents, record=True, use_l3t=False):
# self.cleanup_RE()
# daq.configure(nEvents, record=record, controls=[motor], use_l3t=use_l3t)
# currPos = motor.wm()
# try:
# RE(scan([daq], motor, currPos+start, currPos+end, nsteps))
# except Exception:
# logger.debug('RE Exit', exc_info=True)
# finally:
# self.cleanup_RE()
# motor.mv(currPos)
#def a2scan(self, m1, a1, b1, m2, a2, b2, nsteps, nEvents, record=True, use_l3t=False):
# self.cleanup_RE()
# daq.configure(nEvents, record=record, controls=[m1, m2], use_l3t=use_l3t)
# try:
# RE(scan([daq], m1, a1, b1, m2, a2, b2, nsteps))
# except Exception:
# logger.debug('RE Exit', exc_info=True)
# finally:
# self.cleanup_RE()
#def a3scan(self, m1, a1, b1, m2, a2, b2, m3, a3, b3, nsteps, nEvents, record=True):
# self.cleanup_RE()
# daq.configure(nEvents, record=record, controls=[m1, m2, m3])
# try:
# RE(scan([daq], m1, a1, b1, m2, a2, b2, m3, a3, b3, nsteps))
# except Exception:
# logger.debug('RE Exit', exc_info=True)
# finally:
# self.cleanup_RE()
#def delay_scan(self, start, end, sweep_time, record=True, use_l3t=False,
# duration=None):
# """Delay scan with the daq."""
# self.cleanup_RE()
# bec.disable_plots()
# controls = [lxt_fast]
# try:
# RE(delay_scan(daq, lxt_fast, [start, end], sweep_time,
# duration=duration, record=record, use_l3t=use_l3t,
# controls=controls))
# except Exception:
# logger.debug('RE Exit', exc_info=True)
# finally:
# self.cleanup_RE()
# bec.enable_plots()
#def empty_delay_scan(self, start, end, sweep_time, record=True,
# use_l3t=False, duration=None):
# """Delay scan without the daq."""
# self.cleanup_RE()
# #daq.configure(events=None, duration=None, record=record,
# # use_l3t=use_l3t, controls=[lxt_fast])
# try:
# RE(delay_scan(None, lxt_fast, [start, end], sweep_time,
# duration=duration))
# except Exception:
# logger.debug('RE Exit', exc_info=True)
# finally:
# self.cleanup_RE()
def cleanup_RE(self):
if not RE.state.is_idle:
print('Cleaning up RunEngine')
print('Stopping previous run')
try:
RE.stop()
except Exception:
pass
def scanExamples(self):
print("""
Absolute Scan
re.daq_ascan([], sim.fast_motor1, -1, 1, 21, events = 10, record = False)
Absolute 2D Scan
re.daq_a2scan([], sim.fast_motor1, -1, 1 , sim.fast_motor2, -1, 1, nsteps = 5, events = 10, record = False)
Absolute 3D Scan
re.daq_a3scan([], sim.fast_motor1, -1, 1 , sim.fast_motor2, -1, 1, sim.fast_motor3, -1, 1, nsteps = 5, events = 10, record = False)
Delta Scan
re.daq_dscan([], sim.fast_motor1, -1, 1, 21, events = 10, record = False)
Delay Scan
re.daq_delay_scan([], lxt_fast, [-1e-12, 1e-12], sweep_time = 5, duration = 20, record = False)
Empty Delay Scan (no Daq)
re.delay_scan([], lxt_fast, [-1e-12,1e-12], sweep_time = 5, duration = 20)
""")
##########################
# CCM Scanning Functions #
##########################
def perform_run(self, events, record=True, comment='', post=True,
**kwargs):
"""
Perform a single run of the experiment
Parameters
----------
events: int
Number of events to include in run
record: bool, optional
Whether to record the run
comment : str, optional
Comment for ELog
post: bool, optional
Whether to post to the experimental ELog or not. Will not post if
not recording
Note
----
"""
comment = comment or ''
# Start recording
logger.info("Starting DAQ run, -> record=%s", record)
daq.begin(events=events, record=record)
time.sleep(1)
# Post to ELog if desired
runnum = daq._control.runnumber()
info = [runnum, comment, events, self._delaystr]
post_msg = post_template.format(*info)
print(post_msg)
if post and record:
elog(msg=post_msg, run=runnum)
# Wait for the DAQ to finish
logger.info("Waiting or DAQ to complete %s events ...", events)
daq.wait()
logger.info("Run complete!")
daq.end_run()
time.sleep(0.5)
return
def dummy_daq_test(self, events=360, sleep=3, record=False):
daq.connect()
while 1:
daq.begin(events=events, record=record)
daq.wait()
daq.end_run()
time.sleep(sleep)
print(time.ctime(time.time()))
return
post_template = """\
Run Number: {} {}
Acquiring {} events
{}
"""
post_template_escan = """\
Run Number: {} {}
{}
Minimum photon_energy -> {}
Maximum photon_energy -> {}
"""
|
str, optional
Comment for ELog