Total Complexity | 232 |
Total Lines | 1552 |
Duplicated Lines | 17.91 % |
Changes | 11 | ||
Bugs | 0 | Features | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like AWG70K often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | # -*- coding: utf-8 -*- |
||
38 | class AWG70K(Base, PulserInterface): |
||
39 | """ |
||
40 | |||
41 | """ |
||
42 | _modclass = 'awg70k' |
||
43 | _modtype = 'hardware' |
||
44 | |||
45 | # config options |
||
46 | _tmp_work_dir = ConfigOption(name='tmp_work_dir', |
||
47 | default=os.path.join(get_home_dir(), 'pulsed_files'), |
||
48 | missing='warn') |
||
49 | _visa_address = ConfigOption(name='awg_visa_address', missing='error') |
||
50 | _ip_address = ConfigOption(name='awg_ip_address', missing='error') |
||
51 | _ftp_dir = ConfigOption(name='ftp_root_dir', default='C:\\inetpub\\ftproot', missing='warn') |
||
52 | _username = ConfigOption(name='ftp_login', default='anonymous', missing='warn') |
||
53 | _password = ConfigOption(name='ftp_passwd', default='anonymous@', missing='warn') |
||
54 | _visa_timeout = ConfigOption(name='timeout', default=30, missing='nothing') |
||
55 | |||
56 | # translation dict from qudi trigger descriptor to device command |
||
57 | __event_triggers = {'OFF': 'OFF', 'A': 'ATR', 'B': 'BTR', 'INT': 'INT'} |
||
58 | |||
59 | def __init__(self, *args, **kwargs): |
||
60 | super().__init__(*args, **kwargs) |
||
61 | |||
62 | # Get an instance of the visa resource manager |
||
63 | self._rm = visa.ResourceManager() |
||
64 | |||
65 | self.awg = None # This variable will hold a reference to the awg visa resource |
||
66 | self.awg_model = '' # String describing the model |
||
67 | |||
68 | self.ftp_working_dir = 'waves' # subfolder of FTP root dir on AWG disk to work in |
||
69 | return |
||
70 | |||
71 | def on_activate(self): |
||
72 | """ Initialisation performed during activation of the module. |
||
73 | """ |
||
74 | # Create work directory if necessary |
||
75 | if not os.path.exists(self._tmp_work_dir): |
||
76 | os.makedirs(os.path.abspath(self._tmp_work_dir)) |
||
77 | |||
78 | # connect to awg using PyVISA |
||
79 | if self._visa_address not in self._rm.list_resources(): |
||
80 | self.awg = None |
||
81 | self.log.error('VISA address "{0}" not found by the pyVISA resource manager.\nCheck ' |
||
82 | 'the connection by using for example "Agilent Connection Expert".' |
||
83 | ''.format(self._visa_address)) |
||
84 | else: |
||
85 | self.awg = self._rm.open_resource(self._visa_address) |
||
86 | # set timeout by default to 30 sec |
||
87 | self.awg.timeout = self._visa_timeout * 1000 |
||
88 | |||
89 | # try connecting to AWG using FTP protocol |
||
90 | with FTP(self._ip_address) as ftp: |
||
91 | ftp.login(user=self._username, passwd=self._password) |
||
92 | ftp.cwd(self.ftp_working_dir) |
||
93 | |||
94 | if self.awg is not None: |
||
95 | self.awg_model = self.query('*IDN?').split(',')[1] |
||
96 | else: |
||
97 | self.awg_model = '' |
||
98 | return |
||
99 | |||
100 | def on_deactivate(self): |
||
101 | """ Required tasks to be performed during deactivation of the module. |
||
102 | """ |
||
103 | # Closes the connection to the AWG |
||
104 | try: |
||
105 | self.awg.close() |
||
106 | except: |
||
107 | self.log.debug('Closing AWG connection using pyvisa failed.') |
||
108 | self.log.info('Closed connection to AWG') |
||
109 | return |
||
110 | |||
111 | def get_constraints(self): |
||
112 | """ |
||
113 | Retrieve the hardware constrains from the Pulsing device. |
||
114 | |||
115 | @return constraints object: object with pulser constraints as attributes. |
||
116 | |||
117 | Provides all the constraints (e.g. sample_rate, amplitude, total_length_bins, |
||
118 | channel_config, ...) related to the pulse generator hardware to the caller. |
||
119 | |||
120 | SEE PulserConstraints CLASS IN pulser_interface.py FOR AVAILABLE CONSTRAINTS!!! |
||
121 | |||
122 | If you are not sure about the meaning, look in other hardware files to get an impression. |
||
123 | If still additional constraints are needed, then they have to be added to the |
||
124 | PulserConstraints class. |
||
125 | |||
126 | Each scalar parameter is an ScalarConstraints object defined in cor.util.interfaces. |
||
127 | Essentially it contains min/max values as well as min step size, default value and unit of |
||
128 | the parameter. |
||
129 | |||
130 | PulserConstraints.activation_config differs, since it contain the channel |
||
131 | configuration/activation information of the form: |
||
132 | {<descriptor_str>: <channel_set>, |
||
133 | <descriptor_str>: <channel_set>, |
||
134 | ...} |
||
135 | |||
136 | If the constraints cannot be set in the pulsing hardware (e.g. because it might have no |
||
137 | sequence mode) just leave it out so that the default is used (only zeros). |
||
138 | """ |
||
139 | constraints = PulserConstraints() |
||
140 | |||
141 | if self.awg_model == 'AWG70002A': |
||
142 | constraints.sample_rate.min = 1.5e3 |
||
143 | constraints.sample_rate.max = 25.0e9 |
||
144 | constraints.sample_rate.step = 5.0e2 |
||
145 | constraints.sample_rate.default = 25.0e9 |
||
146 | elif self.awg_model == 'AWG70001A': |
||
147 | constraints.sample_rate.min = 3.0e3 |
||
148 | constraints.sample_rate.max = 50.0e9 |
||
149 | constraints.sample_rate.step = 1.0e3 |
||
150 | constraints.sample_rate.default = 50.0e9 |
||
151 | |||
152 | constraints.a_ch_amplitude.min = 0.25 |
||
153 | constraints.a_ch_amplitude.max = 0.5 |
||
154 | constraints.a_ch_amplitude.step = 0.001 |
||
155 | constraints.a_ch_amplitude.default = 0.5 |
||
156 | # FIXME: Enter the proper digital channel low constraints: |
||
157 | constraints.d_ch_low.min = 0.0 |
||
158 | constraints.d_ch_low.max = 0.0 |
||
159 | constraints.d_ch_low.step = 0.0 |
||
160 | constraints.d_ch_low.default = 0.0 |
||
161 | # FIXME: Enter the proper digital channel high constraints: |
||
162 | constraints.d_ch_high.min = 0.0 |
||
163 | constraints.d_ch_high.max = 1.4 |
||
164 | constraints.d_ch_high.step = 0.1 |
||
165 | constraints.d_ch_high.default = 1.4 |
||
166 | |||
167 | constraints.waveform_length.min = 1 |
||
168 | constraints.waveform_length.max = 8000000000 |
||
169 | constraints.waveform_length.step = 1 |
||
170 | constraints.waveform_length.default = 1 |
||
171 | |||
172 | # FIXME: Check the proper number for your device |
||
173 | constraints.waveform_num.min = 1 |
||
174 | constraints.waveform_num.max = 32000 |
||
175 | constraints.waveform_num.step = 1 |
||
176 | constraints.waveform_num.default = 1 |
||
177 | # FIXME: Check the proper number for your device |
||
178 | constraints.sequence_num.min = 1 |
||
179 | constraints.sequence_num.max = 4000 |
||
180 | constraints.sequence_num.step = 1 |
||
181 | constraints.sequence_num.default = 1 |
||
182 | # FIXME: Check the proper number for your device |
||
183 | constraints.subsequence_num.min = 1 |
||
184 | constraints.subsequence_num.max = 8000 |
||
185 | constraints.subsequence_num.step = 1 |
||
186 | constraints.subsequence_num.default = 1 |
||
187 | |||
188 | # If sequencer mode is available then these should be specified |
||
189 | constraints.repetitions.min = 0 |
||
190 | constraints.repetitions.max = 65536 |
||
191 | constraints.repetitions.step = 1 |
||
192 | constraints.repetitions.default = 0 |
||
193 | # ToDo: Check how many external triggers are available |
||
194 | constraints.event_triggers = ['A', 'B'] |
||
195 | constraints.flags = ['A', 'B', 'C', 'D'] |
||
196 | |||
197 | constraints.sequence_steps.min = 0 |
||
198 | constraints.sequence_steps.max = 8000 |
||
199 | constraints.sequence_steps.step = 1 |
||
200 | constraints.sequence_steps.default = 0 |
||
201 | |||
202 | # the name a_ch<num> and d_ch<num> are generic names, which describe UNAMBIGUOUSLY the |
||
203 | # channels. Here all possible channel configurations are stated, where only the generic |
||
204 | # names should be used. The names for the different configurations can be customary chosen. |
||
205 | activation_config = OrderedDict() |
||
206 | if self.awg_model == 'AWG70002A': |
||
207 | activation_config['all'] = {'a_ch1', 'd_ch1', 'd_ch2', 'a_ch2', 'd_ch3', 'd_ch4'} |
||
208 | # Usage of both channels but reduced markers (higher analog resolution) |
||
209 | activation_config['ch1_2mrk_ch2_1mrk'] = {'a_ch1', 'd_ch1', 'd_ch2', 'a_ch2', 'd_ch3'} |
||
210 | activation_config['ch1_2mrk_ch2_0mrk'] = {'a_ch1', 'd_ch1', 'd_ch2', 'a_ch2'} |
||
211 | activation_config['ch1_1mrk_ch2_2mrk'] = {'a_ch1', 'd_ch1', 'a_ch2', 'd_ch3', 'd_ch4'} |
||
212 | activation_config['ch1_0mrk_ch2_2mrk'] = {'a_ch1', 'a_ch2', 'd_ch3', 'd_ch4'} |
||
213 | activation_config['ch1_1mrk_ch2_1mrk'] = {'a_ch1', 'd_ch1', 'a_ch2', 'd_ch3'} |
||
214 | activation_config['ch1_0mrk_ch2_1mrk'] = {'a_ch1', 'a_ch2', 'd_ch3'} |
||
215 | activation_config['ch1_1mrk_ch2_0mrk'] = {'a_ch1', 'd_ch1', 'a_ch2'} |
||
216 | # Usage of channel 1 only: |
||
217 | activation_config['ch1_2mrk'] = {'a_ch1', 'd_ch1', 'd_ch2'} |
||
218 | # Usage of channel 2 only: |
||
219 | activation_config['ch2_2mrk'] = {'a_ch2', 'd_ch3', 'd_ch4'} |
||
220 | # Usage of only channel 1 with one marker: |
||
221 | activation_config['ch1_1mrk'] = {'a_ch1', 'd_ch1'} |
||
222 | # Usage of only channel 2 with one marker: |
||
223 | activation_config['ch2_1mrk'] = {'a_ch2', 'd_ch3'} |
||
224 | # Usage of only channel 1 with no marker: |
||
225 | activation_config['ch1_0mrk'] = {'a_ch1'} |
||
226 | # Usage of only channel 2 with no marker: |
||
227 | activation_config['ch2_0mrk'] = {'a_ch2'} |
||
228 | elif self.awg_model == 'AWG70001A': |
||
229 | activation_config['all'] = {'a_ch1', 'd_ch1', 'd_ch2'} |
||
230 | # Usage of only channel 1 with one marker: |
||
231 | activation_config['ch1_1mrk'] = {'a_ch1', 'd_ch1'} |
||
232 | # Usage of only channel 1 with no marker: |
||
233 | activation_config['ch1_0mrk'] = {'a_ch1'} |
||
234 | |||
235 | constraints.activation_config = activation_config |
||
236 | |||
237 | # FIXME: additional constraint really necessary? |
||
238 | constraints.dac_resolution = {'min': 8, 'max': 10, 'step': 1, 'unit': 'bit'} |
||
239 | return constraints |
||
240 | |||
241 | def pulser_on(self): |
||
242 | """ Switches the pulsing device on. |
||
243 | |||
244 | @return int: error code (0:OK, -1:error, higher number corresponds to |
||
245 | current status of the device. Check then the |
||
246 | class variable status_dic.) |
||
247 | """ |
||
248 | # do nothing if AWG is already running |
||
249 | if not self._is_output_on(): |
||
250 | self.write('AWGC:RUN') |
||
251 | # wait until the AWG is actually running |
||
252 | while not self._is_output_on(): |
||
253 | time.sleep(0.25) |
||
254 | return self.get_status()[0] |
||
255 | |||
256 | def pulser_off(self): |
||
257 | """ Switches the pulsing device off. |
||
258 | |||
259 | @return int: error code (0:OK, -1:error, higher number corresponds to |
||
260 | current status of the device. Check then the |
||
261 | class variable status_dic.) |
||
262 | """ |
||
263 | # do nothing if AWG is already idle |
||
264 | if self._is_output_on(): |
||
265 | self.write('AWGC:STOP') |
||
266 | # wait until the AWG has actually stopped |
||
267 | while self._is_output_on(): |
||
268 | time.sleep(0.25) |
||
269 | return self.get_status()[0] |
||
270 | |||
271 | def write_waveform(self, name, analog_samples, digital_samples, is_first_chunk, is_last_chunk, |
||
272 | total_number_of_samples): |
||
273 | """ |
||
274 | Write a new waveform or append samples to an already existing waveform on the device memory. |
||
275 | The flags is_first_chunk and is_last_chunk can be used as indicator if a new waveform should |
||
276 | be created or if the write process to a waveform should be terminated. |
||
277 | |||
278 | @param name: str, the name of the waveform to be created/append to |
||
279 | @param analog_samples: numpy.ndarray of type float32 containing the voltage samples |
||
280 | @param digital_samples: numpy.ndarray of type bool containing the marker states |
||
281 | (if analog channels are active, this must be the same length as |
||
282 | analog_samples) |
||
283 | @param is_first_chunk: bool, flag indicating if it is the first chunk to write. |
||
284 | If True this method will create a new empty wavveform. |
||
285 | If False the samples are appended to the existing waveform. |
||
286 | @param is_last_chunk: bool, flag indicating if it is the last chunk to write. |
||
287 | Some devices may need to know when to close the appending wfm. |
||
288 | @param total_number_of_samples: int, The number of sample points for the entire waveform |
||
289 | (not only the currently written chunk) |
||
290 | |||
291 | @return: (int, list) number of samples written (-1 indicates failed process) and list of |
||
292 | created waveform names |
||
293 | """ |
||
294 | waveforms = list() |
||
295 | |||
296 | # Sanity checks |
||
297 | if len(analog_samples) == 0: |
||
298 | self.log.error('No analog samples passed to write_waveform method in awg70k.') |
||
299 | return -1, waveforms |
||
300 | |||
301 | min_samples = int(self.query('WLIS:WAV:LMIN?')) |
||
302 | if total_number_of_samples < min_samples: |
||
303 | self.log.error('Unable to write waveform.\nNumber of samples to write ({0:d}) is ' |
||
304 | 'smaller than the allowed minimum waveform length ({1:d}).' |
||
305 | ''.format(total_number_of_samples, min_samples)) |
||
306 | return -1, waveforms |
||
307 | |||
308 | # determine active channels |
||
309 | activation_dict = self.get_active_channels() |
||
310 | active_channels = {chnl for chnl in activation_dict if activation_dict[chnl]} |
||
311 | active_analog = sorted(chnl for chnl in active_channels if chnl.startswith('a')) |
||
312 | |||
313 | # Sanity check of channel numbers |
||
314 | if active_channels != set(analog_samples.keys()).union(set(digital_samples.keys())): |
||
315 | self.log.error('Mismatch of channel activation and sample array dimensions for ' |
||
316 | 'waveform creation.\nChannel activation is: {0}\nSample arrays have: ' |
||
317 | ''.format(active_channels, |
||
318 | set(analog_samples.keys()).union(set(digital_samples.keys())))) |
||
319 | return -1, waveforms |
||
320 | |||
321 | # Write waveforms. One for each analog channel. |
||
322 | for a_ch in active_analog: |
||
323 | # Get the integer analog channel number |
||
324 | a_ch_num = int(a_ch.split('ch')[-1]) |
||
325 | # Get the digital channel specifiers belonging to this analog channel markers |
||
326 | mrk_ch_1 = 'd_ch{0:d}'.format(a_ch_num * 2 - 1) |
||
327 | mrk_ch_2 = 'd_ch{0:d}'.format(a_ch_num * 2) |
||
328 | |||
329 | start = time.time() |
||
330 | # Encode marker information in an array of bytes (uint8). Avoid intermediate copies!!! |
||
331 | if mrk_ch_1 in digital_samples and mrk_ch_2 in digital_samples: |
||
332 | mrk_bytes = digital_samples[mrk_ch_2].view('uint8') |
||
333 | tmp_bytes = digital_samples[mrk_ch_1].view('uint8') |
||
334 | np.left_shift(mrk_bytes, 7, out=mrk_bytes) |
||
335 | np.left_shift(tmp_bytes, 6, out=tmp_bytes) |
||
336 | np.add(mrk_bytes, tmp_bytes, out=mrk_bytes) |
||
337 | elif mrk_ch_1 in digital_samples: |
||
338 | mrk_bytes = digital_samples[mrk_ch_1].view('uint8') |
||
339 | np.left_shift(mrk_bytes, 6, out=mrk_bytes) |
||
340 | else: |
||
341 | mrk_bytes = None |
||
342 | print('Prepare digital channel data: {0}'.format(time.time()-start)) |
||
343 | |||
344 | # Create waveform name string |
||
345 | wfm_name = '{0}_ch{1:d}'.format(name, a_ch_num) |
||
346 | |||
347 | # Check if waveform already exists and delete if necessary. |
||
348 | if wfm_name in self.get_waveform_names(): |
||
349 | self.delete_waveform(wfm_name) |
||
350 | |||
351 | # Write WFMX file for waveform |
||
352 | start = time.time() |
||
353 | self._write_wfmx(filename=wfm_name, |
||
354 | analog_samples=analog_samples[a_ch], |
||
355 | digital_samples=mrk_bytes, |
||
356 | is_first_chunk=is_first_chunk, |
||
357 | is_last_chunk=is_last_chunk, |
||
358 | total_number_of_samples=total_number_of_samples) |
||
359 | print('Write WFMX file: {0}'.format(time.time() - start)) |
||
360 | |||
361 | # transfer waveform to AWG and load into workspace |
||
362 | start = time.time() |
||
363 | self._send_file(filename=wfm_name + '.wfmx') |
||
364 | print('Send WFMX file: {0}'.format(time.time() - start)) |
||
365 | |||
366 | start = time.time() |
||
367 | self.write('MMEM:OPEN "{0}"'.format(os.path.join(self._ftp_path, wfm_name + '.wfmx'))) |
||
368 | # Wait for everything to complete |
||
369 | while int(self.query('*OPC?')) != 1: |
||
370 | time.sleep(0.25) |
||
371 | # Just to make sure |
||
372 | while wfm_name not in self.get_waveform_names(): |
||
373 | time.sleep(0.25) |
||
374 | print('Load WFMX file into workspace: {0}'.format(time.time() - start)) |
||
375 | |||
376 | # Append created waveform name to waveform list |
||
377 | waveforms.append(wfm_name) |
||
378 | return total_number_of_samples, waveforms |
||
379 | |||
380 | def write_sequence(self, name, sequence_parameter_list): |
||
381 | """ |
||
382 | Write a new sequence on the device memory. |
||
383 | |||
384 | @param name: str, the name of the waveform to be created/append to |
||
385 | @param sequence_parameter_list: list, contains the parameters for each sequence step and |
||
386 | the according waveform names. |
||
387 | |||
388 | @return: int, number of sequence steps written (-1 indicates failed process) |
||
389 | """ |
||
390 | # Check if device has sequencer option installed |
||
391 | if not self.has_sequence_mode(): |
||
392 | self.log.error('Direct sequence generation in AWG not possible. Sequencer option not ' |
||
393 | 'installed.') |
||
394 | return -1 |
||
395 | |||
396 | # Check if all waveforms are present on device memory |
||
397 | avail_waveforms = set(self.get_waveform_names()) |
||
398 | for waveform_tuple, param_dict in sequence_parameter_list: |
||
399 | if not avail_waveforms.issuperset(waveform_tuple): |
||
400 | self.log.error('Failed to create sequence "{0}" due to waveforms "{1}" not ' |
||
401 | 'present in device memory.'.format(name, waveform_tuple)) |
||
402 | return -1 |
||
403 | |||
404 | active_analog = sorted(chnl for chnl in self.get_active_channels() if chnl.startswith('a')) |
||
405 | num_tracks = len(active_analog) |
||
406 | num_steps = len(sequence_parameter_list) |
||
407 | |||
408 | # Create new sequence and set jump timing to immediate. |
||
409 | # Delete old sequence by the same name if present. |
||
410 | self.new_sequence(name=name, steps=num_steps) |
||
411 | |||
412 | # Fill in sequence information |
||
413 | for step, (wfm_tuple, seq_params) in enumerate(sequence_parameter_list, 1): |
||
414 | # Set waveforms to play |
||
415 | if num_tracks == len(wfm_tuple): |
||
416 | for track, waveform in enumerate(wfm_tuple, 1): |
||
417 | self.sequence_set_waveform(name, waveform, step, track) |
||
418 | else: |
||
419 | self.log.error('Unable to write sequence.\nLength of waveform tuple "{0}" does not ' |
||
420 | 'match the number of sequence tracks.'.format(waveform_tuple)) |
||
421 | return -1 |
||
422 | |||
423 | # Set event jump trigger |
||
424 | self.sequence_set_event_jump(name, |
||
425 | step, |
||
426 | seq_params['event_trigger'], |
||
427 | seq_params['event_jump_to']) |
||
428 | # Set wait trigger |
||
429 | self.sequence_set_wait_trigger(name, step, seq_params['wait_for']) |
||
430 | # Set repetitions |
||
431 | self.sequence_set_repetitions(name, step, seq_params['repetitions']) |
||
432 | # Set go_to parameter |
||
433 | self.sequence_set_goto(name, step, seq_params['go_to']) |
||
434 | # Set flag states |
||
435 | trigger = seq_params['flag_trigger'] != 'OFF' |
||
436 | flag_list = [seq_params['flag_trigger']] if trigger else [seq_params['flag_high']] |
||
437 | self.sequence_set_flags(name, step, flag_list, trigger) |
||
438 | |||
439 | # Wait for everything to complete |
||
440 | while int(self.query('*OPC?')) != 1: |
||
441 | time.sleep(0.25) |
||
442 | return num_steps |
||
443 | |||
444 | def get_waveform_names(self): |
||
445 | """ Retrieve the names of all uploaded waveforms on the device. |
||
446 | |||
447 | @return list: List of all uploaded waveform name strings in the device workspace. |
||
448 | """ |
||
449 | try: |
||
450 | query_return = self.query('WLIS:LIST?') |
||
451 | except visa.VisaIOError: |
||
452 | query_return = None |
||
453 | self.log.error('Unable to read waveform list from device. VisaIOError occured.') |
||
454 | waveform_list = sorted(query_return.split(',')) if query_return else list() |
||
455 | return waveform_list |
||
456 | |||
457 | def get_sequence_names(self): |
||
458 | """ Retrieve the names of all uploaded sequence on the device. |
||
459 | |||
460 | @return list: List of all uploaded sequence name strings in the device workspace. |
||
461 | """ |
||
462 | sequence_list = list() |
||
463 | |||
464 | if not self.has_sequence_mode(): |
||
465 | return sequence_list |
||
466 | |||
467 | try: |
||
468 | number_of_seq = int(self.query('SLIS:SIZE?')) |
||
469 | for ii in range(number_of_seq): |
||
470 | sequence_list.append(self.query('SLIS:NAME? {0:d}'.format(ii + 1))) |
||
471 | except visa.VisaIOError: |
||
472 | self.log.error('Unable to read sequence list from device. VisaIOError occurred.') |
||
473 | return sequence_list |
||
474 | |||
475 | def delete_waveform(self, waveform_name): |
||
476 | """ Delete the waveform with name "waveform_name" from the device memory. |
||
477 | |||
478 | @param str waveform_name: The name of the waveform to be deleted |
||
479 | Optionally a list of waveform names can be passed. |
||
480 | |||
481 | @return list: a list of deleted waveform names. |
||
482 | """ |
||
483 | if isinstance(waveform_name, str): |
||
484 | waveform_name = [waveform_name] |
||
485 | |||
486 | avail_waveforms = self.get_waveform_names() |
||
487 | deleted_waveforms = list() |
||
488 | for waveform in waveform_name: |
||
489 | if waveform in avail_waveforms: |
||
490 | self.write('WLIS:WAV:DEL "{0}"'.format(waveform)) |
||
491 | deleted_waveforms.append(waveform) |
||
492 | return deleted_waveforms |
||
493 | |||
494 | def delete_sequence(self, sequence_name): |
||
495 | """ Delete the sequence with name "sequence_name" from the device memory. |
||
496 | |||
497 | @param str sequence_name: The name of the sequence to be deleted |
||
498 | Optionally a list of sequence names can be passed. |
||
499 | |||
500 | @return list: a list of deleted sequence names. |
||
501 | """ |
||
502 | if isinstance(sequence_name, str): |
||
503 | sequence_name = [sequence_name] |
||
504 | |||
505 | avail_sequences = self.get_sequence_names() |
||
506 | deleted_sequences = list() |
||
507 | for sequence in sequence_name: |
||
508 | if sequence in avail_sequences: |
||
509 | self.write('SLIS:SEQ:DEL "{0}"'.format(sequence)) |
||
510 | deleted_sequences.append(sequence) |
||
511 | return deleted_sequences |
||
512 | |||
513 | View Code Duplication | def load_waveform(self, load_dict): |
|
|
|||
514 | """ Loads a waveform to the specified channel of the pulsing device. |
||
515 | For devices that have a workspace (i.e. AWG) this will load the waveform from the device |
||
516 | workspace into the channel. |
||
517 | For a device without mass memory this will make the waveform/pattern that has been |
||
518 | previously written with self.write_waveform ready to play. |
||
519 | |||
520 | @param load_dict: dict|list, a dictionary with keys being one of the available channel |
||
521 | index and values being the name of the already written |
||
522 | waveform to load into the channel. |
||
523 | Examples: {1: rabi_ch1, 2: rabi_ch2} or |
||
524 | {1: rabi_ch2, 2: rabi_ch1} |
||
525 | If just a list of waveform names if given, the channel |
||
526 | association will be invoked from the channel |
||
527 | suffix '_ch1', '_ch2' etc. |
||
528 | |||
529 | @return (dict, str): Dictionary with keys being the channel number and values being the |
||
530 | respective asset loaded into the channel, string describing the asset |
||
531 | type ('waveform' or 'sequence') |
||
532 | """ |
||
533 | if isinstance(load_dict, list): |
||
534 | new_dict = dict() |
||
535 | for waveform in load_dict: |
||
536 | channel = int(waveform.rsplit('_ch', 1)[1]) |
||
537 | new_dict[channel] = waveform |
||
538 | load_dict = new_dict |
||
539 | |||
540 | # Get all active channels |
||
541 | chnl_activation = self.get_active_channels() |
||
542 | analog_channels = sorted( |
||
543 | chnl for chnl in chnl_activation if chnl.startswith('a') and chnl_activation[chnl]) |
||
544 | |||
545 | # Check if all channels to load to are active |
||
546 | channels_to_set = {'a_ch{0:d}'.format(chnl_num) for chnl_num in load_dict} |
||
547 | if not channels_to_set.issubset(analog_channels): |
||
548 | self.log.error('Unable to load waveforms into channels.\n' |
||
549 | 'One or more channels to set are not active.') |
||
550 | return self.get_loaded_assets() |
||
551 | |||
552 | # Check if all waveforms to load are present on device memory |
||
553 | if not set(load_dict.values()).issubset(self.get_waveform_names()): |
||
554 | self.log.error('Unable to load waveforms into channels.\n' |
||
555 | 'One or more waveforms to load are missing on device memory.') |
||
556 | return self.get_loaded_assets() |
||
557 | |||
558 | # Load waveforms into channels |
||
559 | for chnl_num, waveform in load_dict.items(): |
||
560 | self.write('SOUR{0:d}:CASS:WAV "{1}"'.format(chnl_num, waveform)) |
||
561 | while self.query('SOUR{0:d}:CASS?'.format(chnl_num)) != waveform: |
||
562 | time.sleep(0.1) |
||
563 | |||
564 | return self.get_loaded_assets() |
||
565 | |||
566 | def load_sequence(self, sequence_name): |
||
567 | """ Loads a sequence to the channels of the device in order to be ready for playback. |
||
568 | For devices that have a workspace (i.e. AWG) this will load the sequence from the device |
||
569 | workspace into the channels. |
||
570 | |||
571 | @param sequence_name: str, name of the sequence to load |
||
572 | |||
573 | @return (dict, str): Dictionary with keys being the channel number and values being the |
||
574 | respective asset loaded into the channel, string describing the asset |
||
575 | type ('waveform' or 'sequence') |
||
576 | """ |
||
577 | if sequence_name not in self.get_sequence_names(): |
||
578 | self.log.error('Unable to load sequence.\n' |
||
579 | 'Sequence to load is missing on device memory.') |
||
580 | return self.get_loaded_assets() |
||
581 | |||
582 | # Get all active channels |
||
583 | chnl_activation = self.get_active_channels() |
||
584 | analog_channels = sorted( |
||
585 | chnl for chnl in chnl_activation if chnl.startswith('a') and chnl_activation[chnl]) |
||
586 | |||
587 | # Check if number of sequence tracks matches the number of analog channels |
||
588 | trac_num = int(self.query('SLIS:SEQ:TRAC? "{0}"'.format(sequence_name))) |
||
589 | if trac_num != len(analog_channels): |
||
590 | self.log.error('Unable to load sequence.\nNumber of tracks in sequence to load does ' |
||
591 | 'not match the number of active analog channels.') |
||
592 | return self.get_loaded_assets() |
||
593 | |||
594 | # Load sequence |
||
595 | for chnl in range(1, trac_num + 1): |
||
596 | self.write('SOUR{0:d}:CASS:SEQ "{1}", {2:d}'.format(chnl, sequence_name, chnl)) |
||
597 | while self.query('SOUR{0:d}:CASS?'.format(chnl))[1:-2] != '{0},{1:d}'.format( |
||
598 | sequence_name, chnl): |
||
599 | time.sleep(0.2) |
||
600 | |||
601 | return self.get_loaded_assets() |
||
602 | |||
603 | def get_loaded_assets(self): |
||
604 | """ |
||
605 | Retrieve the currently loaded asset names for each active channel of the device. |
||
606 | The returned dictionary will have the channel numbers as keys. |
||
607 | In case of loaded waveforms the dictionary values will be the waveform names. |
||
608 | In case of a loaded sequence the values will be the sequence name appended by a suffix |
||
609 | representing the track loaded to the respective channel (i.e. '<sequence_name>_1'). |
||
610 | |||
611 | @return (dict, str): Dictionary with keys being the channel number and values being the |
||
612 | respective asset loaded into the channel, |
||
613 | string describing the asset type ('waveform' or 'sequence') |
||
614 | """ |
||
615 | # Get all active channels |
||
616 | chnl_activation = self.get_active_channels() |
||
617 | channel_numbers = sorted(int(chnl.split('_ch')[1]) for chnl in chnl_activation if |
||
618 | chnl.startswith('a') and chnl_activation[chnl]) |
||
619 | |||
620 | # Get assets per channel |
||
621 | loaded_assets = dict() |
||
622 | current_type = None |
||
623 | for chnl_num in channel_numbers: |
||
624 | # Ask AWG for currently loaded waveform or sequence. The answer for a waveform will |
||
625 | # look like '"waveformname"\n' and for a sequence '"sequencename,1"\n' |
||
626 | # (where the number is the current track) |
||
627 | asset_name = self.query('SOUR1:CASS?') |
||
628 | # Figure out if a sequence or just a waveform is loaded by splitting after the comma |
||
629 | splitted = asset_name.rsplit(',', 1) |
||
630 | # If the length is 2 a sequence is loaded and if it is 1 a waveform is loaded |
||
631 | asset_name = splitted[0] |
||
632 | if len(splitted) > 1: |
||
633 | if current_type is not None and current_type != 'sequence': |
||
634 | self.log.error('Unable to determine loaded assets.') |
||
635 | return dict(), '' |
||
636 | current_type = 'sequence' |
||
637 | asset_name += '_' + splitted[1] |
||
638 | else: |
||
639 | if current_type is not None and current_type != 'waveform': |
||
640 | self.log.error('Unable to determine loaded assets.') |
||
641 | return dict(), '' |
||
642 | current_type = 'waveform' |
||
643 | loaded_assets[chnl_num] = asset_name |
||
644 | |||
645 | return loaded_assets, current_type |
||
646 | |||
647 | def clear_all(self): |
||
648 | """ Clears all loaded waveform from the pulse generators RAM. |
||
649 | |||
650 | @return int: error code (0:OK, -1:error) |
||
651 | |||
652 | Unused for digital pulse generators without storage capability |
||
653 | (PulseBlaster, FPGA). |
||
654 | """ |
||
655 | self.write('WLIS:WAV:DEL ALL') |
||
656 | while int(self.query('*OPC?')) != 1: |
||
657 | time.sleep(0.25) |
||
658 | if self.has_sequence_mode(): |
||
659 | self.write('SLIS:SEQ:DEL ALL') |
||
660 | while int(self.query('*OPC?')) != 1: |
||
661 | time.sleep(0.25) |
||
662 | return 0 |
||
663 | |||
664 | def get_status(self): |
||
665 | """ Retrieves the status of the pulsing hardware |
||
666 | |||
667 | @return (int, dict): inter value of the current status with the |
||
668 | corresponding dictionary containing status |
||
669 | description for all the possible status variables |
||
670 | of the pulse generator hardware |
||
671 | """ |
||
672 | status_dic = {-1: 'Failed Request or Communication', |
||
673 | 0: 'Device has stopped, but can receive commands', |
||
674 | 1: 'Device is active and running'} |
||
675 | current_status = -1 if self.awg is None else int(self._is_output_on()) |
||
676 | # All the other status messages should have higher integer values then 1. |
||
677 | return current_status, status_dic |
||
678 | |||
679 | def set_sample_rate(self, sample_rate): |
||
680 | """ Set the sample rate of the pulse generator hardware |
||
681 | |||
682 | @param float sample_rate: The sample rate to be set (in Hz) |
||
683 | |||
684 | @return foat: the sample rate returned from the device (-1:error) |
||
685 | """ |
||
686 | # Check if AWG is in function generator mode |
||
687 | # self._activate_awg_mode() |
||
688 | |||
689 | self.write('CLOCK:SRATE %.4G' % sample_rate) |
||
690 | while int(self.query('*OPC?')) != 1: |
||
691 | time.sleep(0.25) |
||
692 | time.sleep(1) |
||
693 | return self.get_sample_rate() |
||
694 | |||
695 | def get_sample_rate(self): |
||
696 | """ Set the sample rate of the pulse generator hardware |
||
697 | |||
698 | @return float: The current sample rate of the device (in Hz) |
||
699 | """ |
||
700 | return_rate = float(self.query('CLOCK:SRATE?')) |
||
701 | return return_rate |
||
702 | |||
703 | def get_analog_level(self, amplitude=None, offset=None): |
||
704 | """ Retrieve the analog amplitude and offset of the provided channels. |
||
705 | |||
706 | @param list amplitude: optional, if a specific amplitude value (in Volt |
||
707 | peak to peak, i.e. the full amplitude) of a |
||
708 | channel is desired. |
||
709 | @param list offset: optional, if a specific high value (in Volt) of a |
||
710 | channel is desired. |
||
711 | |||
712 | @return dict: with keys being the generic string channel names and items |
||
713 | being the values for those channels. Amplitude is always |
||
714 | denoted in Volt-peak-to-peak and Offset in (absolute) |
||
715 | Voltage. |
||
716 | |||
717 | Note: Do not return a saved amplitude and/or offset value but instead |
||
718 | retrieve the current amplitude and/or offset directly from the |
||
719 | device. |
||
720 | |||
721 | If no entries provided then the levels of all channels where simply |
||
722 | returned. If no analog channels provided, return just an empty dict. |
||
723 | Example of a possible input: |
||
724 | amplitude = ['a_ch1','a_ch4'], offset =[1,3] |
||
725 | to obtain the amplitude of channel 1 and 4 and the offset |
||
726 | {'a_ch1': -0.5, 'a_ch4': 2.0} {'a_ch1': 0.0, 'a_ch3':-0.75} |
||
727 | since no high request was performed. |
||
728 | |||
729 | The major difference to digital signals is that analog signals are |
||
730 | always oscillating or changing signals, otherwise you can use just |
||
731 | digital output. In contrast to digital output levels, analog output |
||
732 | levels are defined by an amplitude (here total signal span, denoted in |
||
733 | Voltage peak to peak) and an offset (a value around which the signal |
||
734 | oscillates, denoted by an (absolute) voltage). |
||
735 | |||
736 | In general there is no bijective correspondence between |
||
737 | (amplitude, offset) and (value high, value low)! |
||
738 | """ |
||
739 | amp = dict() |
||
740 | off = dict() |
||
741 | |||
742 | chnl_list = self._get_all_analog_channels() |
||
743 | |||
744 | # get pp amplitudes |
||
745 | View Code Duplication | if amplitude is None: |
|
746 | for ch_num, chnl in enumerate(chnl_list, 1): |
||
747 | amp[chnl] = float(self.query('SOUR{0:d}:VOLT:AMPL?'.format(ch_num))) |
||
748 | else: |
||
749 | for chnl in amplitude: |
||
750 | if chnl in chnl_list: |
||
751 | ch_num = int(chnl.rsplit('_ch', 1)[1]) |
||
752 | amp[chnl] = float(self.query('SOUR{0:d}:VOLT:AMPL?'.format(ch_num))) |
||
753 | else: |
||
754 | self.log.warning('Get analog amplitude from AWG70k channel "{0}" failed. ' |
||
755 | 'Channel non-existent.'.format(chnl)) |
||
756 | |||
757 | # get voltage offsets |
||
758 | if offset is None: |
||
759 | for ch_num, chnl in enumerate(chnl_list): |
||
760 | off[chnl] = 0.0 |
||
761 | else: |
||
762 | for chnl in offset: |
||
763 | if chnl in chnl_list: |
||
764 | ch_num = int(chnl.rsplit('_ch', 1)[1]) |
||
765 | off[chnl] = 0.0 |
||
766 | else: |
||
767 | self.log.warning('Get analog offset from AWG70k channel "{0}" failed. ' |
||
768 | 'Channel non-existent.'.format(chnl)) |
||
769 | return amp, off |
||
770 | |||
771 | View Code Duplication | def set_analog_level(self, amplitude=None, offset=None): |
|
772 | """ Set amplitude and/or offset value of the provided analog channel. |
||
773 | |||
774 | @param dict amplitude: dictionary, with key being the channel and items |
||
775 | being the amplitude values (in Volt peak to peak, |
||
776 | i.e. the full amplitude) for the desired channel. |
||
777 | @param dict offset: dictionary, with key being the channel and items |
||
778 | being the offset values (in absolute volt) for the |
||
779 | desired channel. |
||
780 | |||
781 | @return (dict, dict): tuple of two dicts with the actual set values for |
||
782 | amplitude and offset. |
||
783 | |||
784 | If nothing is passed then the command will return two empty dicts. |
||
785 | |||
786 | Note: After setting the analog and/or offset of the device, retrieve |
||
787 | them again for obtaining the actual set value(s) and use that |
||
788 | information for further processing. |
||
789 | |||
790 | The major difference to digital signals is that analog signals are |
||
791 | always oscillating or changing signals, otherwise you can use just |
||
792 | digital output. In contrast to digital output levels, analog output |
||
793 | levels are defined by an amplitude (here total signal span, denoted in |
||
794 | Voltage peak to peak) and an offset (a value around which the signal |
||
795 | oscillates, denoted by an (absolute) voltage). |
||
796 | |||
797 | In general there is no bijective correspondence between |
||
798 | (amplitude, offset) and (value high, value low)! |
||
799 | """ |
||
800 | # Check the inputs by using the constraints... |
||
801 | constraints = self.get_constraints() |
||
802 | # ...and the available analog channels |
||
803 | analog_channels = self._get_all_analog_channels() |
||
804 | |||
805 | # amplitude sanity check |
||
806 | if amplitude is not None: |
||
807 | for chnl in amplitude: |
||
808 | ch_num = int(chnl.rsplit('_ch', 1)[1]) |
||
809 | if chnl not in analog_channels: |
||
810 | self.log.warning('Channel to set (a_ch{0}) not available in AWG.\nSetting ' |
||
811 | 'analogue voltage for this channel ignored.'.format(chnl)) |
||
812 | del amplitude[chnl] |
||
813 | if amplitude[chnl] < constraints.a_ch_amplitude.min: |
||
814 | self.log.warning('Minimum Vpp for channel "{0}" is {1}. Requested Vpp of {2}V ' |
||
815 | 'was ignored and instead set to min value.' |
||
816 | ''.format(chnl, constraints.a_ch_amplitude.min, |
||
817 | amplitude[chnl])) |
||
818 | amplitude[chnl] = constraints.a_ch_amplitude.min |
||
819 | elif amplitude[chnl] > constraints.a_ch_amplitude.max: |
||
820 | self.log.warning('Maximum Vpp for channel "{0}" is {1}. Requested Vpp of {2}V ' |
||
821 | 'was ignored and instead set to max value.' |
||
822 | ''.format(chnl, constraints.a_ch_amplitude.max, |
||
823 | amplitude[chnl])) |
||
824 | amplitude[chnl] = constraints.a_ch_amplitude.max |
||
825 | # offset sanity check |
||
826 | if offset is not None: |
||
827 | for chnl in offset: |
||
828 | ch_num = int(chnl.rsplit('_ch', 1)[1]) |
||
829 | if chnl not in analog_channels: |
||
830 | self.log.warning('Channel to set (a_ch{0}) not available in AWG.\nSetting ' |
||
831 | 'offset voltage for this channel ignored.'.format(chnl)) |
||
832 | del offset[chnl] |
||
833 | if offset[chnl] < constraints.a_ch_offset.min: |
||
834 | self.log.warning('Minimum offset for channel "{0}" is {1}. Requested offset of ' |
||
835 | '{2}V was ignored and instead set to min value.' |
||
836 | ''.format(chnl, constraints.a_ch_offset.min, offset[chnl])) |
||
837 | offset[chnl] = constraints.a_ch_offset.min |
||
838 | elif offset[chnl] > constraints.a_ch_offset.max: |
||
839 | self.log.warning('Maximum offset for channel "{0}" is {1}. Requested offset of ' |
||
840 | '{2}V was ignored and instead set to max value.' |
||
841 | ''.format(chnl, constraints.a_ch_offset.max, |
||
842 | offset[chnl])) |
||
843 | offset[chnl] = constraints.a_ch_offset.max |
||
844 | |||
845 | if amplitude is not None: |
||
846 | for a_ch in amplitude: |
||
847 | ch_num = int(chnl.rsplit('_ch', 1)[1]) |
||
848 | self.write('SOUR{0:d}:VOLT:AMPL {1}'.format(ch_num, amplitude[a_ch])) |
||
849 | while int(self.query('*OPC?')) != 1: |
||
850 | time.sleep(0.25) |
||
851 | |||
852 | if offset is not None: |
||
853 | for a_ch in offset: |
||
854 | ch_num = int(chnl.rsplit('_ch', 1)[1]) |
||
855 | self.write('SOUR{0:d}:VOLT:OFFSET {1}'.format(ch_num, offset[a_ch])) |
||
856 | while int(self.query('*OPC?')) != 1: |
||
857 | time.sleep(0.25) |
||
858 | return self.get_analog_level() |
||
859 | |||
860 | View Code Duplication | def get_digital_level(self, low=None, high=None): |
|
861 | """ Retrieve the digital low and high level of the provided channels. |
||
862 | |||
863 | @param list low: optional, if a specific low value (in Volt) of a |
||
864 | channel is desired. |
||
865 | @param list high: optional, if a specific high value (in Volt) of a |
||
866 | channel is desired. |
||
867 | |||
868 | @return: (dict, dict): tuple of two dicts, with keys being the channel |
||
869 | number and items being the values for those |
||
870 | channels. Both low and high value of a channel is |
||
871 | denoted in (absolute) Voltage. |
||
872 | |||
873 | Note: Do not return a saved low and/or high value but instead retrieve |
||
874 | the current low and/or high value directly from the device. |
||
875 | |||
876 | If no entries provided then the levels of all channels where simply |
||
877 | returned. If no digital channels provided, return just an empty dict. |
||
878 | |||
879 | Example of a possible input: |
||
880 | low = ['d_ch1', 'd_ch4'] |
||
881 | to obtain the low voltage values of digital channel 1 an 4. A possible |
||
882 | answer might be |
||
883 | {'d_ch1': -0.5, 'd_ch4': 2.0} {} |
||
884 | since no high request was performed. |
||
885 | |||
886 | The major difference to analog signals is that digital signals are |
||
887 | either ON or OFF, whereas analog channels have a varying amplitude |
||
888 | range. In contrast to analog output levels, digital output levels are |
||
889 | defined by a voltage, which corresponds to the ON status and a voltage |
||
890 | which corresponds to the OFF status (both denoted in (absolute) voltage) |
||
891 | |||
892 | In general there is no bijective correspondence between |
||
893 | (amplitude, offset) and (value high, value low)! |
||
894 | """ |
||
895 | # TODO: Test with multiple channel AWG |
||
896 | low_val = {} |
||
897 | high_val = {} |
||
898 | |||
899 | digital_channels = self._get_all_digital_channels() |
||
900 | |||
901 | if low is None: |
||
902 | low = digital_channels |
||
903 | if high is None: |
||
904 | high = digital_channels |
||
905 | |||
906 | # get low marker levels |
||
907 | for chnl in low: |
||
908 | if chnl not in digital_channels: |
||
909 | continue |
||
910 | d_ch_number = int(chnl.rsplit('_ch', 1)[1]) |
||
911 | a_ch_number = (1 + d_ch_number) // 2 |
||
912 | marker_index = 2 - (d_ch_number % 2) |
||
913 | low_val[chnl] = float( |
||
914 | self.query('SOUR{0:d}:MARK{1:d}:VOLT:LOW?'.format(a_ch_number, marker_index))) |
||
915 | # get high marker levels |
||
916 | for chnl in high: |
||
917 | if chnl not in digital_channels: |
||
918 | continue |
||
919 | d_ch_number = int(chnl.rsplit('_ch', 1)[1]) |
||
920 | a_ch_number = (1 + d_ch_number) // 2 |
||
921 | marker_index = 2 - (d_ch_number % 2) |
||
922 | high_val[chnl] = float( |
||
923 | self.query('SOUR{0:d}:MARK{1:d}:VOLT:HIGH?'.format(a_ch_number, marker_index))) |
||
924 | |||
925 | return low_val, high_val |
||
926 | |||
927 | def set_digital_level(self, low=None, high=None): |
||
928 | """ Set low and/or high value of the provided digital channel. |
||
929 | |||
930 | @param dict low: dictionary, with key being the channel and items being |
||
931 | the low values (in volt) for the desired channel. |
||
932 | @param dict high: dictionary, with key being the channel and items being |
||
933 | the high values (in volt) for the desired channel. |
||
934 | |||
935 | @return (dict, dict): tuple of two dicts where first dict denotes the |
||
936 | current low value and the second dict the high |
||
937 | value. |
||
938 | |||
939 | If nothing is passed then the command will return two empty dicts. |
||
940 | |||
941 | Note: After setting the high and/or low values of the device, retrieve |
||
942 | them again for obtaining the actual set value(s) and use that |
||
943 | information for further processing. |
||
944 | |||
945 | The major difference to analog signals is that digital signals are |
||
946 | either ON or OFF, whereas analog channels have a varying amplitude |
||
947 | range. In contrast to analog output levels, digital output levels are |
||
948 | defined by a voltage, which corresponds to the ON status and a voltage |
||
949 | which corresponds to the OFF status (both denoted in (absolute) voltage) |
||
950 | |||
951 | In general there is no bijective correspondence between |
||
952 | (amplitude, offset) and (value high, value low)! |
||
953 | """ |
||
954 | if low is None: |
||
955 | low = dict() |
||
956 | if high is None: |
||
957 | high = dict() |
||
958 | |||
959 | #If you want to check the input use the constraints: |
||
960 | constraints = self.get_constraints() |
||
961 | |||
962 | for d_ch, value in low.items(): |
||
963 | #FIXME: Tell the device the proper digital voltage low value: |
||
964 | # self.tell('SOURCE1:MARKER{0}:VOLTAGE:LOW {1}'.format(d_ch, low[d_ch])) |
||
965 | pass |
||
966 | |||
967 | for d_ch, value in high.items(): |
||
968 | #FIXME: Tell the device the proper digital voltage high value: |
||
969 | # self.tell('SOURCE1:MARKER{0}:VOLTAGE:HIGH {1}'.format(d_ch, high[d_ch])) |
||
970 | pass |
||
971 | return self.get_digital_level() |
||
972 | |||
973 | def get_active_channels(self, ch=None): |
||
974 | """ Get the active channels of the pulse generator hardware. |
||
975 | |||
976 | @param list ch: optional, if specific analog or digital channels are |
||
977 | needed to be asked without obtaining all the channels. |
||
978 | |||
979 | @return dict: where keys denoting the channel number and items boolean |
||
980 | expressions whether channel are active or not. |
||
981 | |||
982 | Example for an possible input (order is not important): |
||
983 | ch = ['a_ch2', 'd_ch2', 'a_ch1', 'd_ch5', 'd_ch1'] |
||
984 | then the output might look like |
||
985 | {'a_ch2': True, 'd_ch2': False, 'a_ch1': False, 'd_ch5': True, 'd_ch1': False} |
||
986 | |||
987 | If no parameters are passed to this method all channels will be asked |
||
988 | for their setting. |
||
989 | """ |
||
990 | # If you want to check the input use the constraints: |
||
991 | # constraints = self.get_constraints() |
||
992 | |||
993 | analog_channels = self._get_all_analog_channels() |
||
994 | |||
995 | active_ch = dict() |
||
996 | for ch_num, a_ch in enumerate(analog_channels, 1): |
||
997 | # check what analog channels are active |
||
998 | active_ch[a_ch] = bool(int(self.query('OUTPUT{0:d}:STATE?'.format(ch_num)))) |
||
999 | # check how many markers are active on each channel, i.e. the DAC resolution |
||
1000 | if active_ch[a_ch]: |
||
1001 | digital_mrk = 10 - int(self.query('SOUR{0:d}:DAC:RES?'.format(ch_num))) |
||
1002 | View Code Duplication | if digital_mrk == 2: |
|
1003 | active_ch['d_ch{0:d}'.format(ch_num * 2)] = True |
||
1004 | active_ch['d_ch{0:d}'.format(ch_num * 2 - 1)] = True |
||
1005 | elif digital_mrk == 1: |
||
1006 | active_ch['d_ch{0:d}'.format(ch_num * 2)] = False |
||
1007 | active_ch['d_ch{0:d}'.format(ch_num * 2 - 1)] = True |
||
1008 | else: |
||
1009 | active_ch['d_ch{0:d}'.format(ch_num * 2)] = False |
||
1010 | active_ch['d_ch{0:d}'.format(ch_num * 2 - 1)] = False |
||
1011 | else: |
||
1012 | active_ch['d_ch{0:d}'.format(ch_num * 2)] = False |
||
1013 | active_ch['d_ch{0:d}'.format(ch_num * 2 - 1)] = False |
||
1014 | |||
1015 | # return either all channel information or just the one asked for. |
||
1016 | if ch is not None: |
||
1017 | chnl_to_delete = [chnl for chnl in active_ch if chnl not in ch] |
||
1018 | for chnl in chnl_to_delete: |
||
1019 | del active_ch[chnl] |
||
1020 | return active_ch |
||
1021 | |||
1022 | def set_active_channels(self, ch=None): |
||
1023 | """ Set the active channels for the pulse generator hardware. |
||
1024 | |||
1025 | @param dict ch: dictionary with keys being the analog or digital |
||
1026 | string generic names for the channels with items being |
||
1027 | a boolean value.current_loaded_asset |
||
1028 | |||
1029 | @return dict: with the actual set values for active channels for analog |
||
1030 | and digital values. |
||
1031 | |||
1032 | If nothing is passed then the command will return an empty dict. |
||
1033 | |||
1034 | Note: After setting the active channels of the device, retrieve them |
||
1035 | again for obtaining the actual set value(s) and use that |
||
1036 | information for further processing. |
||
1037 | |||
1038 | Example for possible input: |
||
1039 | ch={'a_ch2': True, 'd_ch1': False, 'd_ch3': True, 'd_ch4': True} |
||
1040 | to activate analog channel 2 digital channel 3 and 4 and to deactivate |
||
1041 | digital channel 1. |
||
1042 | |||
1043 | The hardware itself has to handle, whether separate channel activation |
||
1044 | is possible. |
||
1045 | """ |
||
1046 | current_channel_state = self.get_active_channels() |
||
1047 | |||
1048 | if ch is None: |
||
1049 | return current_channel_state |
||
1050 | |||
1051 | if not set(current_channel_state).issuperset(ch): |
||
1052 | self.log.error('Trying to (de)activate channels that are not present in AWG70k.\n' |
||
1053 | 'Setting of channel activation aborted.') |
||
1054 | return current_channel_state |
||
1055 | |||
1056 | # Determine new channel activation states |
||
1057 | new_channels_state = current_channel_state.copy() |
||
1058 | for chnl in ch: |
||
1059 | new_channels_state[chnl] = ch[chnl] |
||
1060 | |||
1061 | # check if the channels to set are part of the activation_config constraints |
||
1062 | constraints = self.get_constraints() |
||
1063 | new_active_channels = {chnl for chnl in new_channels_state if new_channels_state[chnl]} |
||
1064 | if new_active_channels not in constraints.activation_config.values(): |
||
1065 | self.log.error('activation_config to set ({0}) is not allowed according to constraints.' |
||
1066 | ''.format(new_active_channels)) |
||
1067 | return current_channel_state |
||
1068 | |||
1069 | # get lists of all analog channels |
||
1070 | analog_channels = self._get_all_analog_channels() |
||
1071 | |||
1072 | # calculate dac resolution for each analog channel and set it in hardware. |
||
1073 | # Also (de)activate the analog channels accordingly |
||
1074 | max_res = constraints.dac_resolution['max'] |
||
1075 | for a_ch in analog_channels: |
||
1076 | ach_num = int(a_ch.rsplit('_ch', 1)[1]) |
||
1077 | # determine number of markers for current a_ch |
||
1078 | if new_channels_state['d_ch{0:d}'.format(2 * ach_num - 1)]: |
||
1079 | marker_num = 2 if new_channels_state['d_ch{0:d}'.format(2 * ach_num)] else 1 |
||
1080 | else: |
||
1081 | marker_num = 0 |
||
1082 | # set DAC resolution for this channel |
||
1083 | dac_res = max_res - marker_num |
||
1084 | self.write('SOUR{0:d}:DAC:RES {1:d}'.format(ach_num, dac_res)) |
||
1085 | # (de)activate the analog channel |
||
1086 | if new_channels_state[a_ch]: |
||
1087 | self.write('OUTPUT{0:d}:STATE ON'.format(ach_num)) |
||
1088 | else: |
||
1089 | self.write('OUTPUT{0:d}:STATE OFF'.format(ach_num)) |
||
1090 | |||
1091 | return self.get_active_channels() |
||
1092 | |||
1093 | def get_interleave(self): |
||
1094 | """ Check whether Interleave is ON or OFF in AWG. |
||
1095 | |||
1096 | @return bool: True: ON, False: OFF |
||
1097 | |||
1098 | Unused for pulse generator hardware other than an AWG. |
||
1099 | """ |
||
1100 | return False |
||
1101 | |||
1102 | def set_interleave(self, state=False): |
||
1103 | """ Turns the interleave of an AWG on or off. |
||
1104 | |||
1105 | @param bool state: The state the interleave should be set to |
||
1106 | (True: ON, False: OFF) |
||
1107 | |||
1108 | @return bool: actual interleave status (True: ON, False: OFF) |
||
1109 | |||
1110 | Note: After setting the interleave of the device, retrieve the |
||
1111 | interleave again and use that information for further processing. |
||
1112 | |||
1113 | Unused for pulse generator hardware other than an AWG. |
||
1114 | """ |
||
1115 | if state: |
||
1116 | self.log.warning('Interleave mode not available for the AWG 70000 Series!\n' |
||
1117 | 'Method call will be ignored.') |
||
1118 | return False |
||
1119 | |||
1120 | def has_sequence_mode(self): |
||
1121 | """ Asks the pulse generator whether sequence mode exists. |
||
1122 | |||
1123 | @return: bool, True for yes, False for no. |
||
1124 | """ |
||
1125 | options = self.query('*OPT?').split(',') |
||
1126 | return '03' in options |
||
1127 | |||
1128 | def reset(self): |
||
1129 | """Reset the device. |
||
1130 | |||
1131 | @return int: error code (0:OK, -1:error) |
||
1132 | """ |
||
1133 | self.write('*RST') |
||
1134 | self.write('*WAI') |
||
1135 | return 0 |
||
1136 | |||
1137 | def query(self, question): |
||
1138 | """ Asks the device a 'question' and receive and return an answer from it. |
||
1139 | |||
1140 | @param string question: string containing the command |
||
1141 | |||
1142 | @return string: the answer of the device to the 'question' in a string |
||
1143 | """ |
||
1144 | return self.awg.query(question).strip().rstrip('\n').rstrip().strip('"') |
||
1145 | |||
1146 | def write(self, command): |
||
1147 | """ Sends a command string to the device. |
||
1148 | |||
1149 | @param string command: string containing the command |
||
1150 | |||
1151 | @return int: error code (0:OK, -1:error) |
||
1152 | """ |
||
1153 | bytes_written, enum_status_code = self.awg.write(command) |
||
1154 | return int(enum_status_code) |
||
1155 | |||
1156 | def new_sequence(self, name, steps): |
||
1157 | """ |
||
1158 | Generate a new sequence 'name' having 'steps' number of steps with immediate (async.) jump |
||
1159 | timing. |
||
1160 | |||
1161 | @param str name: Name of the sequence which should be generated |
||
1162 | @param int steps: Number of steps |
||
1163 | |||
1164 | @return int: error code |
||
1165 | """ |
||
1166 | if not self.has_sequence_mode(): |
||
1167 | self.log.error('Sequence generation in AWG not possible. ' |
||
1168 | 'Sequencer option not installed.') |
||
1169 | return -1 |
||
1170 | |||
1171 | if name in self.get_sequence_names(): |
||
1172 | self.delete_sequence(name) |
||
1173 | self.write('SLIS:SEQ:NEW "{0}", {1:d}'.format(name, steps)) |
||
1174 | self.write('SLIS:SEQ:EVEN:JTIM "{0}", IMM'.format(name)) |
||
1175 | return 0 |
||
1176 | |||
1177 | def sequence_set_waveform(self, sequence_name, waveform_name, step, track): |
||
1178 | """ |
||
1179 | Set the waveform 'waveform_name' to position 'step' in the sequence 'sequence_name'. |
||
1180 | |||
1181 | @param str sequence_name: Name of the sequence which should be editted |
||
1182 | @param str waveform_name: Name of the waveform which should be added |
||
1183 | @param int step: Position of the added waveform |
||
1184 | @param int track: track which should be editted |
||
1185 | |||
1186 | @return int: error code |
||
1187 | """ |
||
1188 | if not self.has_sequence_mode(): |
||
1189 | self.log.error('Direct sequence generation in AWG not possible. ' |
||
1190 | 'Sequencer option not installed.') |
||
1191 | return -1 |
||
1192 | |||
1193 | self.write('SLIS:SEQ:STEP{0:d}:TASS{1:d}:WAV "{2}", "{3}"'.format(step, |
||
1194 | track, |
||
1195 | sequence_name, |
||
1196 | waveform_name)) |
||
1197 | return 0 |
||
1198 | |||
1199 | def sequence_set_repetitions(self, sequence_name, step, repeat=1): |
||
1200 | """ |
||
1201 | Set the repetition counter of sequence "sequence_name" at step "step" to "repeat". |
||
1202 | A repeat value of -1 denotes infinite repetitions; 0 means the step is played once. |
||
1203 | |||
1204 | @param str sequence_name: Name of the sequence to be edited |
||
1205 | @param int step: Sequence step to be edited |
||
1206 | @param int repeat: number of repetitions. (-1: infinite, 0: once, 1: twice, ...) |
||
1207 | |||
1208 | @return int: error code |
||
1209 | """ |
||
1210 | if not self.has_sequence_mode(): |
||
1211 | self.log.error('Direct sequence generation in AWG not possible. ' |
||
1212 | 'Sequencer option not installed.') |
||
1213 | return -1 |
||
1214 | repeat = 'INF' if repeat < 0 else str(int(repeat)) |
||
1215 | self.write('SLIS:SEQ:STEP{0:d}:RCO "{1}", {2}'.format(step, sequence_name, repeat)) |
||
1216 | return 0 |
||
1217 | |||
1218 | def sequence_set_goto(self, sequence_name, step, goto=-1): |
||
1219 | """ |
||
1220 | |||
1221 | @param str sequence_name: |
||
1222 | @param int step: |
||
1223 | @param int goto: |
||
1224 | |||
1225 | @return int: error code |
||
1226 | """ |
||
1227 | if not self.has_sequence_mode(): |
||
1228 | self.log.error('Direct sequence generation in AWG not possible. ' |
||
1229 | 'Sequencer option not installed.') |
||
1230 | return -1 |
||
1231 | |||
1232 | goto = str(int(goto)) if seq_params['go_to'] > 0 else 'NEXT' |
||
1233 | self.write('SLIS:SEQ:STEP{0:d}:GOTO "{1}", {2}'.format(step, sequence_name, goto)) |
||
1234 | return 0 |
||
1235 | |||
1236 | def sequence_set_event_jump(self, sequence_name, step, trigger='OFF', jumpto=0): |
||
1237 | """ |
||
1238 | Set the event trigger input of the specified sequence step and the jump_to destination. |
||
1239 | |||
1240 | @param str sequence_name: Name of the sequence to be edited |
||
1241 | @param int step: Sequence step to be edited |
||
1242 | @param str trigger: Trigger string specifier. ('OFF', 'A', 'B' or 'INT') |
||
1243 | @param int jumpto: The sequence step to jump to. 0 or -1 is interpreted as next step |
||
1244 | |||
1245 | @return int: error code |
||
1246 | """ |
||
1247 | if not self.has_sequence_mode(): |
||
1248 | self.log.error('Direct sequence generation in AWG not possible. ' |
||
1249 | 'Sequencer option not installed.') |
||
1250 | return -1 |
||
1251 | |||
1252 | trigger = self.__event_triggers.get(trigger) |
||
1253 | if trigger is None: |
||
1254 | self.log.error('Invalid trigger specifier "{0}".\n' |
||
1255 | 'Please choose one of: "OFF", "A", "B", "INT"') |
||
1256 | return -1 |
||
1257 | |||
1258 | self.write('SLIS:SEQ:STEP{0:d}:EJIN "{1}", {2}'.format(step, sequence_name, trigger)) |
||
1259 | # Set event_jump_to if event trigger is enabled |
||
1260 | if trigger != 'OFF': |
||
1261 | jumpto = 'NEXT' if jumpto <= 0 else str(int(jumpto)) |
||
1262 | self.write('SLIS:SEQ:STEP{0:d}:EJUM "{1}", {2}'.format(step, sequence_name, jumpto)) |
||
1263 | return 0 |
||
1264 | |||
1265 | def sequence_set_wait_trigger(self, sequence_name, step, trigger='OFF'): |
||
1266 | """ |
||
1267 | Make a certain sequence step wait for a trigger to start playing. |
||
1268 | |||
1269 | @param str sequence_name: Name of the sequence to be edited |
||
1270 | @param int step: Sequence step to be edited |
||
1271 | @param str trigger: Trigger string specifier. ('OFF', 'A', 'B' or 'INT') |
||
1272 | |||
1273 | @return int: error code |
||
1274 | """ |
||
1275 | if not self.has_sequence_mode(): |
||
1276 | self.log.error('Direct sequence generation in AWG not possible. ' |
||
1277 | 'Sequencer option not installed.') |
||
1278 | return -1 |
||
1279 | |||
1280 | trigger = self.__event_triggers.get(trigger) |
||
1281 | if trigger is None: |
||
1282 | self.log.error('Invalid trigger specifier "{0}".\n' |
||
1283 | 'Please choose one of: "OFF", "A", "B", "INT"') |
||
1284 | return -1 |
||
1285 | |||
1286 | self.write('SLIS:SEQ:STEP{0:d}:WINP "{1}", {2}'.format(step, sequence_name, trigger)) |
||
1287 | return 0 |
||
1288 | |||
1289 | def sequence_set_flags(self, sequence_name, step, flags=None, trigger=False): |
||
1290 | """ |
||
1291 | Set the flags in "flags" to HIGH (trigger=False) during the sequence step or let the flags |
||
1292 | send out a fixed duration trigger pulse (trigger=True). All other flags are set to LOW. |
||
1293 | |||
1294 | @param str sequence_name: Name of the sequence to be edited |
||
1295 | @param int step: Sequence step to be edited |
||
1296 | @param list flags: List of flag specifiers to be active during this sequence step |
||
1297 | @param bool trigger: Whether the flag should be HIGH during the step (False) or send out a |
||
1298 | fixed length trigger pulse when starting to play the step (True). |
||
1299 | |||
1300 | @return int: error code |
||
1301 | """ |
||
1302 | if not self.has_sequence_mode(): |
||
1303 | self.log.error('Direct sequence generation in AWG not possible. ' |
||
1304 | 'Sequencer option not installed.') |
||
1305 | return -1 |
||
1306 | |||
1307 | for flag in ('A', 'B', 'C', 'D'): |
||
1308 | if flag in flags: |
||
1309 | state = 'PULS' if trigger else 'HIGH' |
||
1310 | else: |
||
1311 | state = 'LOW' |
||
1312 | |||
1313 | self.write('SLIS:SEQ:STEP{0:d}:TFL1:{2}FL "{3}",{4}'.format(step, |
||
1314 | flag, |
||
1315 | sequence_name, |
||
1316 | state)) |
||
1317 | return 0 |
||
1318 | |||
1319 | def make_sequence_continuous(self, sequencename): |
||
1320 | """ |
||
1321 | Usually after a run of a sequence the output stops. Many times it is desired that the full |
||
1322 | sequence is repeated many times. This is achieved here by setting the 'jump to' value of |
||
1323 | the last element to 'First' |
||
1324 | |||
1325 | @param sequencename: Name of the sequence which should be made continous |
||
1326 | |||
1327 | @return int last_step: The step number which 'jump to' has to be set to 'First' |
||
1328 | """ |
||
1329 | if not self.has_sequence_mode(): |
||
1330 | self.log.error('Direct sequence generation in AWG not possible. ' |
||
1331 | 'Sequencer option not installed.') |
||
1332 | return -1 |
||
1333 | |||
1334 | last_step = int(self.query('SLIS:SEQ:LENG? "{0}"'.format(sequencename))) |
||
1335 | err = self.sequence_set_goto(sequencename, last_step, 1) |
||
1336 | if err < 0: |
||
1337 | last_step = err |
||
1338 | return last_step |
||
1339 | |||
1340 | def force_jump_sequence(self, final_step, channel=1): |
||
1341 | """ |
||
1342 | This command forces the sequencer to jump to the specified step per channel. A |
||
1343 | force jump does not require a trigger event to execute the jump. |
||
1344 | For two channel instruments, if both channels are playing the same sequence, then |
||
1345 | both channels jump simultaneously to the same sequence step. |
||
1346 | |||
1347 | @param channel: determines the channel number. If omitted, interpreted as 1 |
||
1348 | @param final_step: Step to jump to. Possible options are |
||
1349 | FIRSt - This enables the sequencer to jump to first step in the sequence. |
||
1350 | CURRent - This enables the sequencer to jump to the current sequence step, |
||
1351 | essentially starting the current step over. |
||
1352 | LAST - This enables the sequencer to jump to the last step in the sequence. |
||
1353 | END - This enables the sequencer to go to the end and play 0 V until play is |
||
1354 | stopped. |
||
1355 | <NR1> - This enables the sequencer to jump to the specified step, where the |
||
1356 | value is between 1 and 16383. |
||
1357 | |||
1358 | """ |
||
1359 | self.write('SOURCE{0:d}:JUMP:FORCE {1}'.format(channel, final_step)) |
||
1360 | return |
||
1361 | |||
1362 | def _get_all_channels(self): |
||
1363 | """ |
||
1364 | Helper method to return a sorted list of all technically available channel descriptors |
||
1365 | (e.g. ['a_ch1', 'a_ch2', 'd_ch1', 'd_ch2']) |
||
1366 | |||
1367 | @return list: Sorted list of channels |
||
1368 | """ |
||
1369 | configs = self.get_constraints().activation_config |
||
1370 | if 'all' in configs: |
||
1371 | largest_config = configs['all'] |
||
1372 | else: |
||
1373 | largest_config = list(configs.values())[0] |
||
1374 | for config in configs.values(): |
||
1375 | if len(largest_config) < len(config): |
||
1376 | largest_config = config |
||
1377 | return sorted(largest_config) |
||
1378 | |||
1379 | def _get_all_analog_channels(self): |
||
1380 | """ |
||
1381 | Helper method to return a sorted list of all technically available analog channel |
||
1382 | descriptors (e.g. ['a_ch1', 'a_ch2']) |
||
1383 | |||
1384 | @return list: Sorted list of analog channels |
||
1385 | """ |
||
1386 | return [chnl for chnl in self._get_all_channels() if chnl.startswith('a')] |
||
1387 | |||
1388 | def _get_all_digital_channels(self): |
||
1389 | """ |
||
1390 | Helper method to return a sorted list of all technically available digital channel |
||
1391 | descriptors (e.g. ['d_ch1', 'd_ch2']) |
||
1392 | |||
1393 | @return list: Sorted list of digital channels |
||
1394 | """ |
||
1395 | return [chnl for chnl in self._get_all_channels() if chnl.startswith('d')] |
||
1396 | |||
1397 | def _is_output_on(self): |
||
1398 | """ |
||
1399 | Aks the AWG if the output is enabled, i.e. if the AWG is running |
||
1400 | |||
1401 | @return: bool, (True: output on, False: output off) |
||
1402 | """ |
||
1403 | return bool(int(self.query('AWGC:RST?'))) |
||
1404 | |||
1405 | View Code Duplication | def _get_filenames_on_device(self): |
|
1406 | """ |
||
1407 | |||
1408 | @return list: filenames found in <ftproot>\\waves |
||
1409 | """ |
||
1410 | filename_list = list() |
||
1411 | with FTP(self._ip_address) as ftp: |
||
1412 | ftp.login(user=self._username, passwd=self._password) |
||
1413 | ftp.cwd(self.ftp_working_dir) |
||
1414 | # get only the files from the dir and skip possible directories |
||
1415 | log = list() |
||
1416 | ftp.retrlines('LIST', callback=log.append) |
||
1417 | for line in log: |
||
1418 | if '<DIR>' not in line: |
||
1419 | # that is how a potential line is looking like: |
||
1420 | # '05-10-16 05:22PM 292 SSR aom adjusted.seq' |
||
1421 | # The first part consists of the date information. Remove this information and |
||
1422 | # separate the first number, which indicates the size of the file. This is |
||
1423 | # necessary if the filename contains whitespaces. |
||
1424 | size_filename = line[18:].lstrip() |
||
1425 | # split after the first appearing whitespace and take the rest as filename. |
||
1426 | # Remove for safety all trailing and leading whitespaces: |
||
1427 | filename = size_filename.split(' ', 1)[1].strip() |
||
1428 | filename_list.append(filename) |
||
1429 | return filename_list |
||
1430 | |||
1431 | def _delete_file(self, filename): |
||
1432 | """ |
||
1433 | |||
1434 | @param str filename: |
||
1435 | """ |
||
1436 | if filename in self._get_filenames_on_device(): |
||
1437 | with FTP(self._ip_address) as ftp: |
||
1438 | ftp.login(user=self._username, passwd=self._password) |
||
1439 | ftp.cwd(self.ftp_working_dir) |
||
1440 | ftp.delete(filename) |
||
1441 | return |
||
1442 | |||
1443 | View Code Duplication | def _send_file(self, filename): |
|
1444 | """ |
||
1445 | |||
1446 | @param filename: |
||
1447 | @return: |
||
1448 | """ |
||
1449 | # check input |
||
1450 | if not filename: |
||
1451 | self.log.error('No filename provided for file upload to awg!\nCommand will be ignored.') |
||
1452 | return -1 |
||
1453 | |||
1454 | filepath = os.path.join(self._tmp_work_dir, filename) |
||
1455 | if not os.path.isfile(filepath): |
||
1456 | self.log.error('No file "{0}" found in "{1}". Unable to upload!' |
||
1457 | ''.format(filename, self._tmp_work_dir)) |
||
1458 | return -1 |
||
1459 | |||
1460 | # Delete old file on AWG by the same filename |
||
1461 | self._delete_file(filename) |
||
1462 | |||
1463 | # Transfer file |
||
1464 | with FTP(self._ip_address) as ftp: |
||
1465 | ftp.login(user=self._username, passwd=self._password) |
||
1466 | ftp.cwd(self.ftp_working_dir) |
||
1467 | with open(filepath, 'rb') as file: |
||
1468 | ftp.storbinary('STOR ' + filename, file) |
||
1469 | return 0 |
||
1470 | |||
1471 | def _write_wfmx(self, filename, analog_samples, marker_bytes, is_first_chunk, is_last_chunk, |
||
1472 | total_number_of_samples): |
||
1473 | """ |
||
1474 | Appends a sampled chunk of a whole waveform to a wfmx-file. Create the file |
||
1475 | if it is the first chunk. |
||
1476 | If both flags (is_first_chunk, is_last_chunk) are set to TRUE it means |
||
1477 | that the whole ensemble is written as a whole in one big chunk. |
||
1478 | |||
1479 | @param name: string, represents the name of the sampled ensemble |
||
1480 | @param analog_samples: dict containing float32 numpy ndarrays, contains the |
||
1481 | samples for the analog channels that |
||
1482 | are to be written by this function call. |
||
1483 | @param marker_bytes: np.ndarray containing bool numpy ndarrays, contains the samples |
||
1484 | for the digital channels that |
||
1485 | are to be written by this function call. |
||
1486 | @param total_number_of_samples: int, The total number of samples in the |
||
1487 | entire waveform. Has to be known in advance. |
||
1488 | @param is_first_chunk: bool, indicates if the current chunk is the |
||
1489 | first write to this file. |
||
1490 | @param is_last_chunk: bool, indicates if the current chunk is the last |
||
1491 | write to this file. |
||
1492 | |||
1493 | @return list: the list contains the string names of the created files for the passed |
||
1494 | presampled arrays |
||
1495 | """ |
||
1496 | # The memory overhead of the tmp file write/read process in bytes. Only used if wfmx file is |
||
1497 | # written in chunks in order to avoid excessive memory usage. |
||
1498 | tmp_bytes_overhead = 16777216 # 16 MB |
||
1499 | |||
1500 | if not filename.endswith('.wfmx'): |
||
1501 | filename += '.wfmx' |
||
1502 | wfmx_path = os.path.join(self._tmp_work_dir, filename) |
||
1503 | tmp_path = os.path.join(self._tmp_work_dir, 'digital_tmp.bin') |
||
1504 | |||
1505 | # if it is the first chunk, create the .WFMX file with header. |
||
1506 | if is_first_chunk: |
||
1507 | # create header |
||
1508 | header = self._create_xml_header(total_number_of_samples, marker_bytes is not None) |
||
1509 | # write header |
||
1510 | with open(wfmx_path, 'wb') as wfmxfile: |
||
1511 | wfmxfile.write(header) |
||
1512 | # Check if a tmp digital samples file is present and delete it if necessary. |
||
1513 | if os.path.isfile(tmp_path): |
||
1514 | os.remove(tmp_path) |
||
1515 | |||
1516 | # append analog samples to the .WFMX file. |
||
1517 | # Write digital samples in temporary file if not the entire samples are passed at once. |
||
1518 | with open(wfmx_path, 'ab') as wfmxfile: |
||
1519 | # append analog samples in binary format. One sample is 4 bytes (np.float32). |
||
1520 | wfmxfile.write(analog_samples) |
||
1521 | |||
1522 | # Write digital samples to tmp file if chunkwise writing is used and it's not the last chunk |
||
1523 | if not is_last_chunk and marker_bytes is not None: |
||
1524 | with open(tmp_path, 'ab') as tmp_file: |
||
1525 | tmp_file.write(marker_bytes) |
||
1526 | |||
1527 | # If this is the last chunk, write digital samples from tmp file to wfmx file (if present) |
||
1528 | # and also append the currently passed digital samples to wfmx file. |
||
1529 | # Read from tmp file in chunks of tmp_bytes_overhead in order to avoid too much memory |
||
1530 | # overhead. |
||
1531 | if is_last_chunk and marker_bytes is not None: |
||
1532 | with open(wfmx_path, 'ab') as wfmxfile: |
||
1533 | # Copy over digital samples from tmp file. Delete tmp file afterwards. |
||
1534 | if os.path.isfile(tmp_path): |
||
1535 | with open(tmp_path, 'rb') as tmp_file: |
||
1536 | while True: |
||
1537 | tmp = tmp_file.read(tmp_bytes_overhead) |
||
1538 | if not tmp: |
||
1539 | break |
||
1540 | wfmxfile.write(tmp) |
||
1541 | os.remove(tmp_path) |
||
1542 | # Append current digital samples array to wfmx file |
||
1543 | wfmxfile.write(marker_bytes) |
||
1544 | return |
||
1545 | |||
1546 | def _create_xml_header(self, number_of_samples, markers_active): |
||
1547 | """ |
||
1548 | This function creates an xml file containing the header for the wfmx-file format using |
||
1549 | etree. |
||
1550 | """ |
||
1551 | hdr = ET.Element('DataFile', offset='XXXXXXXXX', version='0.1') |
||
1552 | dsc = ET.SubElement(hdr, 'DataSetsCollection', xmlns='http://www.tektronix.com') |
||
1553 | datasets = ET.SubElement(dsc, 'DataSets', version='1', xmlns='http://www.tektronix.com') |
||
1554 | datadesc = ET.SubElement(datasets, 'DataDescription') |
||
1555 | sub_elem = ET.SubElement(datadesc, 'NumberSamples') |
||
1556 | sub_elem.text = str(int(number_of_samples)) |
||
1557 | sub_elem = ET.SubElement(datadesc, 'SamplesType') |
||
1558 | sub_elem.text = 'AWGWaveformSample' |
||
1559 | sub_elem = ET.SubElement(datadesc, 'MarkersIncluded') |
||
1560 | sub_elem.text = 'true' if markers_active else 'false' |
||
1561 | sub_elem = ET.SubElement(datadesc, 'NumberFormat') |
||
1562 | sub_elem.text = 'Single' |
||
1563 | sub_elem = ET.SubElement(datadesc, 'Endian') |
||
1564 | sub_elem.text = 'Little' |
||
1565 | sub_elem = ET.SubElement(datadesc, 'Timestamp') |
||
1566 | sub_elem.text = '2014-10-28T12:59:52.9004865-07:00' |
||
1567 | prodspec = ET.SubElement(datasets, 'ProductSpecific', name='') |
||
1568 | sub_elem = ET.SubElement(prodspec, 'ReccSamplingRate', units='Hz') |
||
1569 | sub_elem.text = str(self.get_sample_rate()) |
||
1570 | sub_elem = ET.SubElement(prodspec, 'ReccAmplitude', units='Volts') |
||
1571 | sub_elem.text = '0.5' |
||
1572 | sub_elem = ET.SubElement(prodspec, 'ReccOffset', units='Volts') |
||
1573 | sub_elem.text = '0' |
||
1574 | sub_elem = ET.SubElement(prodspec, 'SerialNumber') |
||
1575 | sub_elem = ET.SubElement(prodspec, 'SoftwareVersion') |
||
1576 | sub_elem.text = '4.0.0075' |
||
1577 | sub_elem = ET.SubElement(prodspec, 'UserNotes') |
||
1578 | sub_elem = ET.SubElement(prodspec, 'OriginalBitDepth') |
||
1579 | sub_elem.text = 'Floating' |
||
1580 | sub_elem = ET.SubElement(prodspec, 'Thumbnail') |
||
1581 | sub_elem = ET.SubElement(prodspec, 'CreatorProperties', name='Basic Waveform') |
||
1582 | sub_elem = ET.SubElement(hdr, 'Setup') |
||
1583 | |||
1584 | xml_header = ET.tostring(hdr, encoding='unicode') |
||
1585 | xml_header = xml_header.replace('><', '>\r\n<') |
||
1586 | |||
1587 | # Calculates the length of the header and replace placeholder with actual number |
||
1588 | xml_header = xml_header.replace('XXXXXXXXX', str(len(xml_header)).zfill(9)) |
||
1589 | return xml_header |
||
1590 |