Total Complexity | 174 |
Total Lines | 1366 |
Duplicated Lines | 19.77 % |
Changes | 1 | ||
Bugs | 0 | Features | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like AWG7122C often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | # -*- coding: utf-8 -*- |
||
37 | class AWG7122C(Base, PulserInterface): |
||
38 | """ |
||
39 | Unstable and under construction, Jochen Scheuer |
||
40 | ... but about to be become awesome, Nikolas Tomek |
||
41 | """ |
||
42 | |||
43 | _modclass = 'awg7122c' |
||
44 | _modtype = 'hardware' |
||
45 | |||
46 | # config options |
||
47 | _tmp_work_dir = ConfigOption(name='tmp_work_dir', |
||
48 | default=os.path.join(get_home_dir(), 'pulsed_files'), |
||
49 | missing='warn') |
||
50 | _visa_address = ConfigOption(name='awg_visa_address', missing='error') |
||
51 | _ip_address = ConfigOption(name='awg_ip_address', missing='error') |
||
52 | _ftp_dir = ConfigOption(name='ftp_root_dir', default='C:\\inetpub\\ftproot', missing='warn') |
||
53 | _username = ConfigOption(name='ftp_login', default='anonymous', missing='warn') |
||
54 | _password = ConfigOption(name='ftp_passwd', default='anonymous@', missing='warn') |
||
55 | _default_sample_rate = ConfigOption(name='default_sample_rate', default=None, missing='warn') |
||
56 | _visa_timeout = ConfigOption(name='timeout', default=30, missing='nothing') |
||
57 | |||
def __init__(self, config, **kwargs):
    """ Create the hardware module and initialise its (not yet connected) state.

    @param config: module configuration, forwarded to the parent class constructor
    @param kwargs: additional keyword arguments, forwarded to the parent class constructor
    """
    super().__init__(config=config, **kwargs)

    # PyVISA resource manager, used during activation to find and open the device
    self._rm = visa.ResourceManager()
    # Reference to the awg visa resource; stays None until a resource has been opened
    self.awg = None

    # Subfolder of the FTP root dir on the AWG disk to work in
    self.ftp_working_dir = 'waves'

    # Encoded installed options available on the awg (filled during activation)
    self.installed_options = []
    # Helper variable since a loaded sequence can not be queried from the device
    self.__loaded_sequence = ''
    # Lookup table mapping the integers 0..3 to the single byte of the same value
    self._marker_byte_dict = {value: bytes([value]) for value in range(4)}
||
71 | |||
def on_activate(self):
    """ Initialisation performed during activation of the module.

    Creates the local work directory, opens the VISA resource, checks the FTP login and FTP
    working directory, reads the installed device options and sets the current directory on
    the AWG. If the VISA resource can not be found an error is logged and the device specific
    setup (option query, directory change) is skipped.
    """
    # Create work directory if necessary. exist_ok makes this a single, race-free call and
    # checks the very same (absolute) path that is created.
    os.makedirs(os.path.abspath(self._tmp_work_dir), exist_ok=True)

    # connect to awg using PyVISA
    if self._visa_address in self._rm.list_resources():
        self.awg = self._rm.open_resource(self._visa_address)
        # The config option is given in seconds (30 by default), PyVISA expects milliseconds
        self.awg.timeout = self._visa_timeout * 1000
    else:
        self.awg = None
        self.log.error(
            'VISA address "{0}" not found by the pyVISA resource manager.\nCheck '
            'the connection by using for example "Agilent Connection Expert".'
            ''.format(self._visa_address))

    # try connecting to AWG using FTP protocol
    with FTP(self._ip_address) as ftp:
        ftp.login(user=self._username, passwd=self._password)
        ftp.cwd(self.ftp_working_dir)

    if self.awg is None:
        # NOTE(review): self.query/self.write presumably talk to self.awg; without an open
        # VISA resource they can not succeed, so stop here after the error logged above.
        return

    # Options of AWG7000 series:
    #   Option 01: Memory expansion to 64,8 MSamples (Million points)
    #   Option 06: Interleave and extended analog output bandwidth
    #   Option 08: Fast sequence switching
    #   Option 09: Subsequence and Table Jump
    self.installed_options = self.query('*OPT?').split(',')
    # TODO: include proper routine to check and change zeroing functionality

    # Set current directory on AWG
    self.write('MMEM:CDIR "{0}"'.format(os.path.join(self._ftp_dir, self.ftp_working_dir)))
||
106 | |||
def on_deactivate(self):
    """ Deinitialisation performed during deactivation of the module.

    Closes the VISA connection to the AWG. A failure to close (e.g. because no resource was
    ever opened and self.awg is None) is only logged at debug level and never raised.
    """
    try:
        self.awg.close()
    except Exception:
        # Deliberately best-effort, but do not swallow KeyboardInterrupt/SystemExit
        self.log.debug('Closing AWG connection using pyvisa failed.')
    else:
        # Only report success if closing actually succeeded
        self.log.info('Closed connection to AWG')
    return
||
117 | |||
118 | # ========================================================================= |
||
119 | # Below all the Pulser Interface routines. |
||
120 | # ========================================================================= |
||
121 | |||
def get_constraints(self):
    """ Retrieve the hardware constraints of the pulsing device.

    @return constraints object: PulserConstraints instance with the pulser constraints as
                                attributes.

    Provides all the constraints (e.g. sample_rate, amplitude, total_length_bins,
    channel_config, ...) related to the pulse generator hardware to the caller.
    See the PulserConstraints class in pulser_interface.py for the available constraints.

    Each scalar constraint carries a min/max value, a step size and a default value.

    The activation_config attribute differs, since it contains the channel
    configuration/activation information in the form:
        {<descriptor_str>: <channel_set>,
         <descriptor_str>: <channel_set>,
         ...}
    """
    def _set_scalar(scalar, minimum, maximum, step, default):
        """ Assign min, max, step and default (in this order) to a scalar constraint. """
        scalar.min = minimum
        scalar.max = maximum
        scalar.step = step
        scalar.default = default

    # TODO: Check values for AWG7122c
    limits = PulserConstraints()

    # The sample rate range depends on whether interleave is reported by get_interleave()
    if self.get_interleave():
        _set_scalar(limits.sample_rate, 12.0e9, 24.0e9, 5.0e2, 24.0e9)
    else:
        _set_scalar(limits.sample_rate, 10.0e6, 12.0e9, 10.0e6, 12.0e9)

    # The minimum analog amplitude depends on the zeroing state
    limits.a_ch_amplitude.max = 1.0
    limits.a_ch_amplitude.step = 0.001
    limits.a_ch_amplitude.default = 1.0
    limits.a_ch_amplitude.min = 0.25 if self._zeroing_enabled() else 0.5

    _set_scalar(limits.d_ch_low, -1.4, 0.9, 0.01, 0.0)
    _set_scalar(limits.d_ch_high, -0.9, 1.4, 0.01, 1.4)

    # The maximum waveform length is larger if option '01' is installed
    limits.waveform_length.min = 1
    limits.waveform_length.step = 1
    limits.waveform_length.default = 80
    limits.waveform_length.max = 64800000 if '01' in self.installed_options else 32000000

    _set_scalar(limits.waveform_num, 1, 32000, 1, 1)
    _set_scalar(limits.sequence_num, 1, 16000, 1, 1)
    _set_scalar(limits.subsequence_num, 1, 8000, 1, 1)

    # If sequencer mode is available then these should be specified
    _set_scalar(limits.repetitions, 0, 65539, 1, 0)

    # ToDo: Check how many external triggers this device has
    limits.event_triggers = ['A', 'B']
    limits.flags = []

    _set_scalar(limits.sequence_steps, 0, 8000, 1, 0)

    # The names a_ch<num> and d_ch<num> are generic names which describe the channels
    # unambiguously. All possible channel configurations are listed here using only those
    # generic names; the configuration names themselves can be chosen freely.
    limits.activation_config = OrderedDict([
        ('All', {'a_ch1', 'd_ch1', 'd_ch2', 'a_ch2', 'd_ch3', 'd_ch4'}),
        # Usage of channel 1 only:
        ('A1_M1_M2', {'a_ch1', 'd_ch1', 'd_ch2'}),
        # Usage of channel 2 only:
        ('A2_M3_M4', {'a_ch2', 'd_ch3', 'd_ch4'}),
        # Only both analog channels
        ('Two_Analog', {'a_ch1', 'a_ch2'}),
        # Usage of one analog channel without digital channel
        ('Analog1', {'a_ch1'}),
        # Usage of one analog channel without digital channel
        ('Analog2', {'a_ch2'}),
    ])
    return limits
||
237 | |||
def pulser_on(self):
    """ Switches the pulsing device on.

    Sends the run command if the output is not on yet and blocks (polling every 0.2 s)
    until the device reports that its output is on.

    @return: the return value of self.get_status()
    """
    # nothing to do if the AWG is already running
    if self._is_output_on():
        return self.get_status()

    self.write('AWGC:RUN')
    # poll until the AWG is actually running
    while not self._is_output_on():
        time.sleep(0.2)
    return self.get_status()
||
252 | |||
def pulser_off(self):
    """ Switches the pulsing device off.

    Sends the stop command if the output is currently on and blocks (polling every 0.2 s)
    until the device reports that its output is off.

    @return: the return value of self.get_status()
    """
    # nothing to do if the AWG is already idle
    if not self._is_output_on():
        return self.get_status()

    self.write('AWGC:STOP')
    # poll until the AWG has actually stopped
    while self._is_output_on():
        time.sleep(0.2)
    return self.get_status()
||
267 | |||
def load_waveform(self, load_dict):
    """ Loads a waveform to the specified channel of the pulsing device.

    The waveforms must already be present on the device; they are loaded from there into
    the requested analog channels.

    @param load_dict: dict|list, a dictionary with keys being one of the available channel
                                 index and values being the name of the already written
                                 waveform to load into the channel.
                                 Examples:   {1: rabi_ch1, 2: rabi_ch2} or
                                             {1: rabi_ch2, 2: rabi_ch1}
                                 If just a list of waveform names is given, the channel
                                 association is derived from the name suffix
                                 '_ch1', '_ch2' etc.

    @return: the return value of self.get_loaded_assets(), also if nothing was loaded
             because a channel is inactive or a waveform is missing.
    """
    # A plain list is translated into {channel number: waveform name} using the name suffix
    if isinstance(load_dict, list):
        load_dict = {int(name.rsplit('_ch', 1)[1]): name for name in load_dict}

    # Collect all active analog channels
    activation = self.get_active_channels()
    active_analog = sorted(
        chnl for chnl, is_active in activation.items() if chnl.startswith('a') and is_active)

    # All channels to load to must be active
    requested = {'a_ch{0:d}'.format(number) for number in load_dict}
    if not requested <= set(active_analog):
        self.log.error('Unable to load all waveforms into channels.\n'
                       'One or more channels to set are not active.')
        return self.get_loaded_assets()

    # All waveforms to load must be present on device memory
    requested_waveforms = set(load_dict.values())
    if not requested_waveforms.issubset(self.get_waveform_names()):
        self.log.error('Unable to load waveforms into channels.\n'
                       'One or more waveforms to load are missing on device memory.')
        return self.get_loaded_assets()

    # Load waveforms into channels and wait until the device reports each one as loaded
    for number, name in load_dict.items():
        self.write('SOUR{0:d}:WAV "{1}"'.format(number, name))
        readback_cmd = 'SOUR{0:d}:WAV?'.format(number)
        while self.query(readback_cmd) != name:
            time.sleep(0.1)

    # Loading waveforms invalidates the remembered sequence
    self.__loaded_sequence = ''
    return self.get_loaded_assets()
||
320 | |||
def load_sequence(self, sequence_name):
    """ Loads a sequence to the channels of the device in order to be ready for playback.

    @param str sequence_name: name of the sequence to load. It must be contained in
                              self.get_sequence_names(); the file loaded on the device is
                              '<sequence_name>.seq'.

    @return: the return value of self.get_loaded_assets(), also if the sequence could not
             be loaded because it is missing on the device.
    """
    if sequence_name not in self.get_sequence_names():
        self.log.error('Unable to load sequence.\n'
                       'Sequence to load is missing on device memory.')
        return self.get_loaded_assets()

    # Load sequence. The file name is the sequence name plus the '.seq' extension
    # (previously the sequence name was accidentally inserted twice).
    file_name = '{0}.seq'.format(sequence_name)
    self.write('SOUR1:FUNC:USER "{0!s}"'.format(file_name))
    # Read back what the device reports; logged instead of printed to stdout
    self.log.debug('Sequence file reported by AWG: {0}'.format(self.query('SOUR1:FUNC:USER?')))

    # set the AWG to the event jump mode:
    self.write('AWGC:EVENT:JMODE EJUMP')

    # Remember the sequence name since a loaded sequence can not be queried
    self.__loaded_sequence = sequence_name
    return self.get_loaded_assets()
||
358 | |||
def get_loaded_assets(self):
    """ Retrieve the currently loaded asset names for each active channel of the device.

    Intended contract: the returned dictionary has the channel numbers as keys. In case of
    loaded waveforms the values are the waveform names, in case of a loaded sequence the
    values are the sequence name appended by a suffix representing the track loaded to the
    respective channel (i.e. '<sequence_name>_1').

    NOTE(review): the implementation is unfinished. The device is queried once per active
    analog channel but the answer is not evaluated, so currently an empty dict and None are
    returned.

    @return (dict, str): Dictionary with keys being the channel number and values being the
                         respective asset loaded into the channel,
                         string describing the asset type ('waveform' or 'sequence')
    """
    # Numbers of all active analog channels
    activation = self.get_active_channels()
    active_numbers = sorted(
        int(chnl.split('_ch')[1])
        for chnl in activation
        if chnl.startswith('a') and activation[chnl]
    )

    loaded_assets = {}
    current_type = None
    for _ in active_numbers:
        # Ask AWG for currently loaded waveform or sequence.
        # FIXME: What does an AWG7000 return with this query?
        self.query('SOUR1:FUNC:USER?')

    return loaded_assets, current_type
||
386 | |||
def clear_all(self):
    """ Clears all loaded waveforms from the pulse generators RAM/workspace.

    Sends the delete-all command for the waveform list and forgets the remembered sequence.

    @return int: always 0
    """
    delete_all_cmd = 'WLIS:WAV:DEL ALL'
    self.write(delete_all_cmd)
    # No sequence can be loaded anymore after everything has been deleted
    self.__loaded_sequence = ''
    return 0
||
395 | |||
def get_status(self):
    """ Retrieves the status of the pulsing hardware.

    @return (int, dict): integer value of the current status together with a dictionary
                         containing the description of all possible status values. The status
                         is -1 if no VISA resource is open (self.awg is None), otherwise the
                         integer answer of the device to the 'AWGC:RST?' query.
    """
    descriptions = {-1: 'Failed Request or Communication',
                    0: 'Device has stopped, but can receive commands',
                    1: 'Device is active and running',
                    2: 'Device is waiting for trigger.'}
    if self.awg is None:
        state = -1
    else:
        state = int(self.query('AWGC:RST?'))
    return state, descriptions
||
410 | |||
def get_sample_rate(self):
    """ Get the sample rate of the pulse generator hardware.

    The value is always read from the device and never taken from a stored attribute.

    @return float: The current sample rate of the device (in Hz)
    """
    answer = self.query('SOUR1:FREQ?')
    return float(answer)
||
420 | |||
def set_sample_rate(self, sample_rate):
    """ Set the sample rate of the pulse generator hardware.

    @param float sample_rate: The sampling rate to be set (in Hz)

    @return float: the sample rate returned from the device (in Hz).

    Note: After setting the sampling rate of the device, use the actually set return value for
          further processing.
    """
    # The rate is sent in MHz, formatted with 4 significant digits
    rate_in_mhz = sample_rate / 1e6
    self.write('SOUR1:FREQ {0:.4G}MHz\n'.format(rate_in_mhz))

    # Poll until the device reports the operation as complete
    while True:
        if int(self.query('*OPC?')) == 1:
            break
        time.sleep(0.1)

    # Here we need to wait, because when the sampling rate is changed the AWG is busy
    # and therefore the query in get_sample_rate would return an empty string.
    time.sleep(1)
    return self.get_sample_rate()
||
438 | |||
439 | # def load_asset(self, asset_name, load_dict=None): |
||
440 | # """ Loads a sequence or waveform to the specified channel of the pulsing |
||
441 | # device. |
||
442 | # |
||
443 | # @param str asset_name: The name of the asset to be loaded |
||
444 | # |
||
445 | # @param dict load_dict: a dictionary with keys being one of the |
||
446 | # available channel numbers and items being the |
||
447 | # name of the already sampled |
||
448 | # waveform/sequence files. |
||
449 | # Examples: {1: rabi_Ch1, 2: rabi_Ch2} |
||
450 | # {1: rabi_Ch2, 2: rabi_Ch1} |
||
451 | # This parameter is optional. If none is given |
||
452 | # then the channel association is invoked from |
||
453 | # the sequence generation, |
||
454 | # i.e. the filename appendix (_Ch1, _Ch2 etc.) |
||
455 | # |
||
456 | # @return int: error code (0:OK, -1:error) |
||
457 | # |
||
458 | # Unused for digital pulse generators without sequence storage capability |
||
459 | # (PulseBlaster, FPGA). |
||
460 | # """ |
||
461 | # if load_dict is None: |
||
462 | # load_dict = {} |
||
463 | # path = self.ftp_path + self.get_asset_dir_on_device() |
||
464 | # |
||
465 | # # Find all files associated with the specified asset name |
||
466 | # file_list = self._get_filenames_on_device() |
||
467 | # filename = [] |
||
468 | # |
||
469 | # # Get current channel activation state to be restored after loading the asset |
||
470 | # chnl_activation = self.get_active_channels() |
||
471 | # |
||
472 | # if (asset_name + '.seq') in file_list: |
||
473 | # file_name = asset_name + '.seq' |
||
474 | # |
||
475 | # # self.tell('MMEMORY:IMPORT "{0}","{1}",SEQ \n'.format(asset_name , asset_name + '.seq')) |
||
476 | # self.tell('SOUR1:FUNC:USER "{0!s}/{1!s}"\n'.format(path, file_name)) |
||
477 | # # self.tell('SOUR1:FUNC:USER "{0}/{1}"\n'.format(path, file_name)) |
||
478 | # # set the AWG to the event jump mode: |
||
479 | # self.tell('AWGCONTROL:EVENT:JMODE EJUMP') |
||
480 | # |
||
481 | # self.current_loaded_asset = asset_name |
||
482 | # else: |
||
483 | # |
||
484 | # for file in file_list: |
||
485 | # |
||
486 | # if file == asset_name + '_ch1.wfm': |
||
487 | # #load into workspace |
||
488 | # self.tell('MMEMORY:IMPORT "{0}","{1}",WFM \n'.format(asset_name +'_ch1', asset_name + '_ch1.wfm')) |
||
489 | # #load into channel |
||
490 | # self.tell('SOUR1:WAVEFORM "{0}"\n'.format(asset_name + '_ch1')) |
||
491 | # self.log.debug('Ch1 loaded: "{0}"'.format(asset_name)) |
||
492 | # filename.append(file) |
||
493 | # elif file == asset_name + '_ch2.wfm': |
||
494 | # self.tell('MMEMORY:IMPORT "{0}","{1}",WFM \n'.format(asset_name + '_ch2', asset_name + '_ch2.wfm')) |
||
495 | # self.tell('SOUR2:WAVEFORM "{0}"\n'.format(asset_name + '_ch2')) |
||
496 | # self.log.debug('Ch2 loaded: "{0}"'.format(asset_name)) |
||
497 | # filename.append(file) |
||
498 | # |
||
499 | # if load_dict == {} and filename == []: |
||
500 | # self.log.warning('No file and channel provided for load!\nCorrect that!\n' |
||
501 | # 'Command will be ignored.') |
||
502 | # |
||
503 | # # for channel_num in list(load_dict): |
||
504 | # #asset_name = str(load_dict[channel_num]) |
||
505 | # #self.tell('MMEMORY:IMPORT "{0}","{1}",WFM \n'.format(asset_name + '_ch{0}'.format(int(channel_num)), asset_name + '_ch{0}.wfm'.format(int(channel_num)))) |
||
506 | # #self.tell('SOUR1:WAVEFORM "{0}"\n'.format(asset_name + '_ch{0}'.format(int(channel_num)))) |
||
507 | # |
||
508 | # #if len(list(load_dict)) > 0: |
||
509 | # self.current_loaded_asset = asset_name |
||
510 | # |
||
511 | # # Restore channel activation state |
||
512 | # self.set_active_channels(chnl_activation) |
||
513 | # return 0 |
||
514 | |||
515 | |||
516 | |||
517 | # file_list = self._get_filenames_on_device() |
||
518 | # filename = [] |
||
519 | # |
||
520 | # for file in file_list: |
||
521 | # if file == asset_name+'_ch1.wfm' or file == asset_name+'_ch2.wfm': |
||
522 | # filename.append(file) |
||
523 | # |
||
524 | # |
||
525 | # # Check if something could be found |
||
526 | # if len(filename) == 0: |
||
527 | # self.log.error('No files associated with asset {0} were found on AWG7122c.' |
||
528 | # 'Load to channels failed!'.format(asset_name) |
||
529 | # ) # if asset.split("_")[-1][:3] == 'ch1': |
||
530 | # self.tell('SOUR1:WAVEFORM "{0}"\n'.format(asset[:-4])) |
||
531 | # if asset.split("_")[-1][:3] == 'ch2': |
||
532 | # self.tell('SOUR2:WAVEFORM "{0}"\n'.format(asset[:-4])) |
||
533 | # self.current_loaded_asset = asset_name |
||
534 | # else: |
||
535 | # for channel in load_dict: |
||
536 | # return -1 |
||
537 | # |
||
538 | # self.log.info('The following files associated with the asset {0} were found on AWG7122c:\n' |
||
539 | # '"{1}"'.format(asset_name, filename)) |
||
540 | # |
||
541 | # # load files in AWG Waveform list |
||
542 | # for asset in filename: |
||
543 | # if asset.endswith('.wfm'): |
||
544 | # self.tell('MMEMORY:IMPORT "{0}","{1}",WFM \n'.format(asset[:-4], asset)) |
||
545 | # else: |
||
546 | # self.log.error('Could not load asset {0} to AWG7122c:\n' |
||
547 | # '"{1}"'.format(asset_name, filename)) |
||
548 | # |
||
549 | # file_path = self.ftp_path + self.get_asset_dir_on_device() |
||
550 | # # simply use the channel association of the filenames if no load_dict is given |
||
551 | # if load_dict == {}: |
||
552 | # for asset in filename: |
||
553 | # # load waveforms into channels as given in filename |
||
554 | |||
555 | # # load waveforms into channels |
||
556 | # name = load_dict[channel] |
||
557 | # self.tell('SOUR'+str(channel)+':FUNC:USER "{0}/{1}"\n'.format(file_path, name)) |
||
558 | # self.current_loaded_asset = name |
||
559 | # |
||
560 | # return 0 |
||
561 | |||
def get_analog_level(self, amplitude=None, offset=None):
    """ Retrieve the analog amplitude and offset of the provided channels.

    @param list amplitude: optional, channel descriptor strings (i.e. 'a_ch1') for which the
                           amplitude (in Volt peak to peak, i.e. the full amplitude) is
                           desired. None means all analog channels.
    @param list offset: optional, channel descriptor strings for which the offset (in Volt)
                        is desired. None means all analog channels.

    @return: (dict, dict): tuple of two dicts, with keys being the channel descriptor string
                           (i.e. 'a_ch1') and items being the values for those channels.
                           Amplitude is always denoted in Volt-peak-to-peak and Offset in volts.

    The values are always read from the device. Requested channels that are not returned by
    self._get_all_analog_channels() are skipped with a warning. If option '02' or '06' is
    installed the offset is not queried and reported as 0.0.

    Example of a possible input:
        amplitude = ['a_ch1', 'a_ch4'], offset = None
    to obtain the amplitude of channel 1 and 4 and the offset of all channels
        {'a_ch1': -0.5, 'a_ch4': 2.0} {'a_ch1': 0.0, 'a_ch2': 0.0, 'a_ch3': 1.0, 'a_ch4': 0.0}
    """
    # FIXME: No sanity checking done here with constraints
    def _channel_number(descriptor):
        """ Extract the channel number from a descriptor like 'a_ch2'. """
        return int(descriptor.rsplit('_ch', 1)[1])

    chnl_list = self._get_all_analog_channels()

    # get pp amplitudes
    amp = {}
    for chnl in (chnl_list if amplitude is None else amplitude):
        if chnl not in chnl_list:
            self.log.warning('Get analog amplitude from AWG7122c channel "{0}" failed. '
                             'Channel non-existent.'.format(chnl))
            continue
        amp[chnl] = float(self.query('SOUR{0:d}:VOLT:AMPL?'.format(_channel_number(chnl))))

    # get voltage offsets
    # The channel number is always taken from the descriptor. The previous implementation
    # used a zero based loop index here and therefore queried 'SOUR0', 'SOUR1', ...
    no_offset = '02' in self.installed_options or '06' in self.installed_options
    off = {}
    for chnl in (chnl_list if offset is None else offset):
        if chnl not in chnl_list:
            self.log.warning('Get analog offset from AWG7122c channel "{0}" failed. '
                             'Channel non-existent.'.format(chnl))
            continue
        if no_offset:
            off[chnl] = 0.0
        else:
            off[chnl] = float(
                self.query('SOUR{0:d}:VOLT:OFFS?'.format(_channel_number(chnl))))
    return amp, off
||
620 | |||
def set_analog_level(self, amplitude=None, offset=None):
    """ Set amplitude and/or offset value of the provided analog channel(s).

    @param dict amplitude: dictionary, with key being the channel descriptor string
                           (i.e. 'a_ch1', 'a_ch2') and items being the amplitude values
                           (in Volt peak to peak, i.e. the full amplitude) for the desired
                           channel.
    @param dict offset: dictionary, with key being the channel descriptor string
                        (i.e. 'a_ch1', 'a_ch2') and items being the offset values
                        (in absolute volt) for the desired channel.

    @return: the return value of self.get_analog_level(), i.e. the values read back for
             ALL channels.

    Channels that are not returned by self._get_all_analog_channels() are ignored with a
    warning. Values outside the limits given by self.get_constraints() are clipped to the
    respective limit with a warning. The passed dictionaries are not modified. Offsets are
    not written if option '02' or '06' is installed.

    If nothing is passed then the command will return the current amplitudes/offsets.

    Note: After setting the amplitude and/or offset values of the device, use the actual set
          return values for further processing.
    """
    # Check the inputs by using the constraints...
    constraints = self.get_constraints()
    # ...and the available analog channels
    analog_channels = self._get_all_analog_channels()

    # amplitude sanity check; the sanitized values are collected in a new dict so that
    # neither the caller's dict is changed nor a dict is modified while iterating over it
    amplitudes_to_set = {}
    if amplitude is not None:
        amp_min = constraints.a_ch_amplitude.min
        amp_max = constraints.a_ch_amplitude.max
        for chnl, value in amplitude.items():
            if chnl not in analog_channels:
                self.log.warning('Channel to set ({0}) not available in AWG.\nSetting '
                                 'analogue voltage for this channel ignored.'.format(chnl))
                continue
            if value < amp_min:
                self.log.warning('Minimum Vpp for channel "{0}" is {1}. Requested Vpp of {2}V '
                                 'was ignored and instead set to min value.'
                                 ''.format(chnl, amp_min, value))
                value = amp_min
            elif value > amp_max:
                self.log.warning('Maximum Vpp for channel "{0}" is {1}. Requested Vpp of {2}V '
                                 'was ignored and instead set to max value.'
                                 ''.format(chnl, amp_max, value))
                value = amp_max
            amplitudes_to_set[chnl] = value

    # offset sanity check
    offsets_to_set = {}
    if offset is not None:
        off_min = constraints.a_ch_offset.min
        off_max = constraints.a_ch_offset.max
        for chnl, value in offset.items():
            if chnl not in analog_channels:
                self.log.warning('Channel to set ({0}) not available in AWG.\nSetting '
                                 'offset voltage for this channel ignored.'.format(chnl))
                continue
            if value < off_min:
                self.log.warning('Minimum offset for channel "{0}" is {1}. Requested offset of '
                                 '{2}V was ignored and instead set to min value.'
                                 ''.format(chnl, off_min, value))
                value = off_min
            elif value > off_max:
                self.log.warning('Maximum offset for channel "{0}" is {1}. Requested offset of '
                                 '{2}V was ignored and instead set to max value.'
                                 ''.format(chnl, off_max, value))
                value = off_max
            offsets_to_set[chnl] = value

    # Write the amplitudes; the channel number is derived from the channel that is
    # actually being written and the device is polled until the operation is complete
    for chnl, value in amplitudes_to_set.items():
        ch_num = int(chnl.rsplit('_ch', 1)[1])
        self.write('SOUR{0:d}:VOLT:AMPL {1}'.format(ch_num, value))
        while int(self.query('*OPC?')) != 1:
            time.sleep(0.1)

    # Write the offsets unless option '02' or '06' is installed
    no_offset = '02' in self.installed_options or '06' in self.installed_options
    if not no_offset:
        for chnl, value in offsets_to_set.items():
            ch_num = int(chnl.rsplit('_ch', 1)[1])
            self.write('SOUR{0:d}:VOLT:OFFSET {1}'.format(ch_num, value))
            while int(self.query('*OPC?')) != 1:
                time.sleep(0.1)
    return self.get_analog_level()
||
700 | |||
def get_digital_level(self, low=None, high=None):
    """ Read the marker low and high voltages of the provided/all digital channels.

    @param list low: optional, channel descriptors (e.g. 'd_ch1') whose low level (in Volt) is
                     requested. None selects all digital channels.
    @param list high: optional, channel descriptors whose high level (in Volt) is requested.
                      None selects all digital channels.

    @return: (dict, dict): tuple of two dicts (low levels, high levels) with the channel
                           descriptor strings as keys and the voltages as float values.

    The values are always queried from the device, nothing is cached. Descriptors that are not
    part of the available digital channels are silently skipped.

    Example:
        low = ['d_ch1', 'd_ch4']
    might return
        {'d_ch1': -0.5, 'd_ch4': 2.0} {'d_ch1': 1.0, 'd_ch2': 1.0, 'd_ch3': 1.0, 'd_ch4': 4.0}
    because no explicit high request means the high level of ALL channels is returned.
    """
    available = self._get_all_digital_channels()
    wanted_low = available if low is None else low
    wanted_high = available if high is None else high

    def read_levels(descriptors, command):
        # Each analog channel n owns the digital channels 2n-1 (marker 1) and 2n (marker 2).
        levels = {}
        for descriptor in descriptors:
            if descriptor in available:
                digital_number = int(descriptor.rsplit('_ch', 1)[1])
                analog_number = (1 + digital_number) // 2
                marker = 2 - (digital_number % 2)
                levels[descriptor] = float(self.query(command.format(analog_number, marker)))
        return levels

    # All low levels are queried before the first high level is queried.
    low_val = read_levels(wanted_low, 'SOUR{0:d}:MARK{1:d}:VOLT:LOW?')
    high_val = read_levels(wanted_high, 'SOUR{0:d}:MARK{1:d}:VOLT:HIGH?')
    return low_val, high_val
||
753 | |||
def set_digital_level(self, low=None, high=None):
    """ Set low and/or high value of the provided digital channels.

    @param dict low: dictionary, with key being the channel descriptor (e.g. 'd_ch1') and items
                     being the low values (in volt) for the desired channel.
    @param dict high: dictionary, with key being the channel descriptor and items being the
                      high values (in volt) for the desired channel.

    @return (dict, dict): tuple of two dicts where first dict denotes the current low value and
                          the second dict the high value, as returned by get_digital_level().

    If nothing is passed, no command is sent and the current levels of all channels are
    returned.

    Channels that are not part of the available digital channels are skipped with a warning.
    The requested voltages are not range-checked here.
    NOTE(review): presumably the instrument rejects or coerces out-of-range values - confirm
    against the device manual.

    The major difference to analog signals is that digital signals are either ON or OFF,
    whereas analog channels have a varying amplitude range. Digital output levels are defined
    by a voltage corresponding to the ON status and a voltage corresponding to the OFF status
    (both denoted in absolute voltage). In general there is no bijective correspondence
    between (amplitude, offset) and (value high, value low)!
    """
    low = dict() if low is None else low
    high = dict() if high is None else high

    digital_channels = self._get_all_digital_channels()

    commands_sent = False
    for level_name, levels in (('LOW', low), ('HIGH', high)):
        for chnl in levels:
            if chnl not in digital_channels:
                self.log.warning('Channel to set ({0}) not available in AWG.\nSetting {1} '
                                 'marker level for this channel ignored.'
                                 ''.format(chnl, level_name))
                continue
            # Same channel mapping as in get_digital_level: analog channel n owns the digital
            # channels 2n-1 (marker 1) and 2n (marker 2).
            d_ch_number = int(chnl.rsplit('_ch', 1)[1])
            a_ch_number = (1 + d_ch_number) // 2
            marker_index = 2 - (d_ch_number % 2)
            self.write('SOUR{0:d}:MARK{1:d}:VOLT:{2} {3}'.format(a_ch_number,
                                                                marker_index,
                                                                level_name,
                                                                levels[chnl]))
            commands_sent = True

    # Wait until the device has processed the commands before reading back.
    if commands_sent:
        while int(self.query('*OPC?')) != 1:
            time.sleep(0.1)
    return self.get_digital_level()
||
794 | |||
def get_active_channels(self, ch=None):
    """ Query the activation state of the analog and digital channels from the hardware.

    @param list ch: optional, channel descriptors (analog and/or digital) of interest. If
                    omitted (or None) the state of all channels is returned.

    @return dict: keys are channel descriptor strings, values are booleans telling whether the
                  channel is active.

    An analog channel is active if its output state is on. Its two markers (digital channels
    2n-1 and 2n for analog channel n) are reported active only if the analog channel is active
    and the DAC resolution query yields 8, i.e. 10 - resolution == 2 markers.

    Example for a possible input (order is not important):
        ch = ['a_ch2', 'd_ch2', 'a_ch1', 'd_ch5', 'd_ch1']
    then the output might look like
        {'a_ch2': True, 'd_ch2': False, 'a_ch1': False, 'd_ch5': True, 'd_ch1': False}
    """
    states = dict()
    for number, analog_name in enumerate(self._get_all_analog_channels(), start=1):
        output_on = bool(int(self.query('OUTPUT{0:d}:STATE?'.format(number))))
        states[analog_name] = output_on

        # The DAC resolution is only queried for channels whose output is on.
        markers_on = False
        if output_on:
            marker_count = 10 - int(self.query('SOUR{0:d}:DAC:RES?'.format(number)))
            markers_on = marker_count == 2
        states['d_ch{0:d}'.format(number * 2)] = markers_on
        states['d_ch{0:d}'.format(number * 2 - 1)] = markers_on

    if ch is None:
        return states
    # Only report the channels that were asked for.
    return {name: state for name, state in states.items() if name in ch}
||
840 | |||
def set_active_channels(self, ch=None):
    """ (De)activate analog and digital channels of the pulse generator hardware.

    @param dict ch: keys are generic channel descriptors (i.e. 'd_ch1', 'a_ch2'), values are
                    booleans. True: activate channel, False: deactivate channel.

    @return dict: the channel states as read back from the device after the operation (or the
                  unchanged current state if nothing was set).

    If nothing is passed, the current state is returned unchanged. The request is rejected
    (error logged, current state returned) if it names a channel unknown to the device or if
    the resulting set of active channels is not one of the activation_config values of the
    constraints.

    Example for possible input:
        ch={'a_ch2': True, 'd_ch1': False, 'd_ch3': True, 'd_ch4': True}
    to activate analog channel 2, digital channel 3 and 4 and to deactivate digital channel 1.
    """
    present_state = self.get_active_channels()
    if ch is None:
        return present_state

    if any(name not in present_state for name in ch):
        self.log.error('Trying to (de)activate channels that are not present in AWG.\n'
                       'Setting of channel activation aborted.')
        return present_state

    # Merge the request into the current state to obtain the complete target state.
    target_state = dict(present_state)
    for name in ch:
        target_state[name] = ch[name]

    allowed_configs = self.get_constraints().activation_config.values()
    enabled = {name for name, is_on in target_state.items() if is_on}
    if enabled not in allowed_configs:
        self.log.error('activation_config to set ({0}) is not allowed according to constraints.'
                       ''.format(enabled))
        return present_state

    for analog_name in self._get_all_analog_channels():
        number = int(analog_name.rsplit('_ch', 1)[1])
        # The state of the second marker (d_ch 2n) decides whether this analog channel runs
        # with two markers (8 bit DAC) or without markers (10 bit DAC).
        resolution = 8 if target_state['d_ch{0:d}'.format(2 * number)] else 10
        self.write('SOUR{0:d}:DAC:RES {1:d}'.format(number, resolution))
        if target_state[analog_name]:
            output_command = 'OUTPUT{0:d}:STATE ON'
        else:
            output_command = 'OUTPUT{0:d}:STATE OFF'
        self.write(output_command.format(number))
    return self.get_active_channels()
||
906 | |||
def write_waveform(self, name, analog_samples, digital_samples, is_first_chunk, is_last_chunk,
                   total_number_of_samples):
    """
    Write a new waveform or append samples to an already existing waveform on the device memory.
    The flags is_first_chunk and is_last_chunk can be used as indicator if a new waveform should
    be created or if the write process to a waveform should be terminated.

    For every active analog channel a WFM file "<name>_ch<N>.wfm" is written, uploaded and
    imported into the device workspace as waveform "<name>_ch<N>".

    @param name: str, the name of the waveform to be created/append to
    @param analog_samples: dict with analog channel descriptors as keys and the voltage sample
                           arrays as values
    @param digital_samples: dict with digital channel descriptors as keys and the marker state
                            arrays as values. NOTE: these arrays are modified in place while
                            the marker byte is assembled (no copy is made).
    @param is_first_chunk: bool, flag indicating if it is the first chunk to write.
                           If True this method will create a new empty waveform.
                           If False the samples are appended to the existing waveform.
    @param is_last_chunk: bool, flag indicating if it is the last chunk to write.
    @param total_number_of_samples: int, The number of sample points for the entire waveform
                                    (not only the currently written chunk)

    @return: (int, list) number of samples written (-1 indicates failed process) and list of
                         created waveform names
    """
    waveforms = list()

    # Sanity checks
    constraints = self.get_constraints()

    if len(analog_samples) == 0:
        self.log.error('No analog samples passed to write_waveform method in awg7122c.')
        return -1, waveforms

    if total_number_of_samples < constraints.waveform_length.min:
        self.log.error('Unable to write waveform.\nNumber of samples to write ({0:d}) is '
                       'smaller than the allowed minimum waveform length ({1:d}).'
                       ''.format(total_number_of_samples, constraints.waveform_length.min))
        return -1, waveforms
    if total_number_of_samples > constraints.waveform_length.max:
        self.log.error('Unable to write waveform.\nNumber of samples to write ({0:d}) is '
                       'greater than the allowed maximum waveform length ({1:d}).'
                       ''.format(total_number_of_samples, constraints.waveform_length.max))
        return -1, waveforms

    # determine active channels
    activation_dict = self.get_active_channels()
    active_channels = {chnl for chnl in activation_dict if activation_dict[chnl]}
    active_analog = sorted(chnl for chnl in active_channels if chnl.startswith('a'))

    # Sanity check of channel numbers
    sample_channels = set(analog_samples).union(digital_samples)
    if active_channels != sample_channels:
        # Fixed: the message lacked the second placeholder, so the sample channels were never
        # shown.
        self.log.error('Mismatch of channel activation and sample array dimensions for '
                       'waveform creation.\nChannel activation is: {0}\nSample arrays have: {1}'
                       ''.format(active_channels, sample_channels))
        return -1, waveforms

    # Write waveforms. One for each analog channel.
    for a_ch in active_analog:
        # Get the integer analog channel number
        a_ch_num = int(a_ch.rsplit('ch', 1)[1])
        # Get the digital channel specifiers belonging to this analog channel markers
        mrk_ch_1 = 'd_ch{0:d}'.format(a_ch_num * 2 - 1)
        mrk_ch_2 = 'd_ch{0:d}'.format(a_ch_num * 2)

        start = time.time()
        # Encode marker information in an array of bytes (uint8): marker 2 ends up in bit 7,
        # marker 1 in bit 6. Works on views of the passed arrays to avoid intermediate copies.
        if mrk_ch_1 in digital_samples and mrk_ch_2 in digital_samples:
            mrk_bytes = digital_samples[mrk_ch_2].view('uint8')
            tmp_bytes = digital_samples[mrk_ch_1].view('uint8')
            np.left_shift(mrk_bytes, 7, out=mrk_bytes)
            np.left_shift(tmp_bytes, 6, out=tmp_bytes)
            np.add(mrk_bytes, tmp_bytes, out=mrk_bytes)
        else:
            mrk_bytes = None
        self.log.debug('Prepare digital channel data: {0}'.format(time.time() - start))

        # Create waveform name string
        wfm_name = '{0}_ch{1:d}'.format(name, a_ch_num)

        # Write WFM file for waveform.
        # Fixed: the helper's parameter is called marker_bytes, not digital_samples.
        start = time.time()
        self._write_wfm(filename=wfm_name,
                        analog_samples=analog_samples[a_ch],
                        marker_bytes=mrk_bytes,
                        is_first_chunk=is_first_chunk,
                        is_last_chunk=is_last_chunk,
                        total_number_of_samples=total_number_of_samples)
        self.log.debug('Write WFM file: {0}'.format(time.time() - start))

        # transfer waveform to AWG and load into workspace
        start = time.time()
        self._send_file(filename=wfm_name + '.wfm')
        self.log.debug('Send WFM file: {0}'.format(time.time() - start))

        start = time.time()
        self.write('MMEM:IMP "{0}","{1}",WFM'.format(wfm_name, wfm_name + '.wfm'))
        # Wait for everything to complete
        while int(self.query('*OPC?')) != 1:
            time.sleep(0.2)
        # Just to make sure the waveform really shows up in the workspace
        while wfm_name not in self.get_waveform_names():
            time.sleep(0.2)
        self.log.debug('Load WFM file into workspace: {0}'.format(time.time() - start))

        # Append created waveform name to waveform list
        waveforms.append(wfm_name)
    return total_number_of_samples, waveforms
||
1015 | |||
def write_sequence(self, name, sequence_parameters):
    """
    Write a new sequence on the device memory. Currently not implemented: the method always
    reports failure.

    @param name: str, the name of the sequence to be created
    @param sequence_parameters: dict, dictionary containing the parameters for a sequence

    @return: int, number of sequence steps written (-1 indicates failed process)
    """
    if self.has_sequence_mode():
        # FIXME: I can not possibly implement that without the hardware to test it.
        return -1

    # No sequencer option installed in the device
    self.log.error('Direct sequence generation in AWG not possible. Sequencer option not '
                   'installed.')
    return -1
||
1032 | |||
def get_waveform_names(self):
    """ Ask the device for the names of all waveforms in its waveform list.

    @return list: Alphabetically sorted list of all waveform name strings in the workspace.
    """
    entry_count = int(self.query('WLIS:SIZE?'))
    # The name query is issued for the indices 1 .. entry_count.
    names = [self.query('WLIS:NAME? {0:d}'.format(position))
             for position in range(1, entry_count + 1)]
    names.sort()
    return names
||
1043 | |||
def get_sequence_names(self):
    """ Retrieve the names of all uploaded sequences on the device.

    Not implemented yet: an empty list is returned without contacting the device.

    @return list: List of all uploaded sequence name strings in the device workspace.
    """
    # FIXME: No idea without hardware to test
    return []
||
1051 | |||
def delete_waveform(self, waveform_name):
    """ Remove the waveform(s) with the given name(s) from the device workspace.

    @param str waveform_name: The name of the waveform to be deleted.
                              Optionally a list of waveform names can be passed.

    @return list: sorted list of the waveform names for which a delete command was sent. Names
                  that are not present on the device are ignored.
    """
    requested = [waveform_name] if isinstance(waveform_name, str) else waveform_name

    present = self.get_waveform_names()
    removed = list()
    for candidate in requested:
        if candidate not in present:
            continue
        self.write('WLIS:WAV:DEL "{0}"'.format(candidate))
        removed.append(candidate)
    removed.sort()
    return removed
||
1070 | |||
def delete_sequence(self, sequence_name):
    """ Delete the sequence with name "sequence_name" from the device memory.

    Not implemented yet: nothing is deleted and an empty list is returned.

    @param str sequence_name: The name of the sequence to be deleted.
                              Optionally a list of sequence names can be passed.

    @return list: a list of deleted sequence names.
    """
    # FIXME: Again... no idea without hardware to play with
    return []
||
1081 | |||
def get_interleave(self):
    """ Query the device whether interleave is switched ON or OFF.

    @return bool: True: ON, False: OFF
    """
    reply = self.query('AWGC:INT:STAT?')
    # The reply is interpreted as an integer flag; every non-zero value counts as ON.
    return int(reply) != 0
||
1090 | |||
def set_interleave(self, state=False):
    """ Switch the interleave of the AWG on or off.

    @param bool state: The state the interleave should be set to
                       (True: ON, False: OFF)

    @return bool: actual interleave status (True: ON, False: OFF) as read back from the device

    A non-boolean state is ignored and only the current status is returned. If the requested
    state is already set, no command is sent.

    Note: After setting the interleave of the device, use the returned value for further
          processing.
    """
    if isinstance(state, bool):
        # Nothing to do if the device already is in the requested state.
        if self.get_interleave() is state:
            return state

        self.write('AWGC:INT:STAT {0:d}'.format(int(state)))
        # Poll until the device reports the operation as complete.
        while True:
            if int(self.query('*OPC?')) == 1:
                break
            time.sleep(0.1)
    return self.get_interleave()
||
1115 | |||
def write(self, command):
    """ Sends a command string to the device.

    @param string command: string containing the command

    @return int: status code. If the VISA backend returns a (bytes_written, status_code) tuple
                 the status code is returned as int. If it returns only the number of bytes
                 written, 0 is returned.
    """
    result = self.awg.write(command)
    # Accept both return shapes so that a backend returning only the byte count does not make
    # the tuple unpacking fail.
    # NOTE(review): newer PyVISA releases are understood to return just the byte count and to
    # signal errors by raising - confirm against the installed PyVISA version.
    if isinstance(result, tuple):
        return int(result[1])
    return 0
||
1125 | |||
def query(self, question):
    """ Send a query to the device and return the cleaned-up answer.

    @param string question: string containing the command

    @return string: the answer of the device with surrounding whitespace (including the line
                    termination) removed first and enclosing double quotes removed afterwards
    """
    # Once the whitespace is stripped from both ends, only the quote characters remain to be
    # removed.
    return self.awg.query(question).strip().strip('"')
||
1139 | |||
def reset(self):
    """ Reset the device by sending '*RST' followed by '*WAI'.

    @return int: error code, always 0 (the result of the write calls is not evaluated)
    """
    for command in ('*RST', '*WAI'):
        self.write(command)
    return 0
||
1148 | |||
def has_sequence_mode(self):
    """ Tell whether the pulse generator offers a sequence mode.

    The answer is hard-coded, the device is not asked.

    @return: bool, True for yes, False for no.
    """
    sequence_mode_available = True
    return sequence_mode_available
||
1155 | |||
def set_lowpass_filter(self, a_ch, cutoff_freq):
    """ Set the lowpass filter of an analog output channel of the AWG.

    @param int a_ch: To which channel to apply, either 1 or 2. Any other value is ignored.
    @param cutoff_freq: Cutoff Frequency of the lowpass filter in Hz. It is sent to the device
                        in MHz.
    """
    if a_ch in (1, 2):
        cutoff_mhz = cutoff_freq / 1e6
        self.write('OUTPUT{0:d}:FILTER:LPASS:FREQUENCY {1:f}MHz'.format(a_ch, cutoff_mhz))
||
1165 | |||
def set_jump_timing(self, synchronous=False):
    """ Set the control of the jump timing in the AWG.

    @param bool synchronous: if True the jump timing will be set to synchronous, otherwise the
                             jump timing will be set to asynchronous.

    With asynchronous jump timing the jump occurs as quickly as possible after an event occurs
    (e.g. event jump trigger). With synchronous jump timing the jump is made after the current
    waveform is output. The default value is asynchronous.
    """
    if synchronous:
        timing = 'SYNC'
    else:
        timing = 'ASYNC'
    self.write('EVEN:JTIM {0}'.format(timing))
||
1178 | |||
def set_mode(self, mode):
    """ Change the run mode of the AWG.

    @param str mode: Options for mode (case-insensitive):
                        continuous - 'C'
                        triggered  - 'T'
                        gated      - 'G'
                        enhanced   - 'E'
                        sequence   - 'S'

    Any other mode string raises a KeyError.
    """
    run_modes = dict(C='CONT', T='TRIG', G='GAT', E='ENH', S='SEQ')
    selected = run_modes[mode.upper()]
    self.write('AWGC:RMOD {0!s}'.format(selected))
||
1195 | |||
1196 | # works |
||
def get_sequencer_mode(self, output_as_int=False):
    """ Ask the AWG which sequencer mode it is using.

    @param: bool output_as_int: optional, if True an integer code is returned instead of a
                                descriptive string
    @return: str or int with the following meaning:
            'Hardware-Sequencer' or 0 indicates Hardware Mode (answer contains 'HARD')
            'Software-Sequencer' or 1 indicates Software Mode (answer contains 'SOFT')
            'Request-Error' or -1 indicates an answer that matches neither
    """
    reply = self.query('AWGC:SEQ:TYPE?')
    known_modes = (('HARD', 0, 'Hardware-Sequencer'),
                   ('SOFT', 1, 'Software-Sequencer'))
    # 'HARD' is checked before 'SOFT'; the first keyword found in the reply wins.
    for keyword, code, label in known_modes:
        if keyword in reply:
            return code if output_as_int else label
    return -1 if output_as_int else 'Request-Error'
||
1216 | |||
1217 | def _delete_file(self, filename): |
||
1218 | """ |
||
1219 | |||
1220 | @param str filename: The full filename to delete from FTP cwd |
||
1221 | """ |
||
1222 | if filename in self._get_filenames_on_device(): |
||
1223 | with FTP(self._ip_address) as ftp: |
||
1224 | ftp.login(user=self._username, passwd=self._password) |
||
1225 | ftp.cwd(self.ftp_working_dir) |
||
1226 | ftp.delete(filename) |
||
1227 | return |
||
1228 | |||
1229 | View Code Duplication | def _send_file(self, filename): |
|
1230 | """ |
||
1231 | |||
1232 | @param filename: |
||
1233 | @return: |
||
1234 | """ |
||
1235 | # check input |
||
1236 | if not filename: |
||
1237 | self.log.error('No filename provided for file upload to awg!\nCommand will be ignored.') |
||
1238 | return -1 |
||
1239 | |||
1240 | filepath = os.path.join(self._tmp_work_dir, filename) |
||
1241 | if not os.path.isfile(filepath): |
||
1242 | self.log.error('No file "{0}" found in "{1}". Unable to upload!' |
||
1243 | ''.format(filename, self._tmp_work_dir)) |
||
1244 | return -1 |
||
1245 | |||
1246 | # Delete old file on AWG by the same filename |
||
1247 | self._delete_file(filename) |
||
1248 | |||
1249 | # Transfer file |
||
1250 | with FTP(self._ip_address) as ftp: |
||
1251 | ftp.login(user=self._username, passwd=self._password) |
||
1252 | ftp.cwd(self.ftp_working_dir) |
||
1253 | with open(filepath, 'rb') as file: |
||
1254 | ftp.storbinary('STOR ' + filename, file) |
||
1255 | return 0 |
||
1256 | |||
def _get_filenames_on_device(self):
    """ List the files in the FTP working directory (self.ftp_working_dir) of the device.

    @return list: filenames found in the FTP working directory, directories are skipped
    """
    names = list()
    with FTP(self._ip_address) as connection:
        connection.login(user=self._username, passwd=self._password)
        connection.cwd(self.ftp_working_dir)
        listing = list()
        connection.retrlines('LIST', callback=listing.append)
        for entry in listing:
            # get only the files from the dir and skip possible directories
            if '<DIR>' in entry:
                continue
            # that is how a potential line is looking like:
            #   '05-10-16  05:22PM                  292 SSR aom adjusted.seq'
            # Cut off the date/time part. What remains is the file size followed by the
            # filename, which may itself contain whitespaces.
            size_and_name = entry[18:].lstrip()
            # Everything after the first whitespace (i.e. after the size) is the filename.
            names.append(size_and_name.split(' ', 1)[1].strip())
    return names
||
1282 | |||
1283 | def _get_all_channels(self): |
||
1284 | """ |
||
1285 | Helper method to return a sorted list of all technically available channel descriptors |
||
1286 | (e.g. ['a_ch1', 'a_ch2', 'd_ch1', 'd_ch2']) |
||
1287 | |||
1288 | @return list: Sorted list of channels |
||
1289 | """ |
||
1290 | avail_channels = ['a_ch1', 'd_ch1', 'd_ch2'] |
||
1291 | if not self.get_interleave(): |
||
1292 | avail_channels.extend(['a_ch2', 'd_ch3', 'd_ch4']) |
||
1293 | return sorted(avail_channels) |
||
1294 | |||
1295 | def _get_all_analog_channels(self): |
||
1296 | """ |
||
1297 | Helper method to return a sorted list of all technically available analog channel |
||
1298 | descriptors (e.g. ['a_ch1', 'a_ch2']) |
||
1299 | |||
1300 | @return list: Sorted list of analog channels |
||
1301 | """ |
||
1302 | return sorted(chnl for chnl in self._get_all_channels() if chnl.startswith('a')) |
||
1303 | |||
1304 | def _get_all_digital_channels(self): |
||
1305 | """ |
||
1306 | Helper method to return a sorted list of all technically available digital channel |
||
1307 | descriptors (e.g. ['d_ch1', 'd_ch2']) |
||
1308 | |||
1309 | @return list: Sorted list of digital channels |
||
1310 | """ |
||
1311 | return sorted(chnl for chnl in self._get_all_channels() if chnl.startswith('d')) |
||
1312 | |||
1313 | def _is_output_on(self): |
||
1314 | """ |
||
1315 | Aks the AWG if the output is enabled, i.e. if the AWG is running |
||
1316 | |||
1317 | @return bool: True: output on, False: output off |
||
1318 | """ |
||
1319 | return bool(int(self.query('AWGC:RST?'))) |
||
1320 | |||
1321 | def _zeroing_enabled(self): |
||
1322 | """ |
||
1323 | Checks if the zeroing option is enabled. Only available on devices with option '06'. |
||
1324 | |||
1325 | @return bool: True: enabled, False: disabled |
||
1326 | """ |
||
1327 | if '06' not in self.installed_options: |
||
1328 | return False |
||
1329 | return bool(int(self.query('AWGC:INT:ZER?'))) |
||
1330 | |||
1331 | def _write_wfm(self, filename, analog_samples, marker_bytes, is_first_chunk, is_last_chunk, |
||
1332 | total_number_of_samples): |
||
1333 | """ |
||
1334 | Appends a sampled chunk of a whole waveform to a wfm-file. Create the file |
||
1335 | if it is the first chunk. |
||
1336 | If both flags (is_first_chunk, is_last_chunk) are set to TRUE it means |
||
1337 | that the whole ensemble is written as a whole in one big chunk. |
||
1338 | |||
1339 | @param filename: string, represents the name of the sampled waveform |
||
1340 | @param analog_samples: dict containing float32 numpy ndarrays, contains the |
||
1341 | samples for the analog channels that |
||
1342 | are to be written by this function call. |
||
1343 | @param marker_bytes: np.ndarray containing bool numpy ndarrays, contains the samples |
||
1344 | for the digital channels that |
||
1345 | are to be written by this function call. |
||
1346 | @param total_number_of_samples: int, The total number of samples in the |
||
1347 | entire waveform. Has to be known in advance. |
||
1348 | @param is_first_chunk: bool, indicates if the current chunk is the |
||
1349 | first write to this file. |
||
1350 | @param is_last_chunk: bool, indicates if the current chunk is the last |
||
1351 | write to this file. |
||
1352 | """ |
||
1353 | # The memory overhead of the tmp file write/read process in bytes. |
||
1354 | tmp_bytes_overhead = 104857600 # 100 MB |
||
1355 | tmp_samples = tmp_bytes_overhead // 5 |
||
1356 | if tmp_samples > len(analog_samples): |
||
1357 | tmp_samples = len(analog_samples) |
||
1358 | |||
1359 | if not filename.endswith('.wfm'): |
||
1360 | filename += '.wfm' |
||
1361 | wfm_path = os.path.join(self._tmp_work_dir, filename) |
||
1362 | |||
1363 | # if it is the first chunk, create the WFM file with header. |
||
1364 | if is_first_chunk: |
||
1365 | with open(wfm_path, 'wb') as wfm_file: |
||
1366 | # write the first line, which is the header file, if first chunk is passed: |
||
1367 | num_bytes = str(int(total_number_of_samples * 5)) |
||
1368 | num_digits = str(len(num_bytes)) |
||
1369 | header = 'MAGIC 1000\r\n#{0}{1}'.format(num_digits, num_bytes) |
||
1370 | wfm_file.write(header.encode()) |
||
1371 | |||
1372 | # For the WFM file format unfortunately we need to write the digital sampels together |
||
1373 | # with the analog samples. Therefore we need a temporary copy of all samples for each |
||
1374 | # analog channel. |
||
1375 | write_array = np.zeros(tmp_samples, dtype='float32, uint8') |
||
1376 | |||
1377 | # Consecutively prepare and write chunks of maximal size tmp_bytes_overhead to file |
||
1378 | samples_written = 0 |
||
1379 | with open(wfm_path, 'ab') as wfm_file: |
||
1380 | while samples_written < len(analog_samples): |
||
1381 | write_end = samples_written + write_array.size |
||
1382 | # Prepare tmp write array |
||
1383 | write_array['f0'] = analog_samples[samples_written:write_end] |
||
1384 | if marker_bytes is not None: |
||
1385 | write_array['f1'] = marker_bytes[samples_written:write_end] |
||
1386 | # Write to file |
||
1387 | wfm_file.write(write_array) |
||
1388 | # Increment write counter |
||
1389 | samples_written = write_end |
||
1390 | # Reduce write array size if |
||
1391 | if 0 < total_number_of_samples - samples_written < write_array.size: |
||
1392 | write_array.resize(total_number_of_samples - samples_written) |
||
1393 | |||
1394 | del write_array |
||
1395 | |||
1396 | # append footer if it's the last chunk to write |
||
1397 | if is_last_chunk: |
||
1398 | # the footer encodes the sample rate, which was used for that file: |
||
1399 | footer = 'CLOCK {0:16.10E}\r\n'.format(self.get_sample_rate()) |
||
1400 | with open(wfm_path, 'ab') as wfm_file: |
||
1401 | wfm_file.write(footer.encode()) |
||
1402 | return |
||
1403 |