Completed
Pull Request — master (#384)
by
unknown
01:25
created

AWG7122C.write_waveform()   F

Complexity

Conditions 14

Size

Total Lines 108

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 14
dl 0
loc 108
rs 2.52
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

Complexity

Complex methods like AWG7122C.write_waveform() often do a lot of different things. To break such a method down, we need to identify a cohesive component within its class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
3
"""
4
This file contains the Qudi hardware module for AWG7000 Series.
5
6
Qudi is free software: you can redistribute it and/or modify
7
it under the terms of the GNU General Public License as published by
8
the Free Software Foundation, either version 3 of the License, or
9
(at your option) any later version.
10
11
Qudi is distributed in the hope that it will be useful,
12
but WITHOUT ANY WARRANTY; without even the implied warranty of
13
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
GNU General Public License for more details.
15
16
You should have received a copy of the GNU General Public License
17
along with Qudi. If not, see <http://www.gnu.org/licenses/>.
18
19
Copyright (c) the Qudi Developers. See the COPYRIGHT.txt file at the
20
top-level directory of this distribution and at <https://github.com/Ulm-IQO/qudi/>
21
"""
22
23
24
import os
25
import time
26
import visa
27
import numpy as np
28
from ftplib import FTP
29
from collections import OrderedDict
30
31
from core.util.modules import get_home_dir
32
from core.module import Base, ConfigOption
33
from interface.pulser_interface import PulserInterface, PulserConstraints
34
35
36
# TODO: add a method to sequencing to change from dynamic to jump in order to get triggers for odmr
37
class AWG7122C(Base, PulserInterface):
38
    """
39
    Unstable and under construction, Jochen Scheuer
40
    ... but about to be become awesome, Nikolas Tomek
41
    """
42
43
    _modclass = 'awg7122c'
44
    _modtype = 'hardware'
45
46
    # config options
47
    _tmp_work_dir = ConfigOption(name='tmp_work_dir',
48
                                 default=os.path.join(get_home_dir(), 'pulsed_files'),
49
                                 missing='warn')
50
    _visa_address = ConfigOption(name='awg_visa_address', missing='error')
51
    _ip_address = ConfigOption(name='awg_ip_address', missing='error')
52
    _ftp_dir = ConfigOption(name='ftp_root_dir', default='C:\\inetpub\\ftproot', missing='warn')
53
    _username = ConfigOption(name='ftp_login', default='anonymous', missing='warn')
54
    _password = ConfigOption(name='ftp_passwd', default='anonymous@', missing='warn')
55
    _default_sample_rate = ConfigOption(name='default_sample_rate', default=None, missing='warn')
56
    _visa_timeout = ConfigOption(name='timeout', default=30, missing='nothing')
57
58
    def __init__(self, config, **kwargs):
        """ Set up the module attributes; no communication with the AWG takes place here.

        @param dict config: module configuration, handed on to the base class
        @param kwargs: further keyword arguments, handed on to the base class
        """
        super().__init__(config=config, **kwargs)

        # pyVISA resource manager instance
        self._rm = visa.ResourceManager()
        # Will hold a reference to the awg visa resource
        self.awg = None

        # Subfolder of the FTP root dir on the AWG disk to work in
        self.ftp_working_dir = 'waves'

        # Will hold the encoded installed options available on the awg
        self.installed_options = []
        # Helper variable since a loaded sequence can not be queried
        self.__loaded_sequence = ''
        # Maps the integers 0..3 onto the single byte of the same value
        self._marker_byte_dict = {value: bytes([value]) for value in range(4)}
71
72
    def on_activate(self):
        """ Initialisation performed during activation of the module.

        Creates the local work directory, opens the VISA connection to the AWG (if the configured
        address is known to the resource manager), logs in via FTP and changes into the FTP
        working directory, reads the installed options and sets the current directory on the AWG.
        """
        # Local import: the directory on the AWG is a Windows path and must be joined with
        # Windows rules, whatever operating system this module runs on.
        import ntpath

        # Create work directory if necessary. exist_ok avoids the race between checking for the
        # directory and creating it.
        os.makedirs(os.path.abspath(self._tmp_work_dir), exist_ok=True)

        # connect to awg using PyVISA
        if self._visa_address in self._rm.list_resources():
            self.awg = self._rm.open_resource(self._visa_address)
            # the configured timeout (30 by default) is scaled by 1000 for pyVISA
            self.awg.timeout = self._visa_timeout * 1000
        else:
            self.awg = None
            self.log.error(
                'VISA address "{0}" not found by the pyVISA resource manager.\nCheck '
                'the connection by using for example "Agilent Connection Expert".'
                ''.format(self._visa_address))

        # try connecting to AWG using FTP protocol
        with FTP(self._ip_address) as ftp:
            ftp.login(user=self._username, passwd=self._password)
            ftp.cwd(self.ftp_working_dir)

        # Options of AWG7000 series:
        #              Option 01: Memory expansion to 64,8 MSamples (Million points)
        #              Option 06: Interleave and extended analog output bandwidth
        #              Option 08: Fast sequence switching
        #              Option 09: Subsequence and Table Jump
        self.installed_options = self.query('*OPT?').split(',')
        # TODO: include proper routine to check and change zeroing functionality

        # Set current directory on AWG
        self.write('MMEM:CDIR "{0}"'.format(ntpath.join(self._ftp_dir, self.ftp_working_dir)))
106
107
    def on_deactivate(self):
        """ Deinitialisation performed during deactivation of the module.

        Closes the VISA connection to the AWG. A failure to close is logged at debug level and
        not raised.
        """
        try:
            self.awg.close()
        except Exception:
            # Also reached when no connection was ever opened (self.awg is None)
            self.log.debug('Closing AWG connection using pyvisa failed.')
        else:
            self.log.info('Closed connection to AWG')
117
118
    # =========================================================================
119
    # Below all the Pulser Interface routines.
120
    # =========================================================================
121
122
    def get_constraints(self):
        """
        Retrieve the hardware constraints of the pulsing device.

        @return constraints object: PulserConstraints instance with the limits of this device
                                    set as attributes.

        The object carries the limits (min, max, step, default) for sample rate, analog
        amplitude, digital low/high levels, waveform length, the number of waveforms, sequences
        and subsequences, repetitions and sequence steps, as well as the available event
        triggers, flags and the possible channel activation configurations.

            SEE PulserConstraints CLASS IN pulser_interface.py FOR AVAILABLE CONSTRAINTS!!!

        The sample rate limits depend on the interleave state, the minimum analog amplitude on
        the zeroing state and the maximum waveform length on the installed option '01'.

        PulserConstraints.activation_config is of the form:
            {<descriptor_str>: <channel_set>,
             <descriptor_str>: <channel_set>,
             ...}
        """
        def _configure(constraint, **values):
            # Assign every given keyword as an attribute of the constraint
            for attribute, value in values.items():
                setattr(constraint, attribute, value)

        # TODO: Check values for AWG7122c
        constraints = PulserConstraints()

        if self.get_interleave():
            rate_limits = dict(min=12.0e9, max=24.0e9, step=5.0e2, default=24.0e9)
        else:
            rate_limits = dict(min=10.0e6, max=12.0e9, step=10.0e6, default=12.0e9)
        _configure(constraints.sample_rate, **rate_limits)

        _configure(constraints.a_ch_amplitude, max=1.0, step=0.001, default=1.0)
        constraints.a_ch_amplitude.min = 0.25 if self._zeroing_enabled() else 0.5

        _configure(constraints.d_ch_low, min=-1.4, max=0.9, step=0.01, default=0.0)
        _configure(constraints.d_ch_high, min=-0.9, max=1.4, step=0.01, default=1.4)

        _configure(constraints.waveform_length, min=1, step=1, default=80)
        # Option 01 is the memory expansion
        has_memory_expansion = '01' in self.installed_options
        constraints.waveform_length.max = 64800000 if has_memory_expansion else 32000000

        _configure(constraints.waveform_num, min=1, max=32000, step=1, default=1)
        _configure(constraints.sequence_num, min=1, max=16000, step=1, default=1)
        _configure(constraints.subsequence_num, min=1, max=8000, step=1, default=1)

        # If sequencer mode is available then these should be specified
        _configure(constraints.repetitions, min=0, max=65539, step=1, default=0)

        # ToDo: Check how many external triggers this device has
        constraints.event_triggers = ['A', 'B']
        constraints.flags = []

        _configure(constraints.sequence_steps, min=0, max=8000, step=1, default=0)

        # The names a_ch<num> and d_ch<num> are generic names, which describe the channels
        # unambiguously. All possible channel configurations are listed here using only these
        # generic names; the configuration names themselves can be chosen freely.
        constraints.activation_config = OrderedDict([
            ('All', {'a_ch1', 'd_ch1', 'd_ch2', 'a_ch2', 'd_ch3', 'd_ch4'}),
            # Channel 1 only
            ('A1_M1_M2', {'a_ch1', 'd_ch1', 'd_ch2'}),
            # Channel 2 only
            ('A2_M3_M4', {'a_ch2', 'd_ch3', 'd_ch4'}),
            # Both analog channels without digital channels
            ('Two_Analog', {'a_ch1', 'a_ch2'}),
            # A single analog channel without digital channels
            ('Analog1', {'a_ch1'}),
            ('Analog2', {'a_ch2'}),
        ])
        return constraints
237
238
    def pulser_on(self):
        """ Switches the pulsing device on.

        @return: the result of self.get_status()

        Sends the run command only if the output is not on already and then blocks until the
        device reports its output as on.
        """
        # Nothing to do if the AWG is already running
        if self._is_output_on():
            return self.get_status()

        self.write('AWGC:RUN')
        # Poll every 0.2 s until the AWG is actually running
        while True:
            if self._is_output_on():
                break
            time.sleep(0.2)
        return self.get_status()
252
253
    def pulser_off(self):
        """ Switches the pulsing device off.

        @return: the result of self.get_status()

        Sends the stop command only if the output is currently on and then blocks until the
        device reports its output as off.
        """
        # Nothing to do if the AWG is already idle
        if not self._is_output_on():
            return self.get_status()

        self.write('AWGC:STOP')
        # Poll every 0.2 s until the AWG has actually stopped
        while True:
            if not self._is_output_on():
                break
            time.sleep(0.2)
        return self.get_status()
267
268 View Code Duplication
    def load_waveform(self, load_dict):
        """ Loads already written waveforms into the analog channels of the device.

        @param load_dict:  dict|list, a dictionary with keys being the channel number and values
                                      being the name of the already written waveform to load
                                      into that channel.
                                      Examples:   {1: rabi_ch1, 2: rabi_ch2} or
                                                  {1: rabi_ch2, 2: rabi_ch1}
                                      If just a list of waveform names is given, the channel
                                      number is taken from the name suffix '_ch1', '_ch2' etc.

        @return: the result of self.get_loaded_assets()

        Nothing is loaded and an error is logged if one of the target channels is not active or
        if one of the waveforms is not among self.get_waveform_names().
        """
        # A plain list of names is turned into {channel number: name} using the name suffix
        if isinstance(load_dict, list):
            load_dict = {int(name.rsplit('_ch', 1)[1]): name for name in load_dict}

        # Collect the analog channels that are currently active
        activation = self.get_active_channels()
        active_analog = sorted(
            name for name in activation if name.startswith('a') and activation[name])

        # Every channel to load into has to be active
        requested = {'a_ch{0:d}'.format(number) for number in load_dict}
        if requested.difference(active_analog):
            self.log.error('Unable to load all waveforms into channels.\n'
                           'One or more channels to set are not active.')
            return self.get_loaded_assets()

        # Every waveform to load has to be present on device memory
        if set(load_dict.values()).difference(self.get_waveform_names()):
            self.log.error('Unable to load waveforms into channels.\n'
                           'One or more waveforms to load are missing on device memory.')
            return self.get_loaded_assets()

        # Load each waveform and poll until the device reports it for that channel
        for number, name in load_dict.items():
            self.write('SOUR{0:d}:WAV "{1}"'.format(number, name))
            readback_command = 'SOUR{0:d}:WAV?'.format(number)
            while self.query(readback_command) != name:
                time.sleep(0.1)

        # Loading waveforms replaces any previously loaded sequence
        self.__loaded_sequence = ''
        return self.get_loaded_assets()
320
321
    def load_sequence(self, sequence_name):
        """ Loads a sequence to the channels of the device in order to be ready for playback.

        @param str sequence_name: name of the sequence to load. It has to be one of the names
                                  returned by self.get_sequence_names().

        @return: the result of self.get_loaded_assets()

        If the sequence is unknown an error is logged and nothing is loaded. Otherwise the file
        '<sequence_name>.seq' is set as user function of source 1 and the AWG is switched to
        event jump mode.
        """
        if sequence_name not in self.get_sequence_names():
            self.log.error('Unable to load sequence.\n'
                           'Sequence to load is missing on device memory.')
            return self.get_loaded_assets()

        # Load sequence. The file name is the sequence name plus extension.
        file_name = '{0}.seq'.format(sequence_name)
        self.write('SOUR1:FUNC:USER "{0!s}"'.format(file_name))
        # NOTE: the read-back is only logged, not polled, as its exact format is not known here
        self.log.debug('Sequence file reported by AWG: {0}'
                       ''.format(self.query('SOUR1:FUNC:USER?')))

        # set the AWG to the event jump mode:
        self.write('AWGC:EVENT:JMODE EJUMP')

        self.__loaded_sequence = sequence_name
        return self.get_loaded_assets()
358
359
    def get_loaded_assets(self):
        """
        Retrieve the currently loaded asset names for each active analog channel of the device.
        The returned dictionary will have the channel numbers as keys.
        In case of loaded waveforms the dictionary values will be the waveform names.
        In case of a loaded sequence the values will be the sequence name appended by a suffix
        representing the track loaded to the respective channel (i.e. '<sequence_name>_1').

        @return (dict, str): Dictionary with keys being the channel number and values being the
                             respective asset loaded into the channel,
                             string describing the asset type ('waveform' or 'sequence').
                             The type is None if nothing is loaded.
        """
        # Get the numbers of all active analog channels
        chnl_activation = self.get_active_channels()
        channel_numbers = sorted(int(chnl.split('_ch')[1]) for chnl in chnl_activation if
                                 chnl.startswith('a') and chnl_activation[chnl])

        loaded_assets = dict()
        if self.__loaded_sequence:
            # A loaded sequence can not be queried from the device, so use the remembered name
            asset_type = 'sequence'
            for chnl_num in channel_numbers:
                loaded_assets[chnl_num] = '{0}_{1:d}'.format(self.__loaded_sequence, chnl_num)
        else:
            asset_type = 'waveform'
            for chnl_num in channel_numbers:
                # NOTE(review): assumes self.query returns the bare waveform name, as the
                # read-back comparison in load_waveform does - confirm on hardware
                asset_name = self.query('SOUR{0:d}:WAV?'.format(chnl_num))
                if asset_name:
                    loaded_assets[chnl_num] = asset_name

        current_type = asset_type if loaded_assets else None
        return loaded_assets, current_type
386
387
    def clear_all(self):
        """ Deletes all waveforms from the waveform list of the device.

        @return int: always 0
        """
        delete_all_command = 'WLIS:WAV:DEL ALL'
        self.write(delete_all_command)
        # Forget the remembered sequence as well
        self.__loaded_sequence = ''
        return 0
395
396
    def get_status(self):
        """ Retrieves the status of the pulsing hardware.

        @return (int, dict): integer value of the current status and a dictionary with the
                             description of all possible status values. The status is -1 if no
                             AWG connection is present, otherwise the integer answer of the
                             device to 'AWGC:RST?'.
        """
        status_dic = {
            -1: 'Failed Request or Communication',
            0: 'Device has stopped, but can receive commands',
            1: 'Device is active and running',
            2: 'Device is waiting for trigger.',
        }
        # Without a connection there is nothing to ask
        if self.awg is None:
            return -1, status_dic
        return int(self.query('AWGC:RST?')), status_dic
410
411
    def get_sample_rate(self):
        """ Get the sample rate of the pulse generator hardware.

        @return float: The current sample rate of the device (in Hz)

        The value is queried from the device on every call, not read from a stored attribute.
        """
        answer = self.query('SOUR1:FREQ?')
        return float(answer)
420
421
    def set_sample_rate(self, sample_rate):
        """ Set the sample rate of the pulse generator hardware.

        @param float sample_rate: The sampling rate to be set (in Hz)

        @return float: the sample rate returned from the device (in Hz).

        Note: Use the returned value for further processing, since it is read back from the
              device after setting.
        """
        rate_in_mhz = sample_rate / 1e6
        self.write('SOUR1:FREQ {0:.4G}MHz\n'.format(rate_in_mhz))

        # Poll until the device reports the operation as complete
        while True:
            if int(self.query('*OPC?')) == 1:
                break
            time.sleep(0.1)

        # Here we need to wait, because when the sampling rate is changed the AWG is busy
        # and therefore the query in get_sample_rate would return an empty string.
        time.sleep(1)
        return self.get_sample_rate()
438
439
    # def load_asset(self, asset_name, load_dict=None):
440
    #     """ Loads a sequence or waveform to the specified channel of the pulsing
441
    #         device.
442
    #
443
    #     @param str asset_name: The name of the asset to be loaded
444
    #
445
    #     @param dict load_dict:  a dictionary with keys being one of the
446
    #                             available channel numbers and items being the
447
    #                             name of the already sampled
448
    #                             waveform/sequence files.
449
    #                             Examples:   {1: rabi_Ch1, 2: rabi_Ch2}
450
    #                                         {1: rabi_Ch2, 2: rabi_Ch1}
451
    #                             This parameter is optional. If none is given
452
    #                             then the channel association is invoked from
453
    #                             the sequence generation,
454
    #                             i.e. the filename appendix (_Ch1, _Ch2 etc.)
455
    #
456
    #     @return int: error code (0:OK, -1:error)
457
    #
458
    #     Unused for digital pulse generators without sequence storage capability
459
    #     (PulseBlaster, FPGA).
460
    #     """
461
    #     if load_dict is None:
462
    #         load_dict = {}
463
    #     path = self.ftp_path + self.get_asset_dir_on_device()
464
    #
465
    #     # Find all files associated with the specified asset name
466
    #     file_list = self._get_filenames_on_device()
467
    #     filename = []
468
    #
469
    #     # Get current channel activation state to be restored after loading the asset
470
    #     chnl_activation = self.get_active_channels()
471
    #
472
    #     if (asset_name + '.seq') in file_list:
473
    #         file_name = asset_name + '.seq'
474
    #
475
    #         # self.tell('MMEMORY:IMPORT "{0}","{1}",SEQ \n'.format(asset_name , asset_name + '.seq'))
476
    #         self.tell('SOUR1:FUNC:USER "{0!s}/{1!s}"\n'.format(path, file_name))
477
    #         # self.tell('SOUR1:FUNC:USER "{0}/{1}"\n'.format(path, file_name))
478
    #         # set the AWG to the event jump mode:
479
    #         self.tell('AWGCONTROL:EVENT:JMODE EJUMP')
480
    #
481
    #         self.current_loaded_asset = asset_name
482
    #     else:
483
    #
484
    #         for file in file_list:
485
    #
486
    #             if file == asset_name + '_ch1.wfm':
487
    #                 #load into workspace
488
    #                 self.tell('MMEMORY:IMPORT "{0}","{1}",WFM \n'.format(asset_name +'_ch1', asset_name + '_ch1.wfm'))
489
    #                 #load into channel
490
    #                 self.tell('SOUR1:WAVEFORM "{0}"\n'.format(asset_name + '_ch1'))
491
    #                 self.log.debug('Ch1 loaded: "{0}"'.format(asset_name))
492
    #                 filename.append(file)
493
    #             elif file == asset_name + '_ch2.wfm':
494
    #                 self.tell('MMEMORY:IMPORT "{0}","{1}",WFM \n'.format(asset_name + '_ch2', asset_name + '_ch2.wfm'))
495
    #                 self.tell('SOUR2:WAVEFORM "{0}"\n'.format(asset_name + '_ch2'))
496
    #                 self.log.debug('Ch2 loaded: "{0}"'.format(asset_name))
497
    #                 filename.append(file)
498
    #
499
    #         if load_dict == {} and filename == []:
500
    #             self.log.warning('No file and channel provided for load!\nCorrect that!\n'
501
    #                              'Command will be ignored.')
502
    #
503
    #     # for channel_num in list(load_dict):
504
    #         #asset_name = str(load_dict[channel_num])
505
    #         #self.tell('MMEMORY:IMPORT "{0}","{1}",WFM \n'.format(asset_name + '_ch{0}'.format(int(channel_num)), asset_name + '_ch{0}.wfm'.format(int(channel_num))))
506
    #         #self.tell('SOUR1:WAVEFORM "{0}"\n'.format(asset_name + '_ch{0}'.format(int(channel_num))))
507
    #
508
    #     #if len(list(load_dict)) > 0:
509
    #     self.current_loaded_asset = asset_name
510
    #
511
    #     # Restore channel activation state
512
    #     self.set_active_channels(chnl_activation)
513
    #     return 0
514
515
516
517
        # file_list = self._get_filenames_on_device()
518
        # filename = []
519
        #
520
        # for file in file_list:
521
        #     if file == asset_name+'_ch1.wfm' or file == asset_name+'_ch2.wfm':
522
        #         filename.append(file)
523
        #
524
        #
525
        # # Check if something could be found
526
        # if len(filename) == 0:
527
        #     self.log.error('No files associated with asset {0} were found on AWG7122c.'
528
        #                 'Load to channels failed!'.format(asset_name)
529
        #                 )        #         if asset.split("_")[-1][:3] == 'ch1':
530
        #             self.tell('SOUR1:WAVEFORM "{0}"\n'.format(asset[:-4]))
531
        #         if asset.split("_")[-1][:3] == 'ch2':
532
        #             self.tell('SOUR2:WAVEFORM "{0}"\n'.format(asset[:-4]))
533
        #         self.current_loaded_asset = asset_name
534
        # else:
535
        #     for channel in load_dict:
536
        #     return -1
537
        #
538
        # self.log.info('The following files associated with the asset {0} were found on AWG7122c:\n'
539
        #             '"{1}"'.format(asset_name, filename))
540
        #
541
        # # load files in AWG Waveform list
542
        # for asset in filename:
543
        #     if asset.endswith('.wfm'):
544
        #         self.tell('MMEMORY:IMPORT "{0}","{1}",WFM \n'.format(asset[:-4], asset))
545
        #     else:
546
        #         self.log.error('Could not load asset {0} to AWG7122c:\n'
547
        #             '"{1}"'.format(asset_name, filename))
548
        #
549
        # file_path = self.ftp_path + self.get_asset_dir_on_device()
550
        # # simply use the channel association of the filenames if no load_dict is given
551
        # if load_dict == {}:
552
        #     for asset in filename:
553
        #         # load waveforms into channels as given in filename
554
555
        #         # load waveforms into channels
556
        #         name = load_dict[channel]
557
        #         self.tell('SOUR'+str(channel)+':FUNC:USER "{0}/{1}"\n'.format(file_path, name))
558
        #     self.current_loaded_asset = name
559
        #
560
        # return 0
561
562
    def get_analog_level(self, amplitude=None, offset=None):
        """ Retrieve the analog amplitude and offset of the provided channels.

        @param list amplitude: optional, channel descriptor strings (i.e. 'a_ch1') of the
                               channels whose amplitude (in Volt peak to peak, i.e. the full
                               amplitude) is desired.
        @param list offset: optional, channel descriptor strings of the channels whose offset
                            (in Volt) is desired.

        @return: (dict, dict): tuple of two dicts, with keys being the channel descriptor string
                               (i.e. 'a_ch1') and items being the values for those channels.
                               Amplitude is always denoted in Volt-peak-to-peak and Offset in volts.

        The values are queried from the device on every call. If nothing (or None) is passed
        then the levels of all analog channels are returned. Unknown channels are skipped with
        a warning. The offset is reported as 0.0 without a query if option '02' or '06' is
        installed.

        Example of a possible input:
            amplitude = ['a_ch1'], offset = None
        to obtain the amplitude of channel 1 and the offset of all channels
            {'a_ch1': 0.5} {'a_ch1': 0.0, 'a_ch2': 0.0}
        """
        # FIXME: No sanity checking done here with constraints
        amp = dict()
        off = dict()

        chnl_list = self._get_all_analog_channels()

        # get pp amplitudes
        for chnl in (chnl_list if amplitude is None else amplitude):
            if chnl in chnl_list:
                # The channel number is always taken from the descriptor, never from a list index
                ch_num = int(chnl.rsplit('_ch', 1)[1])
                amp[chnl] = float(self.query('SOUR{0:d}:VOLT:AMPL?'.format(ch_num)))
            else:
                self.log.warning('Get analog amplitude from AWG7122c channel "{0}" failed. '
                                 'Channel non-existent.'.format(chnl))

        # get voltage offsets
        no_offset = '02' in self.installed_options or '06' in self.installed_options
        for chnl in (chnl_list if offset is None else offset):
            if chnl in chnl_list:
                if no_offset:
                    off[chnl] = 0.0
                else:
                    ch_num = int(chnl.rsplit('_ch', 1)[1])
                    off[chnl] = float(self.query('SOUR{0:d}:VOLT:OFFS?'.format(ch_num)))
            else:
                self.log.warning('Get analog offset from AWG7122c channel "{0}" failed. '
                                 'Channel non-existent.'.format(chnl))
        return amp, off
620
621 View Code Duplication
    def set_analog_level(self, amplitude=None, offset=None):
        """ Set amplitude and/or offset value of the provided analog channel(s).

        @param dict amplitude: dictionary, with key being the channel descriptor string
                               (i.e. 'a_ch1', 'a_ch2') and items being the amplitude values
                               (in Volt peak to peak, i.e. the full amplitude) for the desired
                               channel.
        @param dict offset: dictionary, with key being the channel descriptor string
                            (i.e. 'a_ch1', 'a_ch2') and items being the offset values
                            (in absolute volt) for the desired channel.

        @return (dict, dict): tuple of two dicts with the actual set values for amplitude and
                              offset for ALL channels (result of get_analog_level()).

        If nothing is passed then the command will return the current amplitudes/offsets.

        Channels that are not available in the device are skipped with a warning. Values outside
        the limits given by the constraints are clamped to the nearest limit with a warning.
        The dictionaries passed by the caller are never modified.

        Note: After setting the amplitude and/or offset values of the device, use the actual set
              return values for further processing.
        """
        constraints = self.get_constraints()
        analog_channels = self._get_all_analog_channels()

        def _sanitize(requested, limits, quantity, voltage_kind):
            """ Return a new dict holding only available channels with clamped values. """
            checked = dict()
            if requested is None:
                return checked
            for chnl, value in requested.items():
                if chnl not in analog_channels:
                    self.log.warning('Channel to set ({0}) not available in AWG.\nSetting '
                                     '{1} voltage for this channel ignored.'
                                     ''.format(chnl, voltage_kind))
                    continue
                if value < limits.min:
                    self.log.warning('Minimum {0} for channel "{1}" is {2}. Requested {0} of '
                                     '{3}V was ignored and instead set to min value.'
                                     ''.format(quantity, chnl, limits.min, value))
                    value = limits.min
                elif value > limits.max:
                    self.log.warning('Maximum {0} for channel "{1}" is {2}. Requested {0} of '
                                     '{3}V was ignored and instead set to max value.'
                                     ''.format(quantity, chnl, limits.max, value))
                    value = limits.max
                checked[chnl] = value
            return checked

        def _send_and_wait(command):
            """ Send a command and block until the device reports operation complete. """
            self.write(command)
            while int(self.query('*OPC?')) != 1:
                time.sleep(0.1)

        amplitude_to_set = _sanitize(amplitude, constraints.a_ch_amplitude, 'Vpp', 'analogue')
        offset_to_set = _sanitize(offset, constraints.a_ch_offset, 'offset', 'offset')

        # The channel number must be derived from the channel currently being written.
        for chnl, value in amplitude_to_set.items():
            ch_num = int(chnl.rsplit('_ch', 1)[1])
            _send_and_wait('SOUR{0:d}:VOLT:AMPL {1}'.format(ch_num, value))

        # With option 02 or 06 installed the offset is not written to the device.
        no_offset = '02' in self.installed_options or '06' in self.installed_options
        if not no_offset:
            for chnl, value in offset_to_set.items():
                ch_num = int(chnl.rsplit('_ch', 1)[1])
                _send_and_wait('SOUR{0:d}:VOLT:OFFSET {1}'.format(ch_num, value))
        return self.get_analog_level()
700
701 View Code Duplication
    def get_digital_level(self, low=None, high=None):
        """ Query the marker low and high voltage levels of the requested digital channels.

        @param list low: optional, channel descriptors (e.g. 'd_ch1') whose low level is wanted.
        @param list high: optional, channel descriptors whose high level is wanted.

        @return: (dict, dict): low levels and high levels, keyed by channel descriptor string.
                               Values are the device answers converted to float.

        If low and/or high is None, the respective level of ALL digital channels is queried.
        Requested channels that are not part of the available digital channels are skipped
        silently. All low levels are queried first, then all high levels.

        Each pair of digital channels maps onto the two markers of one analog channel:
        d_ch1/d_ch2 are markers 1/2 of analog channel 1, d_ch3/d_ch4 those of channel 2, etc.
        """
        available = self._get_all_digital_channels()

        def _read(requested, level):
            """ Query the given level ('LOW' or 'HIGH') for every valid requested channel. """
            readings = dict()
            for d_ch in (available if requested is None else requested):
                if d_ch in available:
                    d_num = int(d_ch.rsplit('_ch', 1)[1])
                    # odd digital channel -> marker 1, even digital channel -> marker 2
                    a_num, remainder = divmod(d_num + 1, 2)
                    command = 'SOUR{0:d}:MARK{1:d}:VOLT:{2}?'.format(a_num, remainder + 1, level)
                    readings[d_ch] = float(self.query(command))
            return readings

        low_levels = _read(low, 'LOW')
        high_levels = _read(high, 'HIGH')
        return low_levels, high_levels
753
754
    def set_digital_level(self, low=None, high=None):
        """ Set low and/or high value of the provided digital channel.

        @param dict low: dictionary, with key being the channel and items being
                         the low values (in volt) for the desired channel.
        @param dict high: dictionary, with key being the channel and items being
                         the high values (in volt) for the desired channel.

        @return (dict, dict): tuple of two dicts where first dict denotes the
                              current low value and the second dict the high
                              value, as returned by get_digital_level() for ALL
                              digital channels.

        NOTE: Writing digital levels to the device is NOT implemented here. Any requested
        values are ignored (a warning is logged) and the current levels are returned.

        The major difference to analog signals is that digital signals are
        either ON or OFF, whereas analog channels have a varying amplitude
        range. In contrast to analog output levels, digital output levels are
        defined by a voltage, which corresponds to the ON status and a voltage
        which corresponds to the OFF status (both denoted in (absolute) voltage)

        In general there is no bijective correspondence between
        (amplitude, offset) and (value high, value low)!
        """
        # FIXME: Tell the device the requested marker low/high voltages.
        if low or high:
            # Do not drop a request silently; the caller must learn nothing was changed.
            self.log.warning('Setting digital levels is not implemented for AWG7122c. '
                             'Requested low/high values were ignored.')
        return self.get_digital_level()
794
795
    def get_active_channels(self, ch=None):
        """ Query the activation state of the analog and digital channels from the device.

        @param list ch: optional, channel descriptor strings (analog and/or digital) to which
                        the returned dict is restricted.

        @return dict: keys are channel descriptor strings, items are booleans telling whether
                      the channel is active.

        Example for an possible input (order is not important):
            ch = ['a_ch2', 'd_ch2', 'a_ch1', 'd_ch5', 'd_ch1']
        then the output might look like
            {'a_ch2': True, 'd_ch2': False, 'a_ch1': False, 'd_ch5': True, 'd_ch1': False}

        If no parameter (or None) is passed to this method all channel states will be returned.

        The two markers of an analog channel are reported active only if the analog output is
        on AND the queried DAC resolution leaves two bits (10 - resolution == 2). The DAC
        resolution is only queried for analog channels whose output is on.
        """
        states = dict()
        for index, a_ch in enumerate(self._get_all_analog_channels(), start=1):
            output_on = bool(int(self.query('OUTPUT{0:d}:STATE?'.format(index))))
            states[a_ch] = output_on

            markers_on = False
            if output_on:
                dac_resolution = int(self.query('SOUR{0:d}:DAC:RES?'.format(index)))
                markers_on = (10 - dac_resolution) == 2
            states['d_ch{0:d}'.format(index * 2)] = markers_on
            states['d_ch{0:d}'.format(index * 2 - 1)] = markers_on

        if ch is None:
            return states
        # keep only the channels that have been asked for
        return {chnl: state for chnl, state in states.items() if chnl in ch}
840
841
    def set_active_channels(self, ch=None):
        """ (De)activate analog and digital channels of the device.

        @param dict ch: dictionary with keys being the analog or digital string generic names for
                        the channels (i.e. 'd_ch1', 'a_ch2') with items being a boolean value.
                        True: Activate channel, False: Deactivate channel

        @return dict: the channel states as returned by get_active_channels(). If the request is
                      rejected, the state read before any change is returned.

        If nothing is passed then the command will simply return the unchanged current state.

        The request is rejected (error logged, nothing written) if it names channels not present
        in the device or if the resulting set of active channels is not one of the values of
        constraints.activation_config.

        For every analog channel the DAC resolution is written as 8 if its even-numbered marker
        channel ('d_ch<2n>') shall be active and 10 otherwise, then the output is switched.

        Example for possible input:
            ch={'a_ch2': True, 'd_ch1': False, 'd_ch3': True, 'd_ch4': True}
        """
        present_state = self.get_active_channels()
        if ch is None:
            return present_state

        if not set(present_state).issuperset(ch):
            self.log.error('Trying to (de)activate channels that are not present in AWG.\n'
                           'Setting of channel activation aborted.')
            return present_state

        # Merge the request into the current state
        requested_state = {chnl: (ch[chnl] if chnl in ch else state)
                           for chnl, state in present_state.items()}

        # Only activation patterns listed in the constraints are permitted
        allowed_configs = self.get_constraints().activation_config.values()
        to_activate = {chnl for chnl, state in requested_state.items() if state}
        if to_activate not in allowed_configs:
            self.log.error('activation_config to set ({0}) is not allowed according to constraints.'
                           ''.format(to_activate))
            return present_state

        for a_ch in self._get_all_analog_channels():
            number = int(a_ch.rsplit('_ch', 1)[1])
            # two marker bits reduce the DAC resolution from 10 to 8 bit
            resolution = 8 if requested_state['d_ch{0:d}'.format(2 * number)] else 10
            self.write('SOUR{0:d}:DAC:RES {1:d}'.format(number, resolution))
            output = 'ON' if requested_state[a_ch] else 'OFF'
            self.write('OUTPUT{0:d}:STATE {1}'.format(number, output))
        return self.get_active_channels()
906
907
    def write_waveform(self, name, analog_samples, digital_samples, is_first_chunk, is_last_chunk,
                       total_number_of_samples):
        """
        Write a new waveform or append samples to an already existing waveform on the device memory.
        The flags is_first_chunk and is_last_chunk can be used as indicator if a new waveform should
        be created or if the write process to a waveform should be terminated.

        @param name: str, the name of the waveform to be created/append to
        @param analog_samples: dict keyed by analog channel descriptor ('a_ch1', ...) with the
                               sample arrays as items
        @param digital_samples: dict keyed by digital channel descriptor ('d_ch1', ...) with the
                                marker state arrays as items. NOTE: these arrays are modified
                                in-place while the marker bytes are encoded.
        @param is_first_chunk: bool, flag indicating if it is the first chunk to write.
                                     Passed on unchanged to the WFM file writer.
        @param is_last_chunk: bool, flag indicating if it is the last chunk to write.
                                    Passed on unchanged to the WFM file writer.
        @param total_number_of_samples: int, The number of sample points for the entire waveform
                                        (not only the currently written chunk)

        @return: (int, list) total_number_of_samples on success (-1 indicates failed process) and
                             list of created waveform names

        One waveform named '<name>_ch<N>' is written, uploaded and imported into the device
        workspace for every active analog channel N.
        """
        waveforms = list()

        # Sanity checks
        constraints = self.get_constraints()

        if len(analog_samples) == 0:
            self.log.error('No analog samples passed to write_waveform method in awg7122c.')
            return -1, waveforms

        if total_number_of_samples < constraints.waveform_length.min:
            self.log.error('Unable to write waveform.\nNumber of samples to write ({0:d}) is '
                           'smaller than the allowed minimum waveform length ({1:d}).'
                           ''.format(total_number_of_samples, constraints.waveform_length.min))
            return -1, waveforms
        if total_number_of_samples > constraints.waveform_length.max:
            self.log.error('Unable to write waveform.\nNumber of samples to write ({0:d}) is '
                           'greater than the allowed maximum waveform length ({1:d}).'
                           ''.format(total_number_of_samples, constraints.waveform_length.max))
            return -1, waveforms

        # determine active channels
        activation_dict = self.get_active_channels()
        active_channels = {chnl for chnl in activation_dict if activation_dict[chnl]}
        active_analog = sorted(chnl for chnl in active_channels if chnl.startswith('a'))

        # Sanity check of channel numbers
        sample_channels = set(analog_samples).union(digital_samples)
        if active_channels != sample_channels:
            self.log.error('Mismatch of channel activation and sample array dimensions for '
                           'waveform creation.\nChannel activation is: {0}\nSample arrays have: '
                           '{1}'.format(active_channels, sample_channels))
            return -1, waveforms

        # Write waveforms. One for each analog channel.
        for a_ch in active_analog:
            # Get the integer analog channel number
            a_ch_num = int(a_ch.rsplit('ch', 1)[1])
            # Get the digital channel specifiers belonging to this analog channel markers
            mrk_ch_1 = 'd_ch{0:d}'.format(a_ch_num * 2 - 1)
            mrk_ch_2 = 'd_ch{0:d}'.format(a_ch_num * 2)

            start = time.time()
            # Encode marker information in an array of bytes (uint8). Avoid intermediate copies,
            # i.e. the views below alias (and overwrite) the caller's arrays.
            if mrk_ch_1 in digital_samples and mrk_ch_2 in digital_samples:
                mrk_bytes = digital_samples[mrk_ch_2].view('uint8')
                tmp_bytes = digital_samples[mrk_ch_1].view('uint8')
                # marker 2 goes to bit 7, marker 1 to bit 6 of each byte
                np.left_shift(mrk_bytes, 7, out=mrk_bytes)
                np.left_shift(tmp_bytes, 6, out=tmp_bytes)
                np.add(mrk_bytes, tmp_bytes, out=mrk_bytes)
            else:
                mrk_bytes = None
            self.log.debug('Prepare digital channel data: {0}'.format(time.time() - start))

            # Create waveform name string
            wfm_name = '{0}_ch{1:d}'.format(name, a_ch_num)

            # Write WFM file for waveform
            start = time.time()
            self._write_wfm(filename=wfm_name,
                            analog_samples=analog_samples[a_ch],
                            digital_samples=mrk_bytes,
                            is_first_chunk=is_first_chunk,
                            is_last_chunk=is_last_chunk,
                            total_number_of_samples=total_number_of_samples)
            self.log.debug('Write WFM file: {0}'.format(time.time() - start))

            # transfer waveform to AWG and load into workspace
            start = time.time()
            self._send_file(filename=wfm_name + '.wfm')
            self.log.debug('Send WFM file: {0}'.format(time.time() - start))

            start = time.time()
            self.write('MMEM:IMP "{0}","{1}",WFM'.format(wfm_name, wfm_name + '.wfm'))
            # Wait for everything to complete
            while int(self.query('*OPC?')) != 1:
                time.sleep(0.2)
            # Just to make sure the waveform really shows up in the workspace
            while wfm_name not in self.get_waveform_names():
                time.sleep(0.2)
            self.log.debug('Load WFM file into workspace: {0}'.format(time.time() - start))

            # Append created waveform name to waveform list
            waveforms.append(wfm_name)
        return total_number_of_samples, waveforms
1015
1016
    def write_sequence(self, name, sequence_parameters):
        """
        Write a new sequence on the device memory. NOT IMPLEMENTED: always reports failure.

        @param name: str, the name of the sequence to be created
        @param sequence_parameters: dict, dictionary containing the parameters for a sequence

        @return: int, number of sequence steps written (-1 indicates failed process).
                      Currently -1 in every case.
        """
        if self.has_sequence_mode():
            # FIXME: I can not possibly implement that without the hardware to test it.
            return -1

        # Without the sequencer option there is nothing that could be written
        self.log.error('Direct sequence generation in AWG not possible. Sequencer option not '
                       'installed.')
        return -1
1032
1033
    def get_waveform_names(self):
        """ Retrieve the names of all waveforms in the device's waveform list.

        @return list: Alphabetically sorted list of the waveform name strings.

        The list size is queried first; afterwards each name is queried by its 1-based index.
        """
        count = int(self.query('WLIS:SIZE?'))
        names = [self.query('WLIS:NAME? {0:d}'.format(position + 1)) for position in range(count)]
        names.sort()
        return names
1043
1044
    def get_sequence_names(self):
        """ Retrieve the names of all uploaded sequences on the device. NOT IMPLEMENTED.

        @return list: Always an empty list, the device is not queried.
        """
        # FIXME: No idea without hardware to test
        return []
1051
1052
    def delete_waveform(self, waveform_name):
        """ Delete the waveform(s) with the given name(s) from the device's waveform list.

        @param str waveform_name: The name of the waveform to be deleted
                                  Optionally a list of waveform names can be passed.

        @return list: sorted list of the waveform names a delete command has been sent for.

        Names that are not present in the waveform list of the device are skipped silently.
        """
        requested = [waveform_name] if isinstance(waveform_name, str) else waveform_name
        present = self.get_waveform_names()

        removed = list()
        for wfm in requested:
            if wfm not in present:
                continue
            self.write('WLIS:WAV:DEL "{0}"'.format(wfm))
            removed.append(wfm)
        removed.sort()
        return removed
1070
1071
    def delete_sequence(self, sequence_name):
        """ Delete the sequence with name "sequence_name" from the device. NOT IMPLEMENTED.

        @param str sequence_name: The name of the sequence to be deleted
                                  Optionally a list of sequence names can be passed.

        @return list: Always an empty list, nothing is deleted.
        """
        # FIXME: Again... no idea without hardware to play with
        return []
1081
1082
    def get_interleave(self):
        """ Query the device whether interleave is switched on.

        @return bool: True: ON, False: OFF

        The device answer is interpreted as an integer; any non-zero value means ON.
        """
        reply = self.query('AWGC:INT:STAT?')
        return int(reply) != 0
1090
1091
    def set_interleave(self, state=False):
        """ Switch the interleave of the AWG on or off.

        @param bool state: The state the interleave should be set to
                           (True: ON, False: OFF)

        @return bool: actual interleave status (True: ON, False: OFF)

        If state is not a bool, nothing is changed and the current status is returned.
        If the device already is in the requested state, no command is sent.
        Otherwise the new state is written and the method blocks until the device reports
        operation complete before reading back the status.
        """
        if isinstance(state, bool):
            # nothing to do if the device already is in the requested state
            if state is self.get_interleave():
                return state

            self.write('AWGC:INT:STAT {0:d}'.format(int(state)))
            while int(self.query('*OPC?')) != 1:
                time.sleep(0.1)
        return self.get_interleave()
1115
1116
    def write(self, command):
        """ Send a command string to the device.

        @param string command: string containing the command

        @return int: the status code returned by the underlying write call, converted to int

        The write call of self.awg is expected to return a pair (bytes written, status code).
        """
        _, status_code = self.awg.write(command)
        return int(status_code)
1125
1126
    def query(self, question):
        """ Send a query to the device and return the cleaned-up answer.

        @param string question: string containing the command

        @return string: the answer of the device, with surrounding whitespace (including
                        newlines) removed first and surrounding double quotes removed after
        """
        raw_answer = self.awg.query(question)
        # once all surrounding whitespace is gone, only the quotes remain to be peeled off
        return raw_answer.strip().strip('"')
1139
1140
    def reset(self):
        """ Reset the device by sending '*RST' followed by '*WAI'.

        @return int: always 0; the status codes of the two write calls are not evaluated
        """
        for command in ('*RST', '*WAI'):
            self.write(command)
        return 0
1148
1149
    def has_sequence_mode(self):
        """ Tell whether the pulse generator offers a sequence mode.

        @return: bool, True for yes, False for no. Hard-coded to True; the device is not asked.
        """
        return True
1155
1156
    def set_lowpass_filter(self, a_ch, cutoff_freq):
        """ Set the lowpass filter of an analog output channel of the AWG.

        @param int a_ch: To which channel to apply, either 1 or 2.
                         For any other value the call does nothing.
        @param cutoff_freq: Cutoff Frequency of the lowpass filter in Hz.
                            It is sent to the device converted to MHz.
        """
        if a_ch in (1, 2):
            freq_in_mhz = cutoff_freq / 1e6
            self.write('OUTPUT{0:d}:FILTER:LPASS:FREQUENCY {1:f}MHz'.format(a_ch, freq_in_mhz))
1165
1166
    def set_jump_timing(self, synchronous=False):
        """ Set the control of the jump timing in the AWG.

        @param bool synchronous: if True the jump timing will be set to synchronous, otherwise
                                 the jump timing will be set to asynchronous.

        If the jump timing is set to asynchronous the jump occurs as quickly as possible after an
        event occurs (e.g. event jump trigger), if set to synchronous the jump is made after the
        current waveform is output. The default value is asynchronous.
        """
        if synchronous:
            keyword = 'SYNC'
        else:
            keyword = 'ASYNC'
        self.write('EVEN:JTIM {0}'.format(keyword))
1178
1179
    def set_mode(self, mode):
        """ Change the run mode of the AWG.

        @param str mode: Options for mode (case-insensitive):
                            continuous - 'C'
                            triggered  - 'T'
                            gated      - 'G'
                            enhanced   - 'E'
                            sequence   - 'S'

        Any other mode string raises a KeyError; nothing is sent to the device in that case.
        """
        run_modes = dict(C='CONT', T='TRIG', G='GAT', E='ENH', S='SEQ')
        run_mode = run_modes[mode.upper()]
        self.write('AWGC:RMOD {0!s}'.format(run_mode))
1195
1196
    # works
1197
    def get_sequencer_mode(self, output_as_int=False):
        """ Ask the AWG which sequencer mode it is using.

        @param: bool output_as_int: optional, if True an integer code is returned instead of
                                    a descriptive string
        @return: str or int with the following meaning:
                'Hardware-Sequencer' or 0 if the device answer contains 'HARD'
                'Software-Sequencer' or 1 if the device answer contains 'SOFT'
                'Request-Error' or -1 for any other answer
        """
        reply = self.query('AWGC:SEQ:TYPE?')
        if 'HARD' in reply:
            code, label = 0, 'Hardware-Sequencer'
        elif 'SOFT' in reply:
            code, label = 1, 'Software-Sequencer'
        else:
            code, label = -1, 'Request-Error'
        return code if output_as_int else label
1216
1217
    def _delete_file(self, filename):
1218
        """
1219
1220
        @param str filename: The full filename to delete from FTP cwd
1221
        """
1222
        if filename in self._get_filenames_on_device():
1223
            with FTP(self._ip_address) as ftp:
1224
                ftp.login(user=self._username, passwd=self._password)
1225
                ftp.cwd(self.ftp_working_dir)
1226
                ftp.delete(filename)
1227
        return
1228
1229 View Code Duplication
    def _send_file(self, filename):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1230
        """
1231
1232
        @param filename:
1233
        @return:
1234
        """
1235
        # check input
1236
        if not filename:
1237
            self.log.error('No filename provided for file upload to awg!\nCommand will be ignored.')
1238
            return -1
1239
1240
        filepath = os.path.join(self._tmp_work_dir, filename)
1241
        if not os.path.isfile(filepath):
1242
            self.log.error('No file "{0}" found in "{1}". Unable to upload!'
1243
                           ''.format(filename, self._tmp_work_dir))
1244
            return -1
1245
1246
        # Delete old file on AWG by the same filename
1247
        self._delete_file(filename)
1248
1249
        # Transfer file
1250
        with FTP(self._ip_address) as ftp:
1251
            ftp.login(user=self._username, passwd=self._password)
1252
            ftp.cwd(self.ftp_working_dir)
1253
            with open(filepath, 'rb') as file:
1254
                ftp.storbinary('STOR ' + filename, file)
1255
        return 0
1256
1257 View Code Duplication
    def _get_filenames_on_device(self):
        """
        Collect the names of all files located in the FTP working directory of the AWG.

        Entries of the directory listing that contain '<DIR>' are skipped.

        @return list: filenames found in the FTP working directory (self.ftp_working_dir)
        """
        listing = list()
        with FTP(self._ip_address) as connection:
            connection.login(user=self._username, passwd=self._password)
            connection.cwd(self.ftp_working_dir)
            connection.retrlines('LIST', callback=listing.append)

        found_files = list()
        for entry in listing:
            # Only files are of interest, skip possible directories
            if '<DIR>' in entry:
                continue
            # A listing line is expected to look like this:
            #   '05-10-16  05:22PM                  292 SSR aom adjusted.seq'
            # Cut away the leading date/time columns. What remains is the file size followed by
            # the filename, which may itself contain whitespaces.
            size_and_name = entry[18:].lstrip()
            # Everything after the first whitespace (i.e. after the size) is the filename.
            # Leading and trailing whitespaces are removed for safety.
            found_files.append(size_and_name.split(' ', 1)[1].strip())
        return found_files
1282
1283
    def _get_all_channels(self):
1284
        """
1285
        Helper method to return a sorted list of all technically available channel descriptors
1286
        (e.g. ['a_ch1', 'a_ch2', 'd_ch1', 'd_ch2'])
1287
1288
        @return list: Sorted list of channels
1289
        """
1290
        avail_channels = ['a_ch1', 'd_ch1', 'd_ch2']
1291
        if not self.get_interleave():
1292
            avail_channels.extend(['a_ch2', 'd_ch3', 'd_ch4'])
1293
        return sorted(avail_channels)
1294
1295
    def _get_all_analog_channels(self):
1296
        """
1297
        Helper method to return a sorted list of all technically available analog channel
1298
        descriptors (e.g. ['a_ch1', 'a_ch2'])
1299
1300
        @return list: Sorted list of analog channels
1301
        """
1302
        return sorted(chnl for chnl in self._get_all_channels() if chnl.startswith('a'))
1303
1304
    def _get_all_digital_channels(self):
1305
        """
1306
        Helper method to return a sorted list of all technically available digital channel
1307
        descriptors (e.g. ['d_ch1', 'd_ch2'])
1308
1309
        @return list: Sorted list of digital channels
1310
        """
1311
        return sorted(chnl for chnl in self._get_all_channels() if chnl.startswith('d'))
1312
1313
    def _is_output_on(self):
1314
        """
1315
        Aks the AWG if the output is enabled, i.e. if the AWG is running
1316
1317
        @return bool: True: output on, False: output off
1318
        """
1319
        return bool(int(self.query('AWGC:RST?')))
1320
1321
    def _zeroing_enabled(self):
1322
        """
1323
        Checks if the zeroing option is enabled. Only available on devices with option '06'.
1324
1325
        @return bool: True: enabled, False: disabled
1326
        """
1327
        if '06' not in self.installed_options:
1328
            return False
1329
        return bool(int(self.query('AWGC:INT:ZER?')))
1330
1331
    def _write_wfm(self, filename, analog_samples, marker_bytes, is_first_chunk, is_last_chunk,
1332
                   total_number_of_samples):
1333
        """
1334
        Appends a sampled chunk of a whole waveform to a wfm-file. Create the file
1335
        if it is the first chunk.
1336
        If both flags (is_first_chunk, is_last_chunk) are set to TRUE it means
1337
        that the whole ensemble is written as a whole in one big chunk.
1338
1339
        @param filename: string, represents the name of the sampled waveform
1340
        @param analog_samples: dict containing float32 numpy ndarrays, contains the
1341
                                       samples for the analog channels that
1342
                                       are to be written by this function call.
1343
        @param marker_bytes: np.ndarray containing bool numpy ndarrays, contains the samples
1344
                                      for the digital channels that
1345
                                      are to be written by this function call.
1346
        @param total_number_of_samples: int, The total number of samples in the
1347
                                        entire waveform. Has to be known in advance.
1348
        @param is_first_chunk: bool, indicates if the current chunk is the
1349
                               first write to this file.
1350
        @param is_last_chunk: bool, indicates if the current chunk is the last
1351
                              write to this file.
1352
        """
1353
        # The memory overhead of the tmp file write/read process in bytes.
1354
        tmp_bytes_overhead = 104857600  # 100 MB
1355
        tmp_samples = tmp_bytes_overhead // 5
1356
        if tmp_samples > len(analog_samples):
1357
            tmp_samples = len(analog_samples)
1358
1359
        if not filename.endswith('.wfm'):
1360
            filename += '.wfm'
1361
        wfm_path = os.path.join(self._tmp_work_dir, filename)
1362
1363
        # if it is the first chunk, create the WFM file with header.
1364
        if is_first_chunk:
1365
            with open(wfm_path, 'wb') as wfm_file:
1366
                # write the first line, which is the header file, if first chunk is passed:
1367
                num_bytes = str(int(total_number_of_samples * 5))
1368
                num_digits = str(len(num_bytes))
1369
                header = 'MAGIC 1000\r\n#{0}{1}'.format(num_digits, num_bytes)
1370
                wfm_file.write(header.encode())
1371
1372
        # For the WFM file format unfortunately we need to write the digital sampels together
1373
        # with the analog samples. Therefore we need a temporary copy of all samples for each
1374
        # analog channel.
1375
        write_array = np.zeros(tmp_samples, dtype='float32, uint8')
1376
1377
        # Consecutively prepare and write chunks of maximal size tmp_bytes_overhead to file
1378
        samples_written = 0
1379
        with open(wfm_path, 'ab') as wfm_file:
1380
            while samples_written < len(analog_samples):
1381
                write_end = samples_written + write_array.size
1382
                # Prepare tmp write array
1383
                write_array['f0'] = analog_samples[samples_written:write_end]
1384
                if marker_bytes is not None:
1385
                    write_array['f1'] = marker_bytes[samples_written:write_end]
1386
                # Write to file
1387
                wfm_file.write(write_array)
1388
                # Increment write counter
1389
                samples_written = write_end
1390
                # Reduce write array size if
1391
                if 0 < total_number_of_samples - samples_written < write_array.size:
1392
                    write_array.resize(total_number_of_samples - samples_written)
1393
1394
        del write_array
1395
1396
        # append footer if it's the last chunk to write
1397
        if is_last_chunk:
1398
            # the footer encodes the sample rate, which was used for that file:
1399
            footer = 'CLOCK {0:16.10E}\r\n'.format(self.get_sample_rate())
1400
            with open(wfm_path, 'ab') as wfm_file:
1401
                wfm_file.write(footer.encode())
1402
        return
1403