Completed
Pull Request — master (#384)
by
unknown
02:02
created

AWG70K.get_digital_level()   C

Complexity

Conditions 7

Size

Total Lines 66

Duplication

Lines 66
Ratio 100 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 7
dl 66
loc 66
rs 5.9414
c 1
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
# -*- coding: utf-8 -*-
2
3
"""
4
This file contains the Qudi hardware module for AWG70000 Series.
5
6
Qudi is free software: you can redistribute it and/or modify
7
it under the terms of the GNU General Public License as published by
8
the Free Software Foundation, either version 3 of the License, or
9
(at your option) any later version.
10
11
Qudi is distributed in the hope that it will be useful,
12
but WITHOUT ANY WARRANTY; without even the implied warranty of
13
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
GNU General Public License for more details.
15
16
You should have received a copy of the GNU General Public License
17
along with Qudi. If not, see <http://www.gnu.org/licenses/>.
18
19
Copyright (c) the Qudi Developers. See the COPYRIGHT.txt file at the
20
top-level directory of this distribution and at <https://github.com/Ulm-IQO/qudi/>
21
"""
22
23
24
import os
25
import time
26
import visa
27
import numpy as np
28
29
from collections import OrderedDict
30
from ftplib import FTP
31
from lxml import etree as ET
32
33
from core.module import Base, ConfigOption
34
from core.util.modules import get_home_dir
35
from interface.pulser_interface import PulserInterface, PulserConstraints
36
37
38
class AWG70K(Base, PulserInterface):
39
    """
40
41
    """
42
    # Identification strings of this module (class name and module type).
    _modclass = 'awg70k'
    _modtype = 'hardware'

    # config options
    # Local directory for temporary pulse files; defaults to "<home dir>/pulsed_files".
    _tmp_work_dir = ConfigOption(name='tmp_work_dir',
                                 default=os.path.join(get_home_dir(), 'pulsed_files'),
                                 missing='warn')
    # VISA resource address of the AWG (mandatory config entry).
    _visa_address = ConfigOption(name='awg_visa_address', missing='error')
    # Address used to open the FTP connection to the AWG (mandatory config entry).
    _ip_address = ConfigOption(name='awg_ip_address', missing='error')
    # FTP root directory and login credentials.
    _ftp_dir = ConfigOption(name='ftp_root_dir', default='C:\\inetpub\\ftproot', missing='warn')
    _username = ConfigOption(name='ftp_login', default='anonymous', missing='warn')
    _password = ConfigOption(name='ftp_passwd', default='anonymous@', missing='warn')
    # VISA timeout; on_activate multiplies this value by 1000 before passing it to pyvisa.
    _visa_timeout = ConfigOption(name='timeout', default=30, missing='nothing')

    # translation dict from qudi trigger descriptor to device command
    __event_triggers = {'OFF': 'OFF', 'A': 'ATR', 'B': 'BTR', 'INT': 'INT'}
58
59
    def __init__(self, *args, **kwargs):
        """ Create the module instance and the pyVISA resource manager.

        All positional and keyword arguments are passed on unchanged to the parent constructor.
        """
        super().__init__(*args, **kwargs)

        # Resource manager that on_activate uses to find and open the AWG resource
        self._rm = visa.ResourceManager()

        # Nothing is connected yet: no visa resource handle and an empty model string
        self.awg, self.awg_model = None, ''

        # subfolder of FTP root dir on AWG disk to work in
        self.ftp_working_dir = 'waves'
70
71
    def on_activate(self):
        """ Initialisation performed during activation of the module.

        Creates the local work directory if it is missing, opens the VISA resource of the AWG,
        performs a test login on the FTP server of the AWG and reads the model name.
        """
        # Make sure the local working directory exists
        work_dir_missing = not os.path.exists(self._tmp_work_dir)
        if work_dir_missing:
            os.makedirs(os.path.abspath(self._tmp_work_dir))

        # Open the VISA resource if the configured address is known to the resource manager
        if self._visa_address in self._rm.list_resources():
            self.awg = self._rm.open_resource(self._visa_address)
            # the configured timeout value is scaled by 1000 (config default is 30)
            self.awg.timeout = self._visa_timeout * 1000
        else:
            self.awg = None
            self.log.error('VISA address "{0}" not found by the pyVISA resource manager.\nCheck '
                           'the connection by using for example "Agilent Connection Expert".'
                           ''.format(self._visa_address))

        # Test the FTP access: log in and change into the working directory
        with FTP(self._ip_address) as ftp:
            ftp.login(user=self._username, passwd=self._password)
            ftp.cwd(self.ftp_working_dir)

        # The model name is the second comma separated field of the *IDN? answer
        self.awg_model = '' if self.awg is None else self.query('*IDN?').split(',')[1]
99
100
    def on_deactivate(self):
        """ Required tasks to be performed during deactivation of the module.

        Closes the VISA connection to the AWG if one has been opened. A failure while closing
        is logged and does not propagate; success is only reported if closing succeeded.
        """
        # on_activate leaves self.awg as None if the VISA address could not be found,
        # in which case there is no connection to close.
        if self.awg is None:
            return

        try:
            self.awg.close()
        except Exception:
            # Best effort: deactivation must not fail because of a broken connection.
            # Catching Exception (not a bare except) lets KeyboardInterrupt/SystemExit pass.
            self.log.debug('Closing AWG connection using pyvisa failed.')
        else:
            self.log.info('Closed connection to AWG')
        return
110
111
    def get_constraints(self):
        """
        Collect the hardware constraints of the pulsing device.

        @return constraints object: PulserConstraints instance with the limits as attributes.

        The scalar limits (sample rate, amplitudes, waveform length, ...) are written as
        min/max/step/default attributes of the respective members of the PulserConstraints
        object. The sample rate limits and the channel activation configurations depend on the
        model string stored in self.awg_model; only 'AWG70002A' and 'AWG70001A' are handled.
        For any other model string the sample rate members are left untouched and the
        activation config stays empty.

        The activation config has the form:
            {<descriptor_str>: <channel_set>,
             <descriptor_str>: <channel_set>,
             ...}
        where only the generic channel names a_ch<num> and d_ch<num> are used in the sets.
        """
        def _set_limits(scalar, lower, upper, increment, preset):
            # Attributes are written in the order min, max, step, default
            scalar.min = lower
            scalar.max = upper
            scalar.step = increment
            scalar.default = preset

        constraints = PulserConstraints()
        model = self.awg_model

        # Model dependent sample rate limits
        if model == 'AWG70002A':
            _set_limits(constraints.sample_rate, 1.5e3, 25.0e9, 5.0e2, 25.0e9)
        elif model == 'AWG70001A':
            _set_limits(constraints.sample_rate, 3.0e3, 50.0e9, 1.0e3, 50.0e9)

        _set_limits(constraints.a_ch_amplitude, 0.25, 0.5, 0.001, 0.5)
        # FIXME: Enter the proper digital channel low constraints:
        _set_limits(constraints.d_ch_low, 0.0, 0.0, 0.0, 0.0)
        # FIXME: Enter the proper digital channel high constraints:
        _set_limits(constraints.d_ch_high, 0.0, 1.4, 0.1, 1.4)

        _set_limits(constraints.waveform_length, 1, 8000000000, 1, 1)

        # FIXME: Check the proper number for your device
        _set_limits(constraints.waveform_num, 1, 32000, 1, 1)
        # FIXME: Check the proper number for your device
        _set_limits(constraints.sequence_num, 1, 4000, 1, 1)
        # FIXME: Check the proper number for your device
        _set_limits(constraints.subsequence_num, 1, 8000, 1, 1)

        # If sequencer mode is available then these should be specified
        _set_limits(constraints.repetitions, 0, 65536, 1, 0)
        # ToDo: Check how many external triggers are available
        constraints.event_triggers = ['A', 'B']
        constraints.flags = ['A', 'B', 'C', 'D']

        _set_limits(constraints.sequence_steps, 0, 8000, 1, 0)

        # All possible channel configurations per model. The descriptor names are free to
        # choose, the channel names inside the sets are the generic a_ch<num>/d_ch<num> names.
        if model == 'AWG70002A':
            activation_config = OrderedDict([
                ('all', {'a_ch1', 'd_ch1', 'd_ch2', 'a_ch2', 'd_ch3', 'd_ch4'}),
                # Both channels but reduced markers (higher analog resolution)
                ('ch1_2mrk_ch2_1mrk', {'a_ch1', 'd_ch1', 'd_ch2', 'a_ch2', 'd_ch3'}),
                ('ch1_2mrk_ch2_0mrk', {'a_ch1', 'd_ch1', 'd_ch2', 'a_ch2'}),
                ('ch1_1mrk_ch2_2mrk', {'a_ch1', 'd_ch1', 'a_ch2', 'd_ch3', 'd_ch4'}),
                ('ch1_0mrk_ch2_2mrk', {'a_ch1', 'a_ch2', 'd_ch3', 'd_ch4'}),
                ('ch1_1mrk_ch2_1mrk', {'a_ch1', 'd_ch1', 'a_ch2', 'd_ch3'}),
                ('ch1_0mrk_ch2_1mrk', {'a_ch1', 'a_ch2', 'd_ch3'}),
                ('ch1_1mrk_ch2_0mrk', {'a_ch1', 'd_ch1', 'a_ch2'}),
                # Channel 1 only
                ('ch1_2mrk', {'a_ch1', 'd_ch1', 'd_ch2'}),
                # Channel 2 only
                ('ch2_2mrk', {'a_ch2', 'd_ch3', 'd_ch4'}),
                # Single channel with one marker
                ('ch1_1mrk', {'a_ch1', 'd_ch1'}),
                ('ch2_1mrk', {'a_ch2', 'd_ch3'}),
                # Single channel without marker
                ('ch1_0mrk', {'a_ch1'}),
                ('ch2_0mrk', {'a_ch2'}),
            ])
        elif model == 'AWG70001A':
            activation_config = OrderedDict([
                ('all', {'a_ch1', 'd_ch1', 'd_ch2'}),
                # Channel 1 with one marker
                ('ch1_1mrk', {'a_ch1', 'd_ch1'}),
                # Channel 1 without marker
                ('ch1_0mrk', {'a_ch1'}),
            ])
        else:
            activation_config = OrderedDict()

        constraints.activation_config = activation_config

        # FIXME: additional constraint really necessary?
        constraints.dac_resolution = {'min': 8, 'max': 10, 'step': 1, 'unit': 'bit'}
        return constraints
240
241
    def pulser_on(self):
        """ Switches the pulsing device on.

        Sends the run command unless the output is already on and then polls every 0.25 s
        until the device reports that the output is on.

        @return int: first element of the tuple returned by get_status()
                     (0:OK, -1:error, higher number corresponds to current status of the
                     device. Check then the class variable status_dic.)
        """
        if self._is_output_on():
            # AWG is already running, nothing to switch
            return self.get_status()[0]

        self.write('AWGC:RUN')
        # Block until the AWG is actually running
        while not self._is_output_on():
            time.sleep(0.25)
        return self.get_status()[0]
255
256
    def pulser_off(self):
        """ Switches the pulsing device off.

        Sends the stop command unless the output is already off and then polls every 0.25 s
        until the device reports that the output is off.

        @return int: first element of the tuple returned by get_status()
                     (0:OK, -1:error, higher number corresponds to current status of the
                     device. Check then the class variable status_dic.)
        """
        if not self._is_output_on():
            # AWG is already idle, nothing to switch
            return self.get_status()[0]

        self.write('AWGC:STOP')
        # Block until the AWG has actually stopped
        while self._is_output_on():
            time.sleep(0.25)
        return self.get_status()[0]
270
271
    def write_waveform(self, name, analog_samples, digital_samples, is_first_chunk, is_last_chunk,
                       total_number_of_samples):
        """
        Write a new waveform or append samples to an already existing waveform on the device memory.
        The flags is_first_chunk and is_last_chunk can be used as indicator if a new waveform should
        be created or if the write process to a waveform should be terminated.

        One waveform named '<name>_ch<num>' is created per active analog channel. For each of
        them a WFMX file is written, sent to the AWG and opened into the workspace.

        NOTE: The marker arrays in digital_samples are bit-shifted in place (via uint8 views)
        to avoid copies, so the arrays passed by the caller are overwritten.

        @param name: str, the name of the waveform to be created/append to
        @param analog_samples: dict with the analog channel names as keys and the voltage
                               sample arrays as values
        @param digital_samples: dict with the digital channel names as keys and the marker
                                state arrays as values
        @param is_first_chunk: bool, flag indicating if it is the first chunk to write.
                                     Passed on to the WFMX file writer.
        @param is_last_chunk: bool, flag indicating if it is the last chunk to write.
                                    Passed on to the WFMX file writer.
        @param total_number_of_samples: int, The number of sample points for the entire waveform
                                        (not only the currently written chunk)

        @return: (int, list) number of samples written (-1 indicates failed process) and list of
                             created waveform names
        """
        waveforms = list()

        # Sanity checks
        if len(analog_samples) == 0:
            self.log.error('No analog samples passed to write_waveform method in awg70k.')
            return -1, waveforms

        min_samples = int(self.query('WLIS:WAV:LMIN?'))
        if total_number_of_samples < min_samples:
            self.log.error('Unable to write waveform.\nNumber of samples to write ({0:d}) is '
                           'smaller than the allowed minimum waveform length ({1:d}).'
                           ''.format(total_number_of_samples, min_samples))
            return -1, waveforms

        # determine active channels
        activation_dict = self.get_active_channels()
        active_channels = {chnl for chnl in activation_dict if activation_dict[chnl]}
        active_analog = sorted(chnl for chnl in active_channels if chnl.startswith('a'))

        # Sanity check of channel numbers
        sample_channels = set(analog_samples.keys()).union(set(digital_samples.keys()))
        if active_channels != sample_channels:
            # Both channel sets are reported; the '{1}' placeholder was missing before.
            self.log.error('Mismatch of channel activation and sample array dimensions for '
                           'waveform creation.\nChannel activation is: {0}\nSample arrays have: '
                           '{1}'.format(active_channels, sample_channels))
            return -1, waveforms

        # Write waveforms. One for each analog channel.
        for a_ch in active_analog:
            # Get the integer analog channel number
            a_ch_num = int(a_ch.split('ch')[-1])
            # Get the digital channel specifiers belonging to this analog channel markers
            mrk_ch_1 = 'd_ch{0:d}'.format(a_ch_num * 2 - 1)
            mrk_ch_2 = 'd_ch{0:d}'.format(a_ch_num * 2)

            start = time.time()
            # Encode marker information in an array of bytes (uint8). Avoid intermediate copies!!!
            # Marker 1 is shifted to bit 6 and marker 2 to bit 7 of each byte.
            if mrk_ch_1 in digital_samples and mrk_ch_2 in digital_samples:
                mrk_bytes = digital_samples[mrk_ch_2].view('uint8')
                tmp_bytes = digital_samples[mrk_ch_1].view('uint8')
                np.left_shift(mrk_bytes, 7, out=mrk_bytes)
                np.left_shift(tmp_bytes, 6, out=tmp_bytes)
                np.add(mrk_bytes, tmp_bytes, out=mrk_bytes)
            elif mrk_ch_1 in digital_samples:
                mrk_bytes = digital_samples[mrk_ch_1].view('uint8')
                np.left_shift(mrk_bytes, 6, out=mrk_bytes)
            else:
                mrk_bytes = None
            self.log.debug('Prepare digital channel data: {0}'.format(time.time() - start))

            # Create waveform name string
            wfm_name = '{0}_ch{1:d}'.format(name, a_ch_num)

            # Check if waveform already exists and delete if necessary.
            if wfm_name in self.get_waveform_names():
                self.delete_waveform(wfm_name)

            # Write WFMX file for waveform
            start = time.time()
            self._write_wfmx(filename=wfm_name,
                             analog_samples=analog_samples[a_ch],
                             digital_samples=mrk_bytes,
                             is_first_chunk=is_first_chunk,
                             is_last_chunk=is_last_chunk,
                             total_number_of_samples=total_number_of_samples)
            self.log.debug('Write WFMX file: {0}'.format(time.time() - start))

            # transfer waveform to AWG and load into workspace
            start = time.time()
            self._send_file(filename=wfm_name + '.wfmx')
            self.log.debug('Send WFMX file: {0}'.format(time.time() - start))

            start = time.time()
            # NOTE(review): self._ftp_path is not set in the visible part of this class -
            # presumably defined elsewhere in the file; confirm.
            self.write('MMEM:OPEN "{0}"'.format(os.path.join(self._ftp_path, wfm_name + '.wfmx')))
            # Wait for everything to complete
            while int(self.query('*OPC?')) != 1:
                time.sleep(0.25)
            # Just to make sure
            while wfm_name not in self.get_waveform_names():
                time.sleep(0.25)
            self.log.debug('Load WFMX file into workspace: {0}'.format(time.time() - start))

            # Append created waveform name to waveform list
            waveforms.append(wfm_name)
        return total_number_of_samples, waveforms
379
380
    def write_sequence(self, name, sequence_parameter_list):
        """
        Write a new sequence on the device memory.

        @param name: str, the name of the sequence to be created
        @param sequence_parameter_list: list of (waveform_tuple, parameter_dict) pairs, one per
                                        sequence step. The waveform tuple holds the waveform
                                        names to play (one per track). The parameter dict is
                                        read for the keys 'event_trigger', 'event_jump_to',
                                        'wait_for', 'repetitions', 'go_to', 'flag_trigger' and
                                        'flag_high'.

        @return: int, number of sequence steps written (-1 indicates failed process)
        """
        # Check if device has sequencer option installed
        if not self.has_sequence_mode():
            self.log.error('Direct sequence generation in AWG not possible. Sequencer option not '
                           'installed.')
            return -1

        # Check if all waveforms are present on device memory
        avail_waveforms = set(self.get_waveform_names())
        for waveform_tuple, param_dict in sequence_parameter_list:
            if not avail_waveforms.issuperset(waveform_tuple):
                self.log.error('Failed to create sequence "{0}" due to waveforms "{1}" not '
                               'present in device memory.'.format(name, waveform_tuple))
                return -1

        # NOTE(review): unlike the other methods of this class, the activation state of the
        # channels is not evaluated here; every analog channel key counts as a track.
        active_analog = sorted(chnl for chnl in self.get_active_channels() if chnl.startswith('a'))
        num_tracks = len(active_analog)
        num_steps = len(sequence_parameter_list)

        # Create the new sequence with the required number of steps
        self.new_sequence(name=name, steps=num_steps)

        # Fill in sequence information
        for step, (wfm_tuple, seq_params) in enumerate(sequence_parameter_list, 1):
            # Set waveforms to play
            if num_tracks == len(wfm_tuple):
                for track, waveform in enumerate(wfm_tuple, 1):
                    self.sequence_set_waveform(name, waveform, step, track)
            else:
                # Report the tuple of THIS step (not the stale variable of the check loop above)
                self.log.error('Unable to write sequence.\nLength of waveform tuple "{0}" does not '
                               'match the number of sequence tracks.'.format(wfm_tuple))
                return -1

            # Set event jump trigger
            self.sequence_set_event_jump(name,
                                         step,
                                         seq_params['event_trigger'],
                                         seq_params['event_jump_to'])
            # Set wait trigger
            self.sequence_set_wait_trigger(name, step, seq_params['wait_for'])
            # Set repetitions
            self.sequence_set_repetitions(name, step, seq_params['repetitions'])
            # Set go_to parameter
            self.sequence_set_goto(name, step, seq_params['go_to'])
            # Set flag states: either the flag trigger or the flag to set high is passed on
            trigger = seq_params['flag_trigger'] != 'OFF'
            flag_list = [seq_params['flag_trigger']] if trigger else [seq_params['flag_high']]
            self.sequence_set_flags(name, step, flag_list, trigger)

        # Wait for everything to complete
        while int(self.query('*OPC?')) != 1:
            time.sleep(0.25)
        return num_steps
443
444
    def get_waveform_names(self):
        """ Retrieve the names of all uploaded waveforms on the device.

        The answer to the 'WLIS:LIST?' query is split at the commas and sorted. If the query
        raises a VisaIOError, an error is logged and an empty list is returned.

        @return list: Sorted list of the waveform name strings in the device workspace.
        """
        try:
            response = self.query('WLIS:LIST?')
        except visa.VisaIOError:
            self.log.error('Unable to read waveform list from device. VisaIOError occured.')
            return list()

        # An empty answer means that no waveform is present
        if not response:
            return list()

        names = response.split(',')
        names.sort()
        return names
456
457
    def get_sequence_names(self):
        """ Retrieve the names of all uploaded sequence on the device.

        Without sequence mode an empty list is returned. Otherwise the number of sequences is
        queried and the names are requested one by one (the device index starts at 1). If a
        VisaIOError occurs, an error is logged and the names read so far are returned.

        @return list: List of all uploaded sequence name strings in the device workspace.
        """
        names = list()

        if self.has_sequence_mode():
            try:
                count = int(self.query('SLIS:SIZE?'))
                for index in range(1, count + 1):
                    names.append(self.query('SLIS:NAME? {0:d}'.format(index)))
            except visa.VisaIOError:
                self.log.error('Unable to read sequence list from device. VisaIOError occurred.')
        return names
474
475
    def delete_waveform(self, waveform_name):
        """ Delete the waveform with name "waveform_name" from the device memory.

        Names that are not present on the device are skipped silently.

        @param str waveform_name: The name of the waveform to be deleted
                                  Optionally a list of waveform names can be passed.

        @return list: a list of deleted waveform names.
        """
        # A single name is handled like a list with one entry
        requested = [waveform_name] if isinstance(waveform_name, str) else waveform_name

        present = self.get_waveform_names()
        removed = list()
        for candidate in requested:
            if candidate not in present:
                continue
            self.write('WLIS:WAV:DEL "{0}"'.format(candidate))
            removed.append(candidate)
        return removed
493
494
    def delete_sequence(self, sequence_name):
        """ Delete the sequence with name "sequence_name" from the device memory.

        Names that are not present on the device are skipped silently.

        @param str sequence_name: The name of the sequence to be deleted
                                  Optionally a list of sequence names can be passed.

        @return list: a list of deleted sequence names.
        """
        # A single name is handled like a list with one entry
        requested = [sequence_name] if isinstance(sequence_name, str) else sequence_name

        present = self.get_sequence_names()
        removed = list()
        for candidate in requested:
            if candidate not in present:
                continue
            self.write('SLIS:SEQ:DEL "{0}"'.format(candidate))
            removed.append(candidate)
        return removed
512
513 View Code Duplication
    def load_waveform(self, load_dict):
        """ Loads a waveform to the specified channel of the pulsing device.

        The waveforms are taken from the device workspace and assigned to the analog channels.
        If a channel to load to is not active or a waveform is missing in the workspace, an
        error is logged and nothing is loaded.

        @param load_dict:  dict|list, a dictionary with keys being one of the available channel
                                      index and values being the name of the already written
                                      waveform to load into the channel.
                                      Examples:   {1: rabi_ch1, 2: rabi_ch2} or
                                                  {1: rabi_ch2, 2: rabi_ch1}
                                      If just a list of waveform names is given, the channel
                                      index is taken from the channel suffix
                                      '_ch1', '_ch2' etc. of each name.

        @return (dict, str): The return value of get_loaded_assets(), i.e. a dictionary with
                             keys being the channel number and values being the respective
                             asset loaded into the channel, and a string describing the asset
                             type ('waveform' or 'sequence')
        """
        # Translate a plain list of names into a {channel index: name} dictionary
        if isinstance(load_dict, list):
            load_dict = {int(wfm.rsplit('_ch', 1)[1]): wfm for wfm in load_dict}

        # Collect the names of all active analog channels
        activation = self.get_active_channels()
        active_analog = [ch for ch in activation if ch.startswith('a') and activation[ch]]
        active_analog.sort()

        # All channels to load to must be active
        requested_channels = set('a_ch{0:d}'.format(number) for number in load_dict)
        if not requested_channels.issubset(active_analog):
            self.log.error('Unable to load waveforms into channels.\n'
                           'One or more channels to set are not active.')
            return self.get_loaded_assets()

        # All waveforms to load must be present on device memory
        requested_waveforms = set(load_dict.values())
        if not requested_waveforms.issubset(self.get_waveform_names()):
            self.log.error('Unable to load waveforms into channels.\n'
                           'One or more waveforms to load are missing on device memory.')
            return self.get_loaded_assets()

        # Assign the waveforms and poll until the device reports each as loaded
        for number in load_dict:
            wfm = load_dict[number]
            self.write('SOUR{0:d}:CASS:WAV "{1}"'.format(number, wfm))
            while self.query('SOUR{0:d}:CASS?'.format(number)) != wfm:
                time.sleep(0.1)

        return self.get_loaded_assets()
565
566
    def load_sequence(self, sequence_name):
        """ Loads a sequence to the channels of the device in order to be ready for playback.

        Track <n> of the sequence is assigned to channel <n>. If the sequence is not present
        on the device or its number of tracks differs from the number of active analog
        channels, an error is logged and nothing is loaded.

        @param sequence_name:  str, name of the sequence to load

        @return (dict, str): The return value of get_loaded_assets(), i.e. a dictionary with
                             keys being the channel number and values being the respective
                             asset loaded into the channel, and a string describing the asset
                             type ('waveform' or 'sequence')
        """
        known_sequences = self.get_sequence_names()
        if sequence_name not in known_sequences:
            self.log.error('Unable to load sequence.\n'
                           'Sequence to load is missing on device memory.')
            return self.get_loaded_assets()

        # Collect the names of all active analog channels
        activation = self.get_active_channels()
        active_analog = [ch for ch in activation if ch.startswith('a') and activation[ch]]
        active_analog.sort()

        # The number of sequence tracks has to match the number of active analog channels
        track_count = int(self.query('SLIS:SEQ:TRAC? "{0}"'.format(sequence_name)))
        if track_count != len(active_analog):
            self.log.error('Unable to load sequence.\nNumber of tracks in sequence to load does '
                           'not match the number of active analog channels.')
            return self.get_loaded_assets()

        # Assign the tracks and poll until the device reports each as loaded
        for track in range(1, track_count + 1):
            self.write('SOUR{0:d}:CASS:SEQ "{1}", {2:d}'.format(track, sequence_name, track))
            expected = '{0},{1:d}'.format(sequence_name, track)
            # The first and the last two characters of the answer are cut off before comparing
            while self.query('SOUR{0:d}:CASS?'.format(track))[1:-2] != expected:
                time.sleep(0.2)

        return self.get_loaded_assets()
602
603
    def get_loaded_assets(self):
        """
        Retrieve the currently loaded asset names for each active channel of the device.
        The returned dictionary will have the channel numbers as keys.
        In case of loaded waveforms the dictionary values will be the waveform names.
        In case of a loaded sequence the values will be the sequence name appended by a suffix
        representing the track loaded to the respective channel (i.e. '<sequence_name>_1').

        If the channels report a mix of waveforms and sequences, an error is logged and an
        empty result is returned.

        @return (dict, str): Dictionary with keys being the channel number and values being the
                             respective asset loaded into the channel,
                             string describing the asset type ('waveform' or 'sequence').
                             The type is None if no analog channel is active and
                             (dict(), '') is returned on inconsistent asset types.
        """
        # Get the numbers of all active analog channels
        chnl_activation = self.get_active_channels()
        channel_numbers = sorted(int(chnl.split('_ch')[1]) for chnl in chnl_activation if
                                 chnl.startswith('a') and chnl_activation[chnl])

        # Get assets per channel
        loaded_assets = dict()
        current_type = None
        for chnl_num in channel_numbers:
            # Ask the AWG for the asset loaded into THIS channel (the channel number was
            # hard-coded to 1 before, so all channels reported the asset of channel 1).
            asset_name = self.query('SOUR{0:d}:CASS?'.format(chnl_num))
            # A sequence answer has the form '<sequence_name>,<track>', a waveform answer
            # contains no track part. Split at the last comma to tell them apart.
            splitted = asset_name.rsplit(',', 1)
            asset_name = splitted[0]
            asset_type = 'sequence' if len(splitted) > 1 else 'waveform'

            # All channels have to hold the same kind of asset
            if current_type is not None and current_type != asset_type:
                self.log.error('Unable to determine loaded assets.')
                return dict(), ''
            current_type = asset_type

            if asset_type == 'sequence':
                asset_name += '_' + splitted[1]
            loaded_assets[chnl_num] = asset_name

        return loaded_assets, current_type
646
647
    def clear_all(self):
        """ Send the delete-all command for the waveform list and, if the device reports
        sequence mode, also for the sequence list.

        After each delete command the method polls '*OPC?' every 0.25 s until the device
        answers 1.

        @return int: error code (always 0 here)

        Unused for digital pulse generators without storage capability
        (PulseBlaster, FPGA).
        """
        # Remove all waveforms and wait for completion
        self.write('WLIS:WAV:DEL ALL')
        finished = int(self.query('*OPC?'))
        while finished != 1:
            time.sleep(0.25)
            finished = int(self.query('*OPC?'))

        # Without the sequencer there is nothing more to delete
        if not self.has_sequence_mode():
            return 0

        # Remove all sequences and wait for completion
        self.write('SLIS:SEQ:DEL ALL')
        finished = int(self.query('*OPC?'))
        while finished != 1:
            time.sleep(0.25)
            finished = int(self.query('*OPC?'))
        return 0
663
664
    def get_status(self):
        """ Retrieve the status of the pulsing hardware.

        @return (int, dict): integer value of the current status together with a dictionary
                             mapping every possible status value to its description.
                             The status is -1 if no device handle is set (self.awg is None),
                             otherwise the integer value of self._is_output_on().
        """
        status_dic = dict()
        status_dic[-1] = 'Failed Request or Communication'
        status_dic[0] = 'Device has stopped, but can receive commands'
        status_dic[1] = 'Device is active and running'
        # All the other status messages should have higher integer values than 1.

        if self.awg is None:
            current_status = -1
        else:
            current_status = int(self._is_output_on())
        return current_status, status_dic
678
679
    def set_sample_rate(self, sample_rate):
        """ Set the sample rate of the pulse generator hardware.

        The value is sent with 4 significant digits ('%.4G'). Afterwards '*OPC?' is polled
        every 0.25 s until the device answers 1, followed by a fixed wait of one second before
        the rate is read back.

        @param float sample_rate: The sample rate to be set (in Hz)

        @return float: the sample rate read back from the device
        """
        # Check if AWG is in function generator mode
        # self._activate_awg_mode()

        command = 'CLOCK:SRATE %.4G' % sample_rate
        self.write(command)
        while True:
            if int(self.query('*OPC?')) == 1:
                break
            time.sleep(0.25)
        time.sleep(1)
        return self.get_sample_rate()
694
695
    def get_sample_rate(self):
        """ Get the sample rate of the pulse generator hardware.

        The value is queried from the device ('CLOCK:SRATE?') and converted to float.

        @return float: The current sample rate of the device (in Hz)
        """
        return float(self.query('CLOCK:SRATE?'))
702
703
    def get_analog_level(self, amplitude=None, offset=None):
        """ Retrieve the analog amplitude and offset of the provided channels.

        @param list amplitude: optional, names of the analog channels (e.g. ['a_ch1', 'a_ch2'])
                               whose amplitude (in Volt peak to peak, i.e. the full amplitude)
                               is desired. None means all analog channels.
        @param list offset: optional, names of the analog channels whose offset (in Volt) is
                            desired. None means all analog channels.

        @return (dict, dict): tuple of two dicts (amplitude, offset) with keys being the generic
                              string channel names and items being the values for those
                              channels. Amplitude is always denoted in Volt-peak-to-peak and
                              offset in (absolute) Voltage.

        Amplitudes are always read directly from the device ('SOUR<n>:VOLT:AMPL?', where <n> is
        the number contained in the channel name). Offsets are not queried; this method reports
        0.0 for every requested channel.

        Requested channels that are not among the analog channels of the device are skipped and
        a warning is logged for each of them.

        Example:
            get_analog_level(amplitude=['a_ch1'], offset=['a_ch1', 'a_ch2'])
        might return
            {'a_ch1': 0.5}, {'a_ch1': 0.0, 'a_ch2': 0.0}

        The major difference to digital signals is that analog signals are
        always oscillating or changing signals, otherwise you can use just
        digital output. In contrast to digital output levels, analog output
        levels are defined by an amplitude (here total signal span, denoted in
        Voltage peak to peak) and an offset (a value around which the signal
        oscillates, denoted by an (absolute) voltage).

        In general there is no bijective correspondence between
        (amplitude, offset) and (value high, value low)!
        """
        chnl_list = self._get_all_analog_channels()

        # get pp amplitudes
        amp = dict()
        for chnl in (chnl_list if amplitude is None else amplitude):
            if chnl not in chnl_list:
                self.log.warning('Get analog amplitude from AWG70k channel "{0}" failed. '
                                 'Channel non-existent.'.format(chnl))
                continue
            # The source index is the number encoded in the channel name
            ch_num = int(chnl.rsplit('_ch', 1)[1])
            amp[chnl] = float(self.query('SOUR{0:d}:VOLT:AMPL?'.format(ch_num)))

        # get voltage offsets (constant 0.0, nothing is asked from the device)
        off = dict()
        for chnl in (chnl_list if offset is None else offset):
            if chnl not in chnl_list:
                self.log.warning('Get analog offset from AWG70k channel "{0}" failed. '
                                 'Channel non-existent.'.format(chnl))
                continue
            off[chnl] = 0.0
        return amp, off
770
771 View Code Duplication
    def set_analog_level(self, amplitude=None, offset=None):
        """ Set amplitude and/or offset value of the provided analog channel.

        @param dict amplitude: dictionary, with key being the channel and items
                               being the amplitude values (in Volt peak to peak,
                               i.e. the full amplitude) for the desired channel.
        @param dict offset: dictionary, with key being the channel and items
                            being the offset values (in absolute volt) for the
                            desired channel.

        @return (dict, dict): tuple of two dicts (amplitude, offset) as returned by
                              get_analog_level() after the values have been written.

        Channels that are not among the analog channels of the device are ignored with a
        warning. Values outside the limits given by the constraints (a_ch_amplitude and
        a_ch_offset) are clipped to the nearest limit with a warning. The dictionaries passed
        in are never modified.

        After each value written, '*OPC?' is polled every 0.25 s until the device answers 1.

        Note: After setting the analog and/or offset of the device, retrieve
              them again for obtaining the actual set value(s) and use that
              information for further processing.

        The major difference to digital signals is that analog signals are
        always oscillating or changing signals, otherwise you can use just
        digital output. In contrast to digital output levels, analog output
        levels are defined by an amplitude (here total signal span, denoted in
        Voltage peak to peak) and an offset (a value around which the signal
        oscillates, denoted by an (absolute) voltage).

        In general there is no bijective correspondence between
        (amplitude, offset) and (value high, value low)!
        """
        # Check the inputs by using the constraints...
        constraints = self.get_constraints()
        # ...and the available analog channels
        analog_channels = self._get_all_analog_channels()

        def checked_values(requested, limits, quantity, voltage_kind):
            """ Return a new dict with known channels only and values clipped to limits. """
            accepted = dict()
            if requested is None:
                return accepted
            for chnl, value in requested.items():
                if chnl not in analog_channels:
                    self.log.warning('Channel to set ({0}) not available in AWG.\nSetting '
                                     '{1} voltage for this channel ignored.'
                                     ''.format(chnl, voltage_kind))
                    continue
                if value < limits.min:
                    self.log.warning('Minimum {3} for channel "{0}" is {1}. Requested {3} of '
                                     '{2}V was ignored and instead set to min value.'
                                     ''.format(chnl, limits.min, value, quantity))
                    value = limits.min
                elif value > limits.max:
                    self.log.warning('Maximum {3} for channel "{0}" is {1}. Requested {3} of '
                                     '{2}V was ignored and instead set to max value.'
                                     ''.format(chnl, limits.max, value, quantity))
                    value = limits.max
                accepted[chnl] = value
            return accepted

        # Validate everything first so that nothing is written if the input is malformed
        amplitudes_to_set = checked_values(amplitude, constraints.a_ch_amplitude, 'Vpp',
                                           'analogue')
        offsets_to_set = checked_values(offset, constraints.a_ch_offset, 'offset', 'offset')

        for chnl, value in amplitudes_to_set.items():
            # Address the source belonging to this very channel
            ch_num = int(chnl.rsplit('_ch', 1)[1])
            self.write('SOUR{0:d}:VOLT:AMPL {1}'.format(ch_num, value))
            while int(self.query('*OPC?')) != 1:
                time.sleep(0.25)

        for chnl, value in offsets_to_set.items():
            ch_num = int(chnl.rsplit('_ch', 1)[1])
            self.write('SOUR{0:d}:VOLT:OFFSET {1}'.format(ch_num, value))
            while int(self.query('*OPC?')) != 1:
                time.sleep(0.25)
        return self.get_analog_level()
859
860 View Code Duplication
    def get_digital_level(self, low=None, high=None):
        """ Retrieve the digital low and high level of the provided channels.

        @param list low: optional, names of the digital channels whose low value (in Volt) is
                         desired. None means all digital channels.
        @param list high: optional, names of the digital channels whose high value (in Volt) is
                          desired. None means all digital channels.

        @return: (dict, dict): tuple of two dicts (low, high), with keys being the channel
                               name and items being the values for those channels. Both low
                               and high value of a channel is denoted in (absolute) Voltage.

        The values are always queried from the device. Requested names that are not among the
        digital channels of the device are silently skipped.

        Digital channel d_ch<k> is addressed as marker (2 - k % 2) of source (k + 1) // 2,
        i.e. d_ch1/d_ch2 are markers 1/2 of source 1, d_ch3/d_ch4 markers 1/2 of source 2.

        Example of a possible input:
            low = ['d_ch1', 'd_ch4'], high = []
        to obtain the low voltage values of digital channel 1 and 4. A possible
        answer might be
            {'d_ch1': -0.5, 'd_ch4': 2.0} {}

        The major difference to analog signals is that digital signals are
        either ON or OFF, whereas analog channels have a varying amplitude
        range. In contrast to analog output levels, digital output levels are
        defined by a voltage, which corresponds to the ON status and a voltage
        which corresponds to the OFF status (both denoted in (absolute) voltage)

        In general there is no bijective correspondence between
        (amplitude, offset) and (value high, value low)!
        """
        # TODO: Test with multiple channel AWG
        digital_channels = self._get_all_digital_channels()

        def read_marker_levels(requested, query_template):
            """ Query one marker voltage per requested and existing digital channel. """
            levels = {}
            for name in (digital_channels if requested is None else requested):
                if name in digital_channels:
                    d_number = int(name.rsplit('_ch', 1)[1])
                    source = (d_number + 1) // 2
                    marker = 2 - (d_number % 2)
                    levels[name] = float(self.query(query_template.format(source, marker)))
            return levels

        # All low levels are read before the high levels
        low_val = read_marker_levels(low, 'SOUR{0:d}:MARK{1:d}:VOLT:LOW?')
        high_val = read_marker_levels(high, 'SOUR{0:d}:MARK{1:d}:VOLT:HIGH?')
        return low_val, high_val
926
927
    def set_digital_level(self, low=None, high=None):
        """ Set low and/or high value of the provided digital channel.

        NOTE: Writing the values to the device is not implemented yet (see FIXME below).
        Currently the requested values are ignored and only the levels read back via
        get_digital_level() are returned.

        @param dict low: dictionary, with key being the channel and items being
                         the low values (in volt) for the desired channel.
        @param dict high: dictionary, with key being the channel and items being
                         the high values (in volt) for the desired channel.

        @return (dict, dict): tuple of two dicts where first dict denotes the
                              current low value and the second dict the high
                              value.

        Note: After setting the high and/or low values of the device, retrieve
              them again for obtaining the actual set value(s) and use that
              information for further processing.

        The major difference to analog signals is that digital signals are
        either ON or OFF, whereas analog channels have a varying amplitude
        range. In contrast to analog output levels, digital output levels are
        defined by a voltage, which corresponds to the ON status and a voltage
        which corresponds to the OFF status (both denoted in (absolute) voltage)

        In general there is no bijective correspondence between
        (amplitude, offset) and (value high, value low)!
        """
        low_levels = dict() if low is None else low
        high_levels = dict() if high is None else high

        # The constraints are fetched for a future input check; they are not used yet.
        self.get_constraints()

        for _channel, _level in low_levels.items():
            # FIXME: Tell the device the proper digital voltage low value:
            # self.tell('SOURCE1:MARKER{0}:VOLTAGE:LOW {1}'.format(d_ch, low[d_ch]))
            continue

        for _channel, _level in high_levels.items():
            # FIXME: Tell the device the proper digital voltage high value:
            # self.tell('SOURCE1:MARKER{0}:VOLTAGE:HIGH {1}'.format(d_ch, high[d_ch]))
            continue

        return self.get_digital_level()
972
973
    def get_active_channels(self, ch=None):
        """ Get the active channels of the pulse generator hardware.

        @param list ch: optional, if specific analog or digital channels are
                        needed to be asked without obtaining all the channels.

        @return dict:  where keys denoting the channel name and items boolean
                       expressions whether channel are active or not.

        The n-th analog channel (counted from 1) is active if 'OUTPUT<n>:STATE?' is non-zero.
        The number of active markers of an active analog channel is taken as 10 minus the
        answer to 'SOUR<n>:DAC:RES?': with one marker only d_ch<2n-1> is active, with two
        markers d_ch<2n-1> and d_ch<2n> are active. The markers of an inactive analog channel
        are reported as inactive.

        Example for an possible input (order is not important):
            ch = ['a_ch2', 'd_ch2', 'a_ch1', 'd_ch5', 'd_ch1']
        then the output might look like
            {'a_ch2': True, 'd_ch2': False, 'a_ch1': False, 'd_ch5': True, 'd_ch1': False}

        If no parameters are passed to this method all channels will be asked
        for their setting.
        """
        # If you want to check the input use the constraints:
        # constraints = self.get_constraints()

        active_ch = dict()
        for ch_num, a_ch in enumerate(self._get_all_analog_channels(), 1):
            # check whether the analog channel is active
            output_on = bool(int(self.query('OUTPUT{0:d}:STATE?'.format(ch_num))))
            active_ch[a_ch] = output_on
            # check how many markers are active on the channel, i.e. the DAC resolution
            marker_count = 0
            if output_on:
                marker_count = 10 - int(self.query('SOUR{0:d}:DAC:RES?'.format(ch_num)))
            active_ch['d_ch{0:d}'.format(ch_num * 2)] = marker_count == 2
            active_ch['d_ch{0:d}'.format(ch_num * 2 - 1)] = marker_count in (1, 2)

        # return either all channel information or just the one asked for.
        if ch is None:
            return active_ch
        return {chnl: state for chnl, state in active_ch.items() if chnl in ch}
1021
1022
    def set_active_channels(self, ch=None):
        """ Set the active channels for the pulse generator hardware.

        @param dict ch: dictionary with keys being the analog or digital
                          string generic names for the channels with items being
                          a boolean value.

        @return dict: with the actual set values for active channels for analog
                      and digital values.

        If nothing is passed, the current channel activation is returned unchanged. The same
        holds (together with a logged error) if unknown channels are requested or if the
        resulting set of active channels is not among the activation_config constraints.

        For every analog channel the DAC resolution is written as the maximum resolution from
        the constraints minus the number of requested markers, then the output is switched ON
        or OFF. The second marker of a channel only counts if the first one is requested too.

        Note: After setting the active channels of the device, retrieve them
              again for obtaining the actual set value(s) and use that
              information for further processing.

        Example for possible input:
            ch={'a_ch2': True, 'd_ch1': False, 'd_ch3': True, 'd_ch4': True}
        to activate analog channel 2 digital channel 3 and 4 and to deactivate
        digital channel 1.

        The hardware itself has to handle, whether separate channel activation
        is possible.
        """
        current_channel_state = self.get_active_channels()

        if ch is None:
            return current_channel_state

        if not set(current_channel_state).issuperset(ch):
            self.log.error('Trying to (de)activate channels that are not present in AWG70k.\n'
                           'Setting of channel activation aborted.')
            return current_channel_state

        # Merge the request into a copy of the current activation
        requested_state = dict(current_channel_state)
        requested_state.update((name, ch[name]) for name in ch)

        # check if the channels to set are part of the activation_config constraints
        constraints = self.get_constraints()
        requested_active = {name for name, is_on in requested_state.items() if is_on}
        if requested_active not in constraints.activation_config.values():
            self.log.error('activation_config to set ({0}) is not allowed according to constraints.'
                           ''.format(requested_active))
            return current_channel_state

        analog_channels = self._get_all_analog_channels()

        # Set DAC resolution and output state for each analog channel
        max_res = constraints.dac_resolution['max']
        for a_ch in analog_channels:
            ach_num = int(a_ch.rsplit('_ch', 1)[1])
            # determine number of markers for current a_ch
            if not requested_state['d_ch{0:d}'.format(2 * ach_num - 1)]:
                marker_num = 0
            elif requested_state['d_ch{0:d}'.format(2 * ach_num)]:
                marker_num = 2
            else:
                marker_num = 1
            # every marker costs one bit of DAC resolution
            self.write('SOUR{0:d}:DAC:RES {1:d}'.format(ach_num, max_res - marker_num))
            # (de)activate the analog channel
            if requested_state[a_ch]:
                output_command = 'OUTPUT{0:d}:STATE ON'
            else:
                output_command = 'OUTPUT{0:d}:STATE OFF'
            self.write(output_command.format(ach_num))

        return self.get_active_channels()
1092
1093
    def get_interleave(self):
        """ Report the interleave state of the AWG.

        This implementation does not ask the device; it always reports interleave as OFF.

        @return bool: always False (interleave OFF)

        Unused for pulse generator hardware other than an AWG.
        """
        return False
1101
1102
    def set_interleave(self, state=False):
        """ Request the interleave of the AWG to be turned on or off.

        Nothing is sent to the device. Requesting interleave ON only produces a warning in
        the log.

        @param bool state: The state the interleave should be set to
                           (True: ON, False: OFF)

        @return bool: actual interleave status, always False (OFF)

        Unused for pulse generator hardware other than an AWG.
        """
        if not state:
            return False
        self.log.warning('Interleave mode not available for the AWG 70000 Series!\n'
                         'Method call will be ignored.')
        return False
1119
1120
    def has_sequence_mode(self):
        """ Asks the pulse generator whether sequence mode exists.

        The answer to '*OPT?' is split at commas; sequence mode is reported if one of the
        resulting entries is exactly '03'.

        @return: bool, True for yes, False for no.
        """
        installed_options = self.query('*OPT?').split(',')
        return installed_options.count('03') > 0
1127
1128
    def reset(self):
        """ Reset the device by sending '*RST' followed by '*WAI'.

        @return int: error code (always 0 here)
        """
        for command in ('*RST', '*WAI'):
            self.write(command)
        return 0
1136
1137
    def query(self, question):
        """ Asks the device a 'question' and receive and return an answer from it.

        Leading and trailing whitespace (including the line break) is removed from the raw
        answer first, then leading and trailing double quotes are removed.

        @param string question: string containing the command

        @return string: the cleaned answer of the device to the 'question'
        """
        raw_answer = self.awg.query(question)
        return raw_answer.strip().strip('"')
1145
1146
    def write(self, command):
        """ Sends a command string to the device.

        The device handle is expected to answer with a pair of which the second element
        (the status code) is converted to int and returned.

        @param string command: string containing the command

        @return int: status code of the write operation as integer
        """
        _, status_code = self.awg.write(command)
        return int(status_code)
1155
1156
    def new_sequence(self, name, steps):
        """
        Generate a new sequence 'name' having 'steps' number of steps with immediate (async.) jump
        timing.

        A sequence of the same name already listed by get_sequence_names() is deleted first.

        @param str name: Name of the sequence which should be generated
        @param int steps: Number of steps

        @return int: error code (0: OK, -1: sequencer option not installed)
        """
        if not self.has_sequence_mode():
            self.log.error('Sequence generation in AWG not possible. '
                           'Sequencer option not installed.')
            return -1

        # Replace a sequence of the same name
        if name in self.get_sequence_names():
            self.delete_sequence(name)

        # Create the sequence, then set its event jump timing to immediate
        for template, arguments in (('SLIS:SEQ:NEW "{0}", {1:d}', (name, steps)),
                                    ('SLIS:SEQ:EVEN:JTIM "{0}", IMM', (name,))):
            self.write(template.format(*arguments))
        return 0
1176
1177
    def sequence_set_waveform(self, sequence_name, waveform_name, step, track):
        """
        Set the waveform 'waveform_name' to position 'step' in the sequence 'sequence_name'.

        @param str sequence_name: Name of the sequence which should be edited
        @param str waveform_name: Name of the waveform which should be added
        @param int step: Position of the added waveform
        @param int track: track which should be edited

        @return int: error code (0: OK, -1: sequencer option not installed)
        """
        if not self.has_sequence_mode():
            self.log.error('Direct sequence generation in AWG not possible. '
                           'Sequencer option not installed.')
            return -1

        command = 'SLIS:SEQ:STEP{0:d}:TASS{1:d}:WAV "{2}", "{3}"'.format(
            step, track, sequence_name, waveform_name)
        self.write(command)
        return 0
1198
1199
    def sequence_set_repetitions(self, sequence_name, step, repeat=1):
        """
        Set the repetition counter of sequence "sequence_name" at step "step" to "repeat".

        A negative repeat value is sent as 'INF' (infinite repetitions); any other value is
        converted to int and sent unchanged.
        NOTE(review): the previous documentation stated "0: once, 1: twice, ..."; whether the
        device interprets the count this way cannot be seen from this method - please confirm.

        @param str sequence_name: Name of the sequence to be edited
        @param int step: Sequence step to be edited
        @param int repeat: repetition count to send (negative: infinite)

        @return int: error code (0: OK, -1: sequencer option not installed)
        """
        if not self.has_sequence_mode():
            self.log.error('Direct sequence generation in AWG not possible. '
                           'Sequencer option not installed.')
            return -1

        if repeat < 0:
            repeat_count = 'INF'
        else:
            repeat_count = str(int(repeat))
        command = 'SLIS:SEQ:STEP{0:d}:RCO "{1}", {2}'.format(step, sequence_name, repeat_count)
        self.write(command)
        return 0
1217
1218
    def sequence_set_goto(self, sequence_name, step, goto=-1):
        """
        Set the go-to target of step "step" in sequence "sequence_name".

        A positive goto value is sent as the integer target step; zero or a negative value
        is sent as 'NEXT'.

        @param str sequence_name: Name of the sequence to be edited
        @param int step: Sequence step to be edited
        @param int goto: The sequence step to go to. Values <= 0 are sent as 'NEXT'.

        @return int: error code (0: OK, -1: sequencer option not installed)
        """
        if not self.has_sequence_mode():
            self.log.error('Direct sequence generation in AWG not possible. '
                           'Sequencer option not installed.')
            return -1

        # Decide on the passed goto argument (not on an undefined parameter dict)
        goto = str(int(goto)) if goto > 0 else 'NEXT'
        self.write('SLIS:SEQ:STEP{0:d}:GOTO "{1}", {2}'.format(step, sequence_name, goto))
        return 0
1235
1236
    def sequence_set_event_jump(self, sequence_name, step, trigger='OFF', jumpto=0):
        """
        Set the event trigger input of the specified sequence step and the jump_to destination.

        The trigger specifier is translated via the event trigger lookup of this class. The
        jump destination is only written if the translated trigger is not 'OFF'.

        @param str sequence_name: Name of the sequence to be edited
        @param int step: Sequence step to be edited
        @param str trigger: Trigger string specifier. ('OFF', 'A', 'B' or 'INT')
        @param int jumpto: The sequence step to jump to. 0 or -1 is interpreted as next step

        @return int: error code (0: OK, -1: sequencer option not installed or invalid trigger)
        """
        if not self.has_sequence_mode():
            self.log.error('Direct sequence generation in AWG not possible. '
                           'Sequencer option not installed.')
            return -1

        # Keep the requested specifier untouched so it can be reported on failure
        trigger_code = self.__event_triggers.get(trigger)
        if trigger_code is None:
            self.log.error('Invalid trigger specifier "{0}".\n'
                           'Please choose one of: "OFF", "A", "B", "INT"'.format(trigger))
            return -1

        self.write('SLIS:SEQ:STEP{0:d}:EJIN "{1}", {2}'.format(step, sequence_name, trigger_code))
        # Set event_jump_to if event trigger is enabled
        if trigger_code != 'OFF':
            jumpto = 'NEXT' if jumpto <= 0 else str(int(jumpto))
            self.write('SLIS:SEQ:STEP{0:d}:EJUM "{1}", {2}'.format(step, sequence_name, jumpto))
        return 0
1264
1265
    def sequence_set_wait_trigger(self, sequence_name, step, trigger='OFF'):
        """
        Make a certain sequence step wait for a trigger to start playing.

        @param str sequence_name: Name of the sequence to be edited
        @param int step: Sequence step to be edited
        @param str trigger: Trigger string specifier. ('OFF', 'A', 'B' or 'INT')

        @return int: error code (0: command sent, -1: sequence mode not available or invalid
                     trigger specifier)
        """
        if not self.has_sequence_mode():
            self.log.error('Direct sequence generation in AWG not possible. '
                           'Sequencer option not installed.')
            return -1

        # Keep the user supplied specifier untouched so it can be reported in the error message.
        trigger_cmd = self.__event_triggers.get(trigger)
        if trigger_cmd is None:
            self.log.error('Invalid trigger specifier "{0}".\n'
                           'Please choose one of: "OFF", "A", "B", "INT"'.format(trigger))
            return -1

        self.write('SLIS:SEQ:STEP{0:d}:WINP "{1}", {2}'.format(step, sequence_name, trigger_cmd))
        return 0
1288
1289
    def sequence_set_flags(self, sequence_name, step, flags=None, trigger=False):
        """
        Set the flags in "flags" to HIGH (trigger=False) during the sequence step or let the flags
        send out a fixed duration trigger pulse (trigger=True). All other flags are set to LOW.

        @param str sequence_name: Name of the sequence to be edited
        @param int step: Sequence step to be edited
        @param list flags: List of flag specifiers ('A', 'B', 'C', 'D') to be active during this
                           sequence step. None (default) is treated as an empty list, i.e. all
                           flags are set to LOW.
        @param bool trigger: Whether the flag should be HIGH during the step (False) or send out a
                             fixed length trigger pulse when starting to play the step (True).

        @return int: error code (0: commands sent, -1: sequence mode not available)
        """
        if not self.has_sequence_mode():
            self.log.error('Direct sequence generation in AWG not possible. '
                           'Sequencer option not installed.')
            return -1

        # The default argument is None; without this guard the membership test below would fail.
        if flags is None:
            flags = list()

        active_state = 'PULS' if trigger else 'HIGH'
        for flag in ('A', 'B', 'C', 'D'):
            state = active_state if flag in flags else 'LOW'
            # Placeholders are numbered consecutively to match the four format arguments.
            self.write('SLIS:SEQ:STEP{0:d}:TFL1:{1}FL "{2}",{3}'.format(step,
                                                                        flag,
                                                                        sequence_name,
                                                                        state))
        return 0
1318
1319
    def make_sequence_continuous(self, sequencename):
        """
        Let a sequence repeat itself instead of stopping after one run. The length of the
        sequence is queried from the device and the 'go to' target of its last step is set to
        step 1.

        @param sequencename: Name of the sequence which should be made continuous

        @return int: The number of the last sequence step, or the negative error code if the
                     sequence mode is not available or setting the 'go to' target failed
        """
        if not self.has_sequence_mode():
            self.log.error('Direct sequence generation in AWG not possible. '
                           'Sequencer option not installed.')
            return -1

        # The number of steps in the sequence equals the index of its last step
        reply = self.query('SLIS:SEQ:LENG? "{0}"'.format(sequencename))
        final_step = int(reply)

        status = self.sequence_set_goto(sequencename, final_step, 1)
        return status if status < 0 else final_step
1339
1340
    def force_jump_sequence(self, final_step, channel=1):
        """
        Force the sequencer of a channel to jump to the given step. No trigger event is needed
        for a forced jump to be executed.
        On two channel instruments both channels jump simultaneously to the same sequence step
        if they are playing the same sequence.

        @param final_step: Step to jump to. Possible options are
            FIRSt - jump to the first step of the sequence.
            CURRent - jump to the current sequence step, essentially starting it over.
            LAST - jump to the last step of the sequence.
            END - go to the end and play 0 V until play is stopped.
            <NR1> - jump to the specified step number, a value between 1 and 16383.
        @param channel: Number of the channel to act on. Defaults to 1
        """
        command = 'SOURCE{0:d}:JUMP:FORCE {1}'.format(channel, final_step)
        self.write(command)
1361
1362
    def _get_all_channels(self):
1363
        """
1364
        Helper method to return a sorted list of all technically available channel descriptors
1365
        (e.g. ['a_ch1', 'a_ch2', 'd_ch1', 'd_ch2'])
1366
1367
        @return list: Sorted list of channels
1368
        """
1369
        configs = self.get_constraints().activation_config
1370
        if 'all' in configs:
1371
            largest_config = configs['all']
1372
        else:
1373
            largest_config = list(configs.values())[0]
1374
            for config in configs.values():
1375
                if len(largest_config) < len(config):
1376
                    largest_config = config
1377
        return sorted(largest_config)
1378
1379
    def _get_all_analog_channels(self):
1380
        """
1381
        Helper method to return a sorted list of all technically available analog channel
1382
        descriptors (e.g. ['a_ch1', 'a_ch2'])
1383
1384
        @return list: Sorted list of analog channels
1385
        """
1386
        return [chnl for chnl in self._get_all_channels() if chnl.startswith('a')]
1387
1388
    def _get_all_digital_channels(self):
1389
        """
1390
        Helper method to return a sorted list of all technically available digital channel
1391
        descriptors (e.g. ['d_ch1', 'd_ch2'])
1392
1393
        @return list: Sorted list of digital channels
1394
        """
1395
        return [chnl for chnl in self._get_all_channels() if chnl.startswith('d')]
1396
1397
    def _is_output_on(self):
1398
        """
1399
        Aks the AWG if the output is enabled, i.e. if the AWG is running
1400
1401
        @return: bool, (True: output on, False: output off)
1402
        """
1403
        return bool(int(self.query('AWGC:RST?')))
1404
1405 View Code Duplication
    def _get_filenames_on_device(self):
        """
        Log into the FTP server of the device and list the files in the FTP working directory.
        Directories are skipped.

        @return list: filenames found in <ftproot>\\waves
        """
        listing = list()
        with FTP(self._ip_address) as ftp:
            ftp.login(user=self._username, passwd=self._password)
            ftp.cwd(self.ftp_working_dir)
            ftp.retrlines('LIST', callback=listing.append)

        # A line of the listing looks like this:
        #   '05-10-16  05:22PM                  292 SSR aom adjusted.seq'
        # The first 18 characters hold the date information and are cut away. What is left
        # starts (after stripping whitespaces) with the file size, followed by the filename.
        # Splitting only at the first whitespace keeps filenames containing whitespaces intact.
        # Entries marked with '<DIR>' are directories and must not be included.
        file_entries = (line[18:].lstrip() for line in listing if '<DIR>' not in line)
        return [entry.split(' ', 1)[1].strip() for entry in file_entries]
1430
1431
    def _delete_file(self, filename):
1432
        """
1433
1434
        @param str filename:
1435
        """
1436
        if filename in self._get_filenames_on_device():
1437
            with FTP(self._ip_address) as ftp:
1438
                ftp.login(user=self._username, passwd=self._password)
1439
                ftp.cwd(self.ftp_working_dir)
1440
                ftp.delete(filename)
1441
        return
1442
1443 View Code Duplication
    def _send_file(self, filename):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1444
        """
1445
1446
        @param filename:
1447
        @return:
1448
        """
1449
        # check input
1450
        if not filename:
1451
            self.log.error('No filename provided for file upload to awg!\nCommand will be ignored.')
1452
            return -1
1453
1454
        filepath = os.path.join(self._tmp_work_dir, filename)
1455
        if not os.path.isfile(filepath):
1456
            self.log.error('No file "{0}" found in "{1}". Unable to upload!'
1457
                           ''.format(filename, self._tmp_work_dir))
1458
            return -1
1459
1460
        # Delete old file on AWG by the same filename
1461
        self._delete_file(filename)
1462
1463
        # Transfer file
1464
        with FTP(self._ip_address) as ftp:
1465
            ftp.login(user=self._username, passwd=self._password)
1466
            ftp.cwd(self.ftp_working_dir)
1467
            with open(filepath, 'rb') as file:
1468
                ftp.storbinary('STOR ' + filename, file)
1469
        return 0
1470
1471
    def _write_wfmx(self, filename, analog_samples, marker_bytes, is_first_chunk, is_last_chunk,
1472
                    total_number_of_samples):
1473
        """
1474
        Appends a sampled chunk of a whole waveform to a wfmx-file. Create the file
1475
        if it is the first chunk.
1476
        If both flags (is_first_chunk, is_last_chunk) are set to TRUE it means
1477
        that the whole ensemble is written as a whole in one big chunk.
1478
1479
        @param name: string, represents the name of the sampled ensemble
1480
        @param analog_samples: dict containing float32 numpy ndarrays, contains the
1481
                                       samples for the analog channels that
1482
                                       are to be written by this function call.
1483
        @param marker_bytes: np.ndarray containing bool numpy ndarrays, contains the samples
1484
                                      for the digital channels that
1485
                                      are to be written by this function call.
1486
        @param total_number_of_samples: int, The total number of samples in the
1487
                                        entire waveform. Has to be known in advance.
1488
        @param is_first_chunk: bool, indicates if the current chunk is the
1489
                               first write to this file.
1490
        @param is_last_chunk: bool, indicates if the current chunk is the last
1491
                              write to this file.
1492
1493
        @return list: the list contains the string names of the created files for the passed
1494
                      presampled arrays
1495
        """
1496
        # The memory overhead of the tmp file write/read process in bytes. Only used if wfmx file is
1497
        # written in chunks in order to avoid excessive memory usage.
1498
        tmp_bytes_overhead = 16777216  # 16 MB
1499
1500
        if not filename.endswith('.wfmx'):
1501
            filename += '.wfmx'
1502
        wfmx_path = os.path.join(self._tmp_work_dir, filename)
1503
        tmp_path = os.path.join(self._tmp_work_dir, 'digital_tmp.bin')
1504
1505
        # if it is the first chunk, create the .WFMX file with header.
1506
        if is_first_chunk:
1507
            # create header
1508
            header = self._create_xml_header(total_number_of_samples, marker_bytes is not None)
1509
            # write header
1510
            with open(wfmx_path, 'wb') as wfmxfile:
1511
                wfmxfile.write(header)
1512
            # Check if a tmp digital samples file is present and delete it if necessary.
1513
            if os.path.isfile(tmp_path):
1514
                os.remove(tmp_path)
1515
1516
        # append analog samples to the .WFMX file.
1517
        # Write digital samples in temporary file if not the entire samples are passed at once.
1518
        with open(wfmx_path, 'ab') as wfmxfile:
1519
            # append analog samples in binary format. One sample is 4 bytes (np.float32).
1520
            wfmxfile.write(analog_samples)
1521
1522
        # Write digital samples to tmp file if chunkwise writing is used and it's not the last chunk
1523
        if not is_last_chunk and marker_bytes is not None:
1524
            with open(tmp_path, 'ab') as tmp_file:
1525
                tmp_file.write(marker_bytes)
1526
1527
        # If this is the last chunk, write digital samples from tmp file to wfmx file (if present)
1528
        # and also append the currently passed digital samples to wfmx file.
1529
        # Read from tmp file in chunks of tmp_bytes_overhead in order to avoid too much memory
1530
        # overhead.
1531
        if is_last_chunk and marker_bytes is not None:
1532
            with open(wfmx_path, 'ab') as wfmxfile:
1533
                # Copy over digital samples from tmp file. Delete tmp file afterwards.
1534
                if os.path.isfile(tmp_path):
1535
                    with open(tmp_path, 'rb') as tmp_file:
1536
                        while True:
1537
                            tmp = tmp_file.read(tmp_bytes_overhead)
1538
                            if not tmp:
1539
                                break
1540
                            wfmxfile.write(tmp)
1541
                    os.remove(tmp_path)
1542
                # Append current digital samples array to wfmx file
1543
                wfmxfile.write(marker_bytes)
1544
        return
1545
1546
    def _create_xml_header(self, number_of_samples, markers_active):
1547
        """
1548
        This function creates an xml file containing the header for the wfmx-file format using
1549
        etree.
1550
        """
1551
        hdr = ET.Element('DataFile', offset='XXXXXXXXX', version='0.1')
1552
        dsc = ET.SubElement(hdr, 'DataSetsCollection', xmlns='http://www.tektronix.com')
1553
        datasets = ET.SubElement(dsc, 'DataSets', version='1', xmlns='http://www.tektronix.com')
1554
        datadesc = ET.SubElement(datasets, 'DataDescription')
1555
        sub_elem = ET.SubElement(datadesc, 'NumberSamples')
1556
        sub_elem.text = str(int(number_of_samples))
1557
        sub_elem = ET.SubElement(datadesc, 'SamplesType')
1558
        sub_elem.text = 'AWGWaveformSample'
1559
        sub_elem = ET.SubElement(datadesc, 'MarkersIncluded')
1560
        sub_elem.text = 'true' if markers_active else 'false'
1561
        sub_elem = ET.SubElement(datadesc, 'NumberFormat')
1562
        sub_elem.text = 'Single'
1563
        sub_elem = ET.SubElement(datadesc, 'Endian')
1564
        sub_elem.text = 'Little'
1565
        sub_elem = ET.SubElement(datadesc, 'Timestamp')
1566
        sub_elem.text = '2014-10-28T12:59:52.9004865-07:00'
1567
        prodspec = ET.SubElement(datasets, 'ProductSpecific', name='')
1568
        sub_elem = ET.SubElement(prodspec, 'ReccSamplingRate', units='Hz')
1569
        sub_elem.text = str(self.get_sample_rate())
1570
        sub_elem = ET.SubElement(prodspec, 'ReccAmplitude', units='Volts')
1571
        sub_elem.text = '0.5'
1572
        sub_elem = ET.SubElement(prodspec, 'ReccOffset', units='Volts')
1573
        sub_elem.text = '0'
1574
        sub_elem = ET.SubElement(prodspec, 'SerialNumber')
1575
        sub_elem = ET.SubElement(prodspec, 'SoftwareVersion')
1576
        sub_elem.text = '4.0.0075'
1577
        sub_elem = ET.SubElement(prodspec, 'UserNotes')
1578
        sub_elem = ET.SubElement(prodspec, 'OriginalBitDepth')
1579
        sub_elem.text = 'Floating'
1580
        sub_elem = ET.SubElement(prodspec, 'Thumbnail')
1581
        sub_elem = ET.SubElement(prodspec, 'CreatorProperties', name='Basic Waveform')
1582
        sub_elem = ET.SubElement(hdr, 'Setup')
1583
1584
        xml_header = ET.tostring(hdr, encoding='unicode')
1585
        xml_header = xml_header.replace('><', '>\r\n<')
1586
1587
        # Calculates the length of the header and replace placeholder with actual number
1588
        xml_header = xml_header.replace('XXXXXXXXX', str(len(xml_header)).zfill(9))
1589
        return xml_header
1590