Completed
Pull Request — master (#384)
by
unknown
02:57
created

AWG70K.write_waveform()   F

Complexity

Conditions 15

Size

Total Lines 108

Duplication

Lines 0
Ratio 0 %

Importance

Changes 5
Bugs 0 Features 0
Metric Value
cc 15
c 5
b 0
f 0
dl 0
loc 108
rs 2

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: move a cohesive part of the method body into a new, well-named method.

Complexity

Complex methods like AWG70K.write_waveform() often do a lot of different things. To break such a method down, we need to identify a cohesive component within the enclosing class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
3
"""
4
This file contains the Qudi hardware module for AWG70000 Series.
5
6
Qudi is free software: you can redistribute it and/or modify
7
it under the terms of the GNU General Public License as published by
8
the Free Software Foundation, either version 3 of the License, or
9
(at your option) any later version.
10
11
Qudi is distributed in the hope that it will be useful,
12
but WITHOUT ANY WARRANTY; without even the implied warranty of
13
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
GNU General Public License for more details.
15
16
You should have received a copy of the GNU General Public License
17
along with Qudi. If not, see <http://www.gnu.org/licenses/>.
18
19
Copyright (c) the Qudi Developers. See the COPYRIGHT.txt file at the
20
top-level directory of this distribution and at <https://github.com/Ulm-IQO/qudi/>
21
"""
22
23
24
import os
25
import time
26
import visa
27
import numpy as np
28
29
from collections import OrderedDict
30
from ftplib import FTP
31
from lxml import etree as ET
32
33
from core.module import Base, ConfigOption
34
from core.util.modules import get_home_dir
35
from interface.pulser_interface import PulserInterface, PulserConstraints
36
37
38
class AWG70K(Base, PulserInterface):
39
    """
40
41
    """
42
    _modclass = 'awg70k'
43
    _modtype = 'hardware'
44
45
    # config options
46
    _tmp_work_dir = ConfigOption(name='tmp_work_dir',
47
                                 default=os.path.join(get_home_dir(), 'pulsed_files'),
48
                                 missing='warn')
49
    _visa_address = ConfigOption(name='awg_visa_address', missing='error')
50
    _ip_address = ConfigOption(name='awg_ip_address', missing='error')
51
    _ftp_dir = ConfigOption(name='ftp_root_dir', default='C:\\inetpub\\ftproot', missing='warn')
52
    _username = ConfigOption(name='ftp_login', default='anonymous', missing='warn')
53
    _password = ConfigOption(name='ftp_passwd', default='anonymous@', missing='warn')
54
    _visa_timeout = ConfigOption(name='timeout', default=30, missing='nothing')
55
56
    def __init__(self, *args, **kwargs):
        """ Create the module instance and initialise the connection related attributes.

        All positional and keyword arguments are forwarded unchanged to the parent constructor.
        """
        super().__init__(*args, **kwargs)

        # VISA resource manager, used to look up and open the AWG resource
        self._rm = visa.ResourceManager()

        # Reference to the AWG VISA resource (None as long as no resource has been opened)
        self.awg = None
        # String describing the model (empty as long as it is unknown)
        self.awg_model = ''

        # Subfolder of the FTP root dir on the AWG disk to work in
        self.ftp_working_dir = 'waves'
67
68
    def on_activate(self):
        """ Initialisation performed during activation of the module.

        Makes sure the local work directory exists, opens the VISA resource of the AWG (if the
        configured address is known to the resource manager), logs in to the FTP server of the
        AWG and finally reads the model name from the device identification string.
        """
        # Make sure the local work directory is there
        work_dir = self._tmp_work_dir
        if not os.path.exists(work_dir):
            os.makedirs(os.path.abspath(work_dir))

        # Open the VISA resource if pyVISA knows the configured address
        if self._visa_address in self._rm.list_resources():
            self.awg = self._rm.open_resource(self._visa_address)
            # The config option is multiplied by 1000 (seconds -> milliseconds); default is 30
            self.awg.timeout = self._visa_timeout * 1000
        else:
            self.awg = None
            self.log.error('VISA address "{0}" not found by the pyVISA resource manager.\nCheck '
                           'the connection by using for example "Agilent Connection Expert".'
                           ''.format(self._visa_address))

        # Log in to the FTP server of the AWG and change into the working directory.
        # Any failure here raises and therefore aborts the activation.
        with FTP(self._ip_address) as ftp_session:
            ftp_session.login(user=self._username, passwd=self._password)
            ftp_session.cwd(self.ftp_working_dir)

        # The model name is the second comma separated field of the *IDN? answer
        self.awg_model = '' if self.awg is None else self.query('*IDN?').split(',')[1]
        return
96
97
    def on_deactivate(self):
        """ Required tasks to be performed during deactivation of the module.

        Closes the VISA connection to the AWG if one has been opened. A failure while closing is
        logged and otherwise ignored, so that it does not abort the deactivation.
        """
        if self.awg is None:
            # No VISA resource has been opened, so there is nothing to close
            self.log.debug('No open AWG connection to close.')
            return

        try:
            self.awg.close()
        except Exception:
            # Best effort only. Catching Exception (instead of a bare except) lets
            # KeyboardInterrupt and SystemExit propagate.
            self.log.debug('Closing AWG connection using pyvisa failed.')
        else:
            # Only claim success if close() actually returned without error
            self.log.info('Closed connection to AWG')
        return
107
108
    def get_constraints(self):
        """
        Retrieve the hardware constraints of the pulsing device.

        @return constraints object: PulserConstraints instance with the constraints as attributes.

        The returned object describes the limits of the pulse generator hardware (sample rate,
        amplitudes, waveform length, numbers of waveforms/sequences, channel configurations, ...).

            SEE PulserConstraints CLASS IN pulser_interface.py FOR AVAILABLE CONSTRAINTS!!!

        Every scalar constraint is given by its min/max values, the minimal step size and a
        default value. Additional constraints have to be added to the PulserConstraints class.

        PulserConstraints.activation_config differs, since it contains the channel
        configuration/activation information in the form:
            {<descriptor_str>: <channel_set>,
             <descriptor_str>: <channel_set>,
             ...}

        Sample rate and activation configurations are only filled in for the models 'AWG70002A'
        and 'AWG70001A'. For any other model string they are left untouched/empty.
        """
        def _set_range(constraint, minimum, maximum, step, default):
            # Fill the four scalar limits of a single constraint object
            constraint.min = minimum
            constraint.max = maximum
            constraint.step = step
            constraint.default = default

        limits = PulserConstraints()
        model = self.awg_model

        if model == 'AWG70002A':
            _set_range(limits.sample_rate, 1.5e3, 25.0e9, 5.0e2, 25.0e9)
        elif model == 'AWG70001A':
            _set_range(limits.sample_rate, 3.0e3, 50.0e9, 1.0e3, 50.0e9)

        _set_range(limits.a_ch_amplitude, 0.25, 0.5, 0.001, 0.5)
        # FIXME: Enter the proper digital channel low constraints:
        _set_range(limits.d_ch_low, 0.0, 0.0, 0.0, 0.0)
        # FIXME: Enter the proper digital channel high constraints:
        _set_range(limits.d_ch_high, 0.0, 1.4, 0.1, 1.4)

        _set_range(limits.waveform_length, 1, 8000000000, 1, 1)

        # FIXME: Check the proper number for your device
        _set_range(limits.waveform_num, 1, 32000, 1, 1)
        # FIXME: Check the proper number for your device
        _set_range(limits.sequence_num, 1, 4000, 1, 1)
        # FIXME: Check the proper number for your device
        _set_range(limits.subsequence_num, 1, 8000, 1, 1)

        # If sequencer mode is available then these should be specified
        _set_range(limits.repetitions, 0, 65536, 1, 0)
        # ToDo: Check how many external triggers are available
        limits.event_triggers = ['A', 'B']
        limits.flags = ['A', 'B', 'C', 'D']

        _set_range(limits.sequence_steps, 0, 8000, 1, 0)

        # The names a_ch<num> and d_ch<num> are generic names which describe the channels
        # UNAMBIGUOUSLY. All possible channel configurations are listed here using only these
        # generic names. The descriptor names of the configurations can be chosen freely.
        if model == 'AWG70002A':
            config_items = [
                ('all', {'a_ch1', 'd_ch1', 'd_ch2', 'a_ch2', 'd_ch3', 'd_ch4'}),
                # Usage of both channels but reduced markers (higher analog resolution)
                ('ch1_2mrk_ch2_1mrk', {'a_ch1', 'd_ch1', 'd_ch2', 'a_ch2', 'd_ch3'}),
                ('ch1_2mrk_ch2_0mrk', {'a_ch1', 'd_ch1', 'd_ch2', 'a_ch2'}),
                ('ch1_1mrk_ch2_2mrk', {'a_ch1', 'd_ch1', 'a_ch2', 'd_ch3', 'd_ch4'}),
                ('ch1_0mrk_ch2_2mrk', {'a_ch1', 'a_ch2', 'd_ch3', 'd_ch4'}),
                ('ch1_1mrk_ch2_1mrk', {'a_ch1', 'd_ch1', 'a_ch2', 'd_ch3'}),
                ('ch1_0mrk_ch2_1mrk', {'a_ch1', 'a_ch2', 'd_ch3'}),
                ('ch1_1mrk_ch2_0mrk', {'a_ch1', 'd_ch1', 'a_ch2'}),
                # Usage of channel 1 only:
                ('ch1_2mrk', {'a_ch1', 'd_ch1', 'd_ch2'}),
                # Usage of channel 2 only:
                ('ch2_2mrk', {'a_ch2', 'd_ch3', 'd_ch4'}),
                # Usage of only channel 1 with one marker:
                ('ch1_1mrk', {'a_ch1', 'd_ch1'}),
                # Usage of only channel 2 with one marker:
                ('ch2_1mrk', {'a_ch2', 'd_ch3'}),
                # Usage of only channel 1 with no marker:
                ('ch1_0mrk', {'a_ch1'}),
                # Usage of only channel 2 with no marker:
                ('ch2_0mrk', {'a_ch2'}),
            ]
        elif model == 'AWG70001A':
            config_items = [
                ('all', {'a_ch1', 'd_ch1', 'd_ch2'}),
                # Usage of only channel 1 with one marker:
                ('ch1_1mrk', {'a_ch1', 'd_ch1'}),
                # Usage of only channel 1 with no marker:
                ('ch1_0mrk', {'a_ch1'}),
            ]
        else:
            config_items = []

        # The OrderedDict keeps the configurations in the order listed above
        limits.activation_config = OrderedDict(config_items)

        # FIXME: additional constraint really necessary?
        limits.dac_resolution = {'min': 8, 'max': 10, 'step': 1, 'unit': 'bit'}
        return limits
237
238
    def pulser_on(self):
        """ Switches the pulsing device on.

        @return int: error code (0:OK, -1:error, higher number corresponds to
                                 current status of the device. Check then the
                                 class variable status_dic.)
        """
        if self._is_output_on():
            # AWG is already running, nothing to do
            return self.get_status()[0]

        self.write('AWGC:RUN')
        # Poll every 250 ms until the AWG is actually running (there is no timeout)
        while True:
            if self._is_output_on():
                break
            time.sleep(0.25)
        return self.get_status()[0]
252
253
    def pulser_off(self):
        """ Switches the pulsing device off.

        @return int: error code (0:OK, -1:error, higher number corresponds to
                                 current status of the device. Check then the
                                 class variable status_dic.)
        """
        if not self._is_output_on():
            # AWG is already idle, nothing to do
            return self.get_status()[0]

        self.write('AWGC:STOP')
        # Poll every 250 ms until the AWG has actually stopped (there is no timeout)
        while True:
            if not self._is_output_on():
                break
            time.sleep(0.25)
        return self.get_status()[0]
267
268
    def write_waveform(self, name, analog_samples, digital_samples, is_first_chunk, is_last_chunk,
                       total_number_of_samples):
        """
        Write a new waveform or append samples to an already existing waveform on the device memory.
        The flags is_first_chunk and is_last_chunk can be used as indicator if a new waveform should
        be created or if the write process to a waveform should be terminated.

        One waveform named '<name>_ch<num>' is created for each active analog channel.

        @param name: str, the name of the waveform to be created/append to
        @param analog_samples: dict, keys are the analog channel names (e.g. 'a_ch1') and values
                               are the numpy.ndarray of voltage samples for that channel
        @param digital_samples: dict, keys are the digital channel names (e.g. 'd_ch1') and values
                                are the numpy.ndarray of marker states for that channel. The
                                arrays must have an item size of one byte (e.g. bool).
                                ATTENTION: these arrays are modified in place while the marker
                                bytes are encoded (no copies are made).
        @param is_first_chunk: bool, flag indicating if it is the first chunk to write.
                                     If True this method will create a new empty waveform.
                                     If False the samples are appended to the existing waveform.
        @param is_last_chunk: bool, flag indicating if it is the last chunk to write.
                                    Some devices may need to know when to close the appending wfm.
        @param total_number_of_samples: int, The number of sample points for the entire waveform
                                        (not only the currently written chunk)

        @return: (int, list) number of samples written (-1 indicates failed process) and list of
                             created waveform names
        """
        waveforms = list()

        # Sanity checks
        if len(analog_samples) == 0:
            self.log.error('No analog samples passed to write_waveform method in awg70k.')
            return -1, waveforms

        min_samples = int(self.query('WLIS:WAV:LMIN?'))
        if total_number_of_samples < min_samples:
            self.log.error('Unable to write waveform.\nNumber of samples to write ({0:d}) is '
                           'smaller than the allowed minimum waveform length ({1:d}).'
                           ''.format(total_number_of_samples, min_samples))
            return -1, waveforms

        # determine active channels
        activation_dict = self.get_active_channels()
        active_channels = {chnl for chnl, is_active in activation_dict.items() if is_active}
        active_analog = sorted(chnl for chnl in active_channels if chnl.startswith('a'))

        # Sanity check of channel numbers: samples must be given for exactly the active channels
        sample_channels = set(analog_samples).union(digital_samples)
        if active_channels != sample_channels:
            self.log.error('Mismatch of channel activation and sample array dimensions for '
                           'waveform creation.\nChannel activation is: {0}\nSample arrays have: '
                           '{1}'.format(active_channels, sample_channels))
            return -1, waveforms

        # Write waveforms. One for each analog channel.
        for a_ch in active_analog:
            # Get the integer analog channel number
            a_ch_num = int(a_ch.split('ch')[-1])

            # Encode marker information in an array of bytes (uint8)
            start = time.time()
            mrk_bytes = self._encode_marker_bytes(digital_samples, a_ch_num)
            self.log.debug('Prepare digital channel data: {0}'.format(time.time() - start))

            # Create waveform name string
            wfm_name = '{0}_ch{1:d}'.format(name, a_ch_num)

            # Check if waveform already exists and delete if necessary.
            if wfm_name in self.get_waveform_names():
                self.delete_waveform(wfm_name)

            # Write WFMX file for waveform
            start = time.time()
            self._write_wfmx(filename=wfm_name,
                             analog_samples=analog_samples[a_ch],
                             digital_samples=mrk_bytes,
                             is_first_chunk=is_first_chunk,
                             is_last_chunk=is_last_chunk,
                             total_number_of_samples=total_number_of_samples)
            self.log.debug('Write WFMX file: {0}'.format(time.time() - start))

            # transfer waveform to AWG and load into workspace
            start = time.time()
            self._send_file(filename=wfm_name + '.wfmx')
            self.log.debug('Send WFMX file: {0}'.format(time.time() - start))

            start = time.time()
            # NOTE(review): self._ftp_path is not assigned in __init__ or on_activate - confirm
            # that it is defined elsewhere in this class (config only provides self._ftp_dir).
            self.write('MMEM:OPEN "{0}"'.format(os.path.join(self._ftp_path, wfm_name + '.wfmx')))
            # Wait for everything to complete
            while int(self.query('*OPC?')) != 1:
                time.sleep(0.25)
            # Just to make sure the waveform has really shown up in the waveform list
            while wfm_name not in self.get_waveform_names():
                time.sleep(0.25)
            self.log.debug('Load WFMX file into workspace: {0}'.format(time.time() - start))

            # Append created waveform name to waveform list
            waveforms.append(wfm_name)
        return total_number_of_samples, waveforms

    @staticmethod
    def _encode_marker_bytes(digital_samples, a_ch_num):
        """ Combine the marker channels of one analog channel into a single uint8 array.

        Analog channel <n> owns the markers 'd_ch<2n-1>' (stored in bit 6) and 'd_ch<2n>'
        (stored in bit 7). To avoid intermediate copies the arrays in digital_samples are
        reinterpreted as uint8 and shifted/added IN PLACE, i.e. the passed arrays are altered.

        @param digital_samples: dict, digital channel name -> numpy.ndarray with 1 byte item size
        @param a_ch_num: int, number of the analog channel the markers belong to

        @return numpy.ndarray|None: marker bytes, or None if the first marker of the analog
                                    channel is not contained in digital_samples
        """
        mrk_ch_1 = 'd_ch{0:d}'.format(a_ch_num * 2 - 1)
        mrk_ch_2 = 'd_ch{0:d}'.format(a_ch_num * 2)

        if mrk_ch_1 not in digital_samples:
            # A second marker without the first one is not encoded at all
            return None

        first_bytes = digital_samples[mrk_ch_1].view('uint8')
        np.left_shift(first_bytes, 6, out=first_bytes)
        if mrk_ch_2 not in digital_samples:
            return first_bytes

        second_bytes = digital_samples[mrk_ch_2].view('uint8')
        np.left_shift(second_bytes, 7, out=second_bytes)
        # The combined result is stored in the buffer of the second marker
        np.add(second_bytes, first_bytes, out=second_bytes)
        return second_bytes
376
377
    def write_sequence(self, name, sequence_parameter_list):
        """
        Write a new sequence on the device memory.

        All checks are performed before the sequence is (re-)created on the device, so a failed
        call does not leave a partially written sequence behind.

        @param name: str, the name of the sequence to be created
        @param sequence_parameter_list: list, contains one tuple per sequence step of the form
                                        (<tuple of waveform names>, <dict of step parameters>).
                                        The parameter dict must provide the keys 'event_jump_to',
                                        'repetitions' and 'go_to'.

        @return: int, number of sequence steps written (-1 indicates failed process)
        """
        # Check if device has sequencer option installed
        if not self.has_sequence_mode():
            self.log.error('Direct sequence generation in AWG not possible. Sequencer option not '
                           'installed.')
            return -1

        # Check if all waveforms are present on device memory
        avail_waveforms = set(self.get_waveform_names())
        for waveform_tuple, _ in sequence_parameter_list:
            if not avail_waveforms.issuperset(waveform_tuple):
                self.log.error('Failed to create sequence "{0}" due to waveforms "{1}" not '
                               'present in device memory.'.format(name, waveform_tuple))
                return -1

        # One sequence track per ACTIVE analog channel (same criterion as in load_sequence)
        activation_dict = self.get_active_channels()
        active_analog = sorted(chnl for chnl, is_active in activation_dict.items()
                               if is_active and chnl.startswith('a'))
        num_tracks = len(active_analog)
        num_steps = len(sequence_parameter_list)

        # Each step must provide exactly one waveform per track
        for wfm_tuple, _ in sequence_parameter_list:
            if len(wfm_tuple) != num_tracks:
                self.log.error('Unable to write sequence.\nLength of waveform tuple "{0}" does not '
                               'match the number of sequence tracks.'.format(wfm_tuple))
                return -1

        # Create new sequence and set jump timing to immediate.
        # Delete old sequence by the same name if present.
        self.generate_sequence(name=name, steps=num_steps, tracks=num_tracks)

        # Fill in sequence information (steps and tracks are counted from 1 on the device)
        for step, (wfm_tuple, seq_params) in enumerate(sequence_parameter_list, 1):
            # Set waveforms to play
            for track, waveform in enumerate(wfm_tuple, 1):
                self.write('SLIS:SEQ:STEP{0:d}:TASS{1:d}:WAV "{2}", "{3}"'.format(
                    step, track, name, waveform))

            # Set event trigger
            jumpto = str(seq_params['event_jump_to']) if seq_params['event_jump_to'] > 0 else 'NEXT'
            self.write('SLIS:SEQ:STEP{0:d}:EJUM "{1}", {2}'.format(step, name, jumpto))
            # Event jump input is switched off for steps with a finite repetition count
            jump_input = 'OFF' if seq_params['repetitions'] > 0 else 'ATR'
            self.write('SLIS:SEQ:STEP{0:d}:EJIN "{1}", {2}'.format(step, name, jump_input))

            # Set repetitions (anything not > 0 means repeat infinitely)
            repeat = str(seq_params['repetitions']) if seq_params['repetitions'] > 0 else 'INF'
            self.write('SLIS:SEQ:STEP{0:d}:RCO "{1}", {2}'.format(step, name, repeat))

            # Set go_to parameter
            goto = str(seq_params['go_to']) if seq_params['go_to'] > 0 else 'NEXT'
            self.write('SLIS:SEQ:STEP{0:d}:GOTO "{1}", {2}'.format(step, name, goto))

        # Wait for everything to complete
        while int(self.query('*OPC?')) != 1:
            time.sleep(0.25)
        return num_steps
442
443
    def get_waveform_names(self):
        """ Retrieve the names of all uploaded waveforms on the device.

        An empty list is returned if the device answer is empty or if the query failed with a
        VisaIOError (which is logged as error).

        @return list: Sorted list of all uploaded waveform name strings in the device workspace.
        """
        try:
            answer = self.query('WLIS:LIST?')
        except visa.VisaIOError:
            self.log.error('Unable to read waveform list from device. VisaIOError occured.')
            return list()

        if not answer:
            return list()

        # The answer is a comma separated list of names
        names = answer.split(',')
        names.sort()
        return names
455
456
    def get_sequence_names(self):
        """ Retrieve the names of all uploaded sequence on the device.

        Without sequence mode an empty list is returned. If a VisaIOError occurs while reading,
        the error is logged and the names collected up to that point are returned.

        @return list: List of all uploaded sequence name strings in the device workspace.
        """
        names = list()

        if self.has_sequence_mode():
            try:
                seq_count = int(self.query('SLIS:SIZE?'))
                # The sequence list of the device is indexed starting from 1
                for index in range(1, seq_count + 1):
                    names.append(self.query('SLIS:NAME? {0:d}'.format(index)))
            except visa.VisaIOError:
                self.log.error('Unable to read sequence list from device. VisaIOError occurred.')
        return names
473
474
    def delete_waveform(self, waveform_name):
        """ Delete the waveform with name "waveform_name" from the device memory.

        Names that are not present in the device workspace are silently skipped.

        @param str waveform_name: The name of the waveform to be deleted
                                  Optionally a list of waveform names can be passed.

        @return list: a list of deleted waveform names.
        """
        # Treat a single name like a list with one element
        names = [waveform_name] if isinstance(waveform_name, str) else waveform_name

        present = self.get_waveform_names()
        removed = list()
        for candidate in names:
            if candidate not in present:
                continue
            self.write('WLIS:WAV:DEL "{0}"'.format(candidate))
            removed.append(candidate)
        return removed
492
493
    def delete_sequence(self, sequence_name):
        """ Delete the sequence with name "sequence_name" from the device memory.

        Names that are not present in the device workspace are silently skipped.

        @param str sequence_name: The name of the sequence to be deleted
                                  Optionally a list of sequence names can be passed.

        @return list: a list of deleted sequence names.
        """
        # Treat a single name like a list with one element
        names = [sequence_name] if isinstance(sequence_name, str) else sequence_name

        present = self.get_sequence_names()
        removed = list()
        for candidate in names:
            if candidate not in present:
                continue
            self.write('SLIS:SEQ:DEL "{0}"'.format(candidate))
            removed.append(candidate)
        return removed
511
512
    def load_waveform(self, load_dict):
        """ Loads a waveform to the specified channel of the pulsing device.
        For devices that have a workspace (i.e. AWG) this will load the waveform from the device
        workspace into the channel.
        For a device without mass memory this will make the waveform/pattern that has been
        previously written with self.write_waveform ready to play.

        @param load_dict:  dict|list, a dictionary with keys being one of the available channel
                                      index and values being the name of the already written
                                      waveform to load into the channel.
                                      Examples:   {1: rabi_ch1, 2: rabi_ch2} or
                                                  {1: rabi_ch2, 2: rabi_ch1}
                                      If just a list of waveform names is given, the channel
                                      association will be invoked from the channel
                                      suffix '_ch1', '_ch2' etc.

        @return (dict, str): Dictionary with keys being the channel number and values being the
                             respective asset loaded into the channel, string describing the asset
                             type ('waveform' or 'sequence')
        """
        if isinstance(load_dict, list):
            new_dict = dict()
            for waveform in load_dict:
                try:
                    channel = int(waveform.rsplit('_ch', 1)[1])
                except (IndexError, ValueError):
                    # Name without (numeric) '_ch<num>' suffix: channel can not be derived
                    self.log.error('Unable to load waveforms into channels.\n'
                                   'Waveform name "{0}" has no valid channel suffix "_ch<num>".'
                                   ''.format(waveform))
                    return self.get_loaded_assets()
                new_dict[channel] = waveform
            load_dict = new_dict

        # Get all active channels
        chnl_activation = self.get_active_channels()
        analog_channels = sorted(
            chnl for chnl in chnl_activation if chnl.startswith('a') and chnl_activation[chnl])

        # Check if all channels to load to are active
        channels_to_set = {'a_ch{0:d}'.format(chnl_num) for chnl_num in load_dict}
        if not channels_to_set.issubset(analog_channels):
            self.log.error('Unable to load waveforms into channels.\n'
                           'One or more channels to set are not active.')
            return self.get_loaded_assets()

        # Check if all waveforms to load are present on device memory
        if not set(load_dict.values()).issubset(self.get_waveform_names()):
            self.log.error('Unable to load waveforms into channels.\n'
                           'One or more waveforms to load are missing on device memory.')
            return self.get_loaded_assets()

        # Load waveforms into channels
        for chnl_num, waveform in load_dict.items():
            self.write('SOUR{0:d}:CASS:WAV "{1}"'.format(chnl_num, waveform))
            # Poll until the device reports the waveform as loaded. Surrounding whitespace and
            # double quotes are removed from the answer, so the comparison works no matter
            # whether self.query() returns the raw or an already cleaned answer.
            while self.query('SOUR{0:d}:CASS?'.format(chnl_num)).strip().strip('"') != waveform:
                time.sleep(0.1)

        return self.get_loaded_assets()
564
565
    def load_sequence(self, sequence_name):
        """ Loads a sequence to the channels of the device in order to be ready for playback.
        For devices that have a workspace (i.e. AWG) this will load the sequence from the device
        workspace into the channels.

        Track <n> of the sequence is loaded into analog channel <n>. The number of tracks has to
        match the number of active analog channels.

        @param sequence_name:  str, name of the sequence to load

        @return (dict, str): Dictionary with keys being the channel number and values being the
                             respective asset loaded into the channel, string describing the asset
                             type ('waveform' or 'sequence')
        """
        if sequence_name not in self.get_sequence_names():
            self.log.error('Unable to load sequence.\n'
                           'Sequence to load is missing on device memory.')
            return self.get_loaded_assets()

        # Get all active channels
        chnl_activation = self.get_active_channels()
        analog_channels = sorted(
            chnl for chnl in chnl_activation if chnl.startswith('a') and chnl_activation[chnl])

        # Check if number of sequence tracks matches the number of analog channels
        trac_num = int(self.query('SLIS:SEQ:TRAC? "{0}"'.format(sequence_name)))
        if trac_num != len(analog_channels):
            self.log.error('Unable to load sequence.\nNumber of tracks in sequence to load does '
                           'not match the number of active analog channels.')
            return self.get_loaded_assets()

        # Load sequence
        for chnl in range(1, trac_num + 1):
            self.write('SOUR{0:d}:CASS:SEQ "{1}", {2:d}'.format(chnl, sequence_name, chnl))
            expected = '{0},{1:d}'.format(sequence_name, chnl)
            # Poll until the device reports '<sequence_name>,<track>' as loaded. Surrounding
            # whitespace and double quotes are removed from the answer instead of slicing a
            # fixed number of characters, so the comparison works no matter whether
            # self.query() returns the raw or an already cleaned answer.
            while self.query('SOUR{0:d}:CASS?'.format(chnl)).strip().strip('"') != expected:
                time.sleep(0.2)

        return self.get_loaded_assets()
601
602
    def get_loaded_assets(self):
        """
        Retrieve the currently loaded asset names for each active analog channel of the device.

        The returned dictionary has the analog channel numbers as keys.
        In case of loaded waveforms the dictionary values are the waveform names.
        In case of a loaded sequence the values are the sequence name appended by a suffix
        representing the track loaded to the respective channel (i.e. '<sequence_name>_1').

        @return (dict, str): Dictionary with keys being the channel number and values being the
                             respective asset loaded into the channel,
                             string describing the asset type ('waveform' or 'sequence').
                             If the channels report a mixture of waveforms and sequences an error
                             is logged and (dict(), '') is returned. If no analog channel is
                             active the result is (dict(), None).
        """
        # Get the numbers of all active analog channels
        chnl_activation = self.get_active_channels()
        channel_numbers = sorted(int(chnl.split('_ch')[1]) for chnl in chnl_activation if
                                 chnl.startswith('a') and chnl_activation[chnl])

        # Get assets per channel
        loaded_assets = dict()
        current_type = None
        for chnl_num in channel_numbers:
            # Ask the AWG for the asset loaded into THIS channel (not always channel 1).
            # self.query() already strips whitespace and quotes, so a waveform answers
            # 'waveformname' and a sequence 'sequencename,1' (the number being the track).
            name_parts = self.query('SOUR{0:d}:CASS?'.format(chnl_num)).rsplit(',', 1)
            # Two parts mean a sequence track is loaded, a single part means a waveform
            asset_type = 'sequence' if len(name_parts) > 1 else 'waveform'
            # All channels must agree on the asset type
            if current_type is not None and current_type != asset_type:
                self.log.error('Unable to determine loaded assets.')
                return dict(), ''
            current_type = asset_type
            # '<sequence_name>_<track>' for sequences, plain name for waveforms
            loaded_assets[chnl_num] = '_'.join(name_parts)

        return loaded_assets, current_type
645
646
    def clear_all(self):
        """ Delete all waveforms and, if sequence mode is available, all sequences from the device.

        After each delete command the device is polled with '*OPC?' until it answers 1.

        @return int: error code; this implementation always returns 0

        Unused for digital pulse generators without storage capability
        (PulseBlaster, FPGA).
        """
        def _wait_for_completion():
            # Block until the device reports the pending operation as complete
            while int(self.query('*OPC?')) != 1:
                time.sleep(0.25)

        self.write('WLIS:WAV:DEL ALL')
        _wait_for_completion()

        # Sequences only exist if the sequencer option is installed
        if not self.has_sequence_mode():
            return 0

        self.write('SLIS:SEQ:DEL ALL')
        _wait_for_completion()
        return 0
662
663
    def get_status(self):
        """ Retrieve the status of the pulsing hardware.

        @return (int, dict): integer value of the current status together with a dictionary
                             mapping every possible status value to its description
        """
        # Any additional status message should get an integer value higher than 1.
        status_dic = dict()
        status_dic[-1] = 'Failed Request or Communication'
        status_dic[0] = 'Device has stopped, but can receive commands'
        status_dic[1] = 'Device is active and running'

        if self.awg is None:
            # No device handle available
            current_status = -1
        else:
            # 1 if the output is on, 0 otherwise
            current_status = int(self._is_output_on())
        return current_status, status_dic
677
678
    def set_sample_rate(self, sample_rate):
        """ Set the sample rate of the pulse generator hardware.

        The value is sent with four significant digits ('%.4G'). Afterwards the device is polled
        with '*OPC?' until it answers 1, followed by an additional fixed wait of one second
        before the sample rate is read back.

        @param float sample_rate: The sample rate to be set (in Hz)

        @return float: the sample rate read back from the device
        """
        self.write('CLOCK:SRATE %.4G' % sample_rate)

        # Poll until the device reports the operation as complete
        operation_complete = int(self.query('*OPC?')) == 1
        while not operation_complete:
            time.sleep(0.25)
            operation_complete = int(self.query('*OPC?')) == 1

        time.sleep(1)
        return self.get_sample_rate()
693
694
    def get_sample_rate(self):
        """ Get the sample rate of the pulse generator hardware.

        @return float: The current sample rate as answered by the device to 'CLOCK:SRATE?'
        """
        return float(self.query('CLOCK:SRATE?'))
701
702
    def get_analog_level(self, amplitude=None, offset=None):
        """ Retrieve the analog amplitude and offset of the provided channels.

        @param list amplitude: optional, names of the analog channels (e.g. ['a_ch1', 'a_ch2'])
                               whose amplitude (in Volt peak to peak, i.e. the full amplitude)
                               is desired. If None, all analog channels are queried.
        @param list offset: optional, names of the analog channels whose offset (in Volt) is
                            desired. If None, all analog channels are reported.

        @return (dict, dict): tuple of two dicts (amplitudes, offsets) with keys being the
                              generic string channel names and items being the values for those
                              channels. Amplitude is always denoted in Volt-peak-to-peak and
                              offset in (absolute) Voltage.

        Amplitudes are always read directly from the device and never taken from a saved value.
        Offsets are not queried from the device; this driver reports 0.0 for every requested
        channel.

        Requested channels that are not available are skipped with a warning. Passing an empty
        list yields an empty dict for that quantity.

        Example:
            get_analog_level(amplitude=['a_ch1', 'a_ch2'], offset=['a_ch1'])
        might return
            {'a_ch1': 0.5, 'a_ch2': 0.25}, {'a_ch1': 0.0}

        In contrast to digital output levels, analog output levels are defined by an amplitude
        (here total signal span, denoted in Voltage peak to peak) and an offset (a value around
        which the signal oscillates, denoted by an (absolute) voltage).

        In general there is no bijective correspondence between
        (amplitude, offset) and (value high, value low)!
        """
        chnl_list = self._get_all_analog_channels()

        # Without an explicit request all analog channels are reported
        if amplitude is None:
            amplitude = chnl_list
        if offset is None:
            offset = chnl_list

        # get pp amplitudes
        amp = dict()
        for chnl in amplitude:
            if chnl not in chnl_list:
                self.log.warning('Get analog amplitude from AWG70k channel "{0}" failed. '
                                 'Channel non-existent.'.format(chnl))
                continue
            # The source index is taken from the channel name itself, not from its position in
            # the channel list, so the right source is addressed even if the list has gaps.
            ch_num = int(chnl.rsplit('_ch', 1)[1])
            amp[chnl] = float(self.query('SOUR{0:d}:VOLT:AMPL?'.format(ch_num)))

        # get voltage offsets
        off = dict()
        for chnl in offset:
            if chnl not in chnl_list:
                self.log.warning('Get analog offset from AWG70k channel "{0}" failed. '
                                 'Channel non-existent.'.format(chnl))
                continue
            # Always 0.0; the device is not asked for an offset
            off[chnl] = 0.0
        return amp, off
769
770
    def set_analog_level(self, amplitude=None, offset=None):
        """ Set amplitude and/or offset value of the provided analog channel.

        @param dict amplitude: dictionary, with key being the channel and items
                               being the amplitude values (in Volt peak to peak,
                               i.e. the full amplitude) for the desired channel.
        @param dict offset: dictionary, with key being the channel and items
                            being the offset values (in absolute volt) for the
                            desired channel.

        @return (dict, dict): tuple of two dicts with the amplitude and offset as returned by
                              get_analog_level() for all analog channels after setting.

        Channels that are not available in the device are skipped with a warning. Values outside
        of the limits given by the constraints (a_ch_amplitude, a_ch_offset) are clipped to the
        respective min/max value with a warning. The passed dictionaries are not modified.

        If nothing is passed nothing is written and the current levels are returned.

        In contrast to digital output levels, analog output levels are defined by an amplitude
        (here total signal span, denoted in Voltage peak to peak) and an offset (a value around
        which the signal oscillates, denoted by an (absolute) voltage).

        In general there is no bijective correspondence between
        (amplitude, offset) and (value high, value low)!
        """
        # Check the inputs by using the constraints...
        constraints = self.get_constraints()
        # ...and the available analog channels
        analog_channels = self._get_all_analog_channels()

        def _sanitize(requested, limits, quantity, setting):
            """ Return a checked copy of 'requested': unknown channels dropped, values clipped. """
            checked = dict()
            # Iterate the request but only ever write to the copy, so neither the iteration nor
            # the caller's dictionary is disturbed.
            for chnl, value in (requested or dict()).items():
                if chnl not in analog_channels:
                    self.log.warning('Channel to set ({0}) not available in AWG.\nSetting '
                                     '{1} voltage for this channel ignored.'.format(chnl, setting))
                    continue
                if value < limits.min:
                    self.log.warning('Minimum {0} for channel "{1}" is {2}. Requested {0} of {3}V '
                                     'was ignored and instead set to min value.'
                                     ''.format(quantity, chnl, limits.min, value))
                    value = limits.min
                elif value > limits.max:
                    self.log.warning('Maximum {0} for channel "{1}" is {2}. Requested {0} of {3}V '
                                     'was ignored and instead set to max value.'
                                     ''.format(quantity, chnl, limits.max, value))
                    value = limits.max
                checked[chnl] = value
            return checked

        def _write_level(chnl, command, value):
            """ Send one level to the source belonging to 'chnl' and wait for completion. """
            # The source index must come from the channel currently being written
            ch_num = int(chnl.rsplit('_ch', 1)[1])
            self.write(command.format(ch_num, value))
            while int(self.query('*OPC?')) != 1:
                time.sleep(0.25)

        # Sanity checks first (amplitude, then offset), hardware access afterwards
        checked_amplitude = _sanitize(amplitude, constraints.a_ch_amplitude, 'Vpp', 'analogue')
        checked_offset = _sanitize(offset, constraints.a_ch_offset, 'offset', 'offset')

        for a_ch, value in checked_amplitude.items():
            _write_level(a_ch, 'SOUR{0:d}:VOLT:AMPL {1}', value)

        for a_ch, value in checked_offset.items():
            _write_level(a_ch, 'SOUR{0:d}:VOLT:OFFSET {1}', value)

        return self.get_analog_level()
858
859 View Code Duplication
    def get_digital_level(self, low=None, high=None):
        """ Retrieve the digital low and high level of the provided channels.

        @param list low: optional, names of the digital channels (e.g. ['d_ch1', 'd_ch4']) whose
                         low value (in Volt) is desired. If None, all digital channels are
                         queried.
        @param list high: optional, names of the digital channels whose high value (in Volt) is
                          desired. If None, all digital channels are queried.

        @return: (dict, dict): tuple of two dicts (low values, high values), with keys being the
                               channel names and items being the values for those channels. Both
                               low and high value of a channel is denoted in (absolute) Voltage.

        The values are always read directly from the device. Requested channels that are not
        available are skipped without any message. Passing an empty list yields an empty dict for
        that level.

        Digital channels are the markers of the analog channels: 'd_ch1' and 'd_ch2' are
        addressed as marker 1 and 2 of source 1, 'd_ch3' and 'd_ch4' as marker 1 and 2 of
        source 2, and so on.

        In contrast to analog output levels, digital output levels are defined by a voltage,
        which corresponds to the ON status and a voltage which corresponds to the OFF status
        (both denoted in (absolute) voltage).

        In general there is no bijective correspondence between
        (amplitude, offset) and (value high, value low)!
        """
        # TODO: Test with multiple channel AWG
        digital_channels = self._get_all_digital_channels()

        def _read_levels(requested, question):
            """ Query 'question' for every available channel in 'requested' (None means all). """
            if requested is None:
                requested = digital_channels
            levels = {}
            for chnl in requested:
                if chnl in digital_channels:
                    d_ch_number = int(chnl.rsplit('_ch', 1)[1])
                    # Two markers per source: odd numbers are marker 1, even numbers marker 2
                    source_number = (d_ch_number + 1) // 2
                    marker_number = 2 - (d_ch_number % 2)
                    levels[chnl] = float(self.query(question.format(source_number, marker_number)))
            return levels

        # All low levels are read before the high levels
        low_val = _read_levels(low, 'SOUR{0:d}:MARK{1:d}:VOLT:LOW?')
        high_val = _read_levels(high, 'SOUR{0:d}:MARK{1:d}:VOLT:HIGH?')
        return low_val, high_val
925
926
    def set_digital_level(self, low=None, high=None):
        """ Set low and/or high value of the provided digital channel.

        NOTE: Setting the marker levels is not implemented for this device yet. Requested values
        are NOT sent to the hardware. If any value is requested a warning is logged, so that the
        request is not dropped silently.

        @param dict low: dictionary, with key being the channel and items being
                         the low values (in volt) for the desired channel.
        @param dict high: dictionary, with key being the channel and items being
                         the high values (in volt) for the desired channel.

        @return (dict, dict): tuple of two dicts as returned by get_digital_level(), where the
                              first dict denotes the current low value and the second dict the
                              current high value of all digital channels.

        In contrast to analog output levels, digital output levels are defined by a voltage,
        which corresponds to the ON status and a voltage which corresponds to the OFF status
        (both denoted in (absolute) voltage).

        In general there is no bijective correspondence between
        (amplitude, offset) and (value high, value low)!
        """
        # FIXME: Tell the device the proper digital voltage low/high values. Presumably the set
        #        form of the queries used in get_digital_level() applies
        #        ('SOUR<n>:MARK<m>:VOLT:LOW|HIGH <value>') - confirm with the programmer manual.
        if low or high:
            self.log.warning('Setting digital (marker) levels is not implemented for the '
                             'AWG70k.\nRequested low/high values are ignored.')
        return self.get_digital_level()
971
972
    def get_active_channels(self, ch=None):
        """ Get the active channels of the pulse generator hardware.

        @param list ch: optional, if specific analog or digital channels are
                        needed to be asked without obtaining all the channels.

        @return dict:  where keys denote the channel names and items are booleans
                       telling whether the channel is active or not.

        Example for a possible input (order is not important):
            ch = ['a_ch2', 'd_ch2', 'a_ch1', 'd_ch1']
        then the output might look like
            {'a_ch2': True, 'd_ch2': False, 'a_ch1': False, 'd_ch1': False}

        If no parameters are passed to this method all channels will be asked
        for their setting. Requested channels that do not exist are simply missing in the result.
        """
        active_ch = dict()
        # Analog channels are addressed by their position (starting at 1) in the channel list
        for ch_num, a_ch in enumerate(self._get_all_analog_channels(), 1):
            output_on = bool(int(self.query('OUTPUT{0:d}:STATE?'.format(ch_num))))
            active_ch[a_ch] = output_on

            # The number of active markers of a channel is 10 minus its DAC resolution.
            # The resolution is only asked for analog channels that are switched on; the markers
            # of an inactive analog channel are reported as inactive.
            marker_count = 0
            if output_on:
                marker_count = 10 - int(self.query('SOUR{0:d}:DAC:RES?'.format(ch_num)))

            # With one marker only the odd numbered digital channel is active, with two markers
            # both. Any other count means no active marker.
            active_ch['d_ch{0:d}'.format(ch_num * 2)] = marker_count == 2
            active_ch['d_ch{0:d}'.format(ch_num * 2 - 1)] = marker_count in (1, 2)

        # return either all channel information or just the one asked for.
        if ch is None:
            return active_ch
        return {chnl: state for chnl, state in active_ch.items() if chnl in ch}
1021
1022
    def set_active_channels(self, ch=None):
        """ Set the active channels for the pulse generator hardware.

        @param dict ch: dictionary with keys being the analog or digital
                        string generic names for the channels with items being
                        a boolean value.

        @return dict: with the actual set values for active channels for analog
                      and digital values.

        If nothing is passed, if an unknown channel is requested or if the resulting set of
        active channels is not part of the activation_config constraints, nothing is changed
        and the current channel states are returned (the latter two cases log an error).

        Example for possible input:
            ch={'a_ch2': True, 'd_ch1': False, 'd_ch3': True, 'd_ch4': True}
        to activate analog channel 2 digital channel 3 and 4 and to deactivate
        digital channel 1.

        Digital channels are activated by reducing the DAC resolution of their analog channel
        by the number of markers in use. The second marker of an analog channel only counts if
        its first marker is active as well.
        """
        current_channel_state = self.get_active_channels()

        if ch is None:
            return current_channel_state

        # Every requested channel has to be known to the device
        known_channels = set(current_channel_state)
        if not known_channels.issuperset(ch):
            self.log.error('Trying to (de)activate channels that are not present in AWG70k.\n'
                           'Setting of channel activation aborted.')
            return current_channel_state

        # Merge the requested states into a copy of the current ones
        new_channels_state = current_channel_state.copy()
        new_channels_state.update((chnl, ch[chnl]) for chnl in ch)

        # The resulting set of active channels must be one of the allowed activation configs
        constraints = self.get_constraints()
        new_active_channels = set(
            chnl for chnl, is_active in new_channels_state.items() if is_active)
        if new_active_channels not in constraints.activation_config.values():
            self.log.error('activation_config to set ({0}) is not allowed according to constraints.'
                           ''.format(new_active_channels))
            return current_channel_state

        analog_channels = self._get_all_analog_channels()
        max_res = constraints.dac_resolution['max']

        for a_ch in analog_channels:
            ach_num = int(a_ch.rsplit('_ch', 1)[1])

            # Count the markers in use. The second marker is only looked at if the first one is
            # active.
            if not new_channels_state['d_ch{0:d}'.format(2 * ach_num - 1)]:
                marker_num = 0
            elif new_channels_state['d_ch{0:d}'.format(2 * ach_num)]:
                marker_num = 2
            else:
                marker_num = 1

            # Each marker in use reduces the DAC resolution of this channel by one
            self.write('SOUR{0:d}:DAC:RES {1:d}'.format(ach_num, max_res - marker_num))

            # (de)activate the analog channel
            output_cmd = 'OUTPUT{0:d}:STATE ON' if new_channels_state[a_ch] else \
                'OUTPUT{0:d}:STATE OFF'
            self.write(output_cmd.format(ach_num))

        return self.get_active_channels()
1092
1093
    def get_interleave(self):
        """ Check whether Interleave is ON or OFF in AWG.

        This implementation does not ask the device and always reports OFF.

        @return bool: True: ON, False: OFF (always False here)

        Unused for pulse generator hardware other than an AWG.
        """
        interleave_on = False
        return interleave_on
1101
1102
    def set_interleave(self, state=False):
        """ Turns the interleave of an AWG on or off.

        Nothing is sent to the device. A request to switch interleave on only logs a warning.

        @param bool state: The state the interleave should be set to
                           (True: ON, False: OFF)

        @return bool: actual interleave status (True: ON, False: OFF), always False here

        Unused for pulse generator hardware other than an AWG.
        """
        if not state:
            return False

        self.log.warning('Interleave mode not available for the AWG 70000 Series!\n'
                         'Method call will be ignored.')
        return False
1119
1120
    def has_sequence_mode(self):
        """ Asks the pulse generator whether sequence mode exists.

        Sequence mode is reported as available if '03' is one of the comma separated entries of
        the answer to '*OPT?'.

        @return: bool, True for yes, False for no.
        """
        return '03' in self.query('*OPT?').split(',')
1127
1128
    def reset(self):
        """ Reset the device by sending '*RST' followed by '*WAI'.

        @return int: error code; this implementation always returns 0
        """
        for command in ('*RST', '*WAI'):
            self.write(command)
        return 0
1136
1137
    def query(self, question):
        """ Asks the device a 'question' and receive and return an answer from it.

        Surrounding whitespace (including the trailing newline) is removed from the answer first,
        afterwards surrounding double quotes are removed.

        @param string question: string containing the command

        @return string: the cleaned up answer of the device to the 'question'
        """
        raw_answer = self.awg.query(question)
        # Once the whitespace is stripped from both ends only the quotes are left to remove
        return raw_answer.strip().strip('"')
1150
1151
    def write(self, command):
        """ Sends a command string to the device.

        The device handle is expected to answer the write with a pair of
        (number of bytes written, status code).

        @param string command: string containing the command

        @return int: the status code of the write operation converted to int
        """
        _, status_code = self.awg.write(command)
        return int(status_code)
1160
1161
    def generate_sequence(self, name, steps, tracks=1):
        """
        Generate a new sequence 'name' having 'steps' number of steps and 'tracks' number of tracks

        An already existing sequence with the same name is deleted first. For the new sequence
        'SLIS:SEQ:EVEN:JTIM' is set to IMM.

        @param str name: Name of the sequence which should be generated
        @param int steps: Number of steps
        @param int tracks: Number of tracks

        @return int: 0 on success, -1 if the sequencer option is not installed
        """
        if self.has_sequence_mode():
            # Replace a sequence of the same name instead of colliding with it
            sequence_exists = name in self.get_sequence_names()
            if sequence_exists:
                self.delete_sequence(name)
            self.write('SLIS:SEQ:NEW "{0}", {1:d}, {2:d}'.format(name, steps, tracks))
            self.write('SLIS:SEQ:EVEN:JTIM "{0}", IMM'.format(name))
            return 0

        self.log.error('Direct sequence generation in AWG not possible. '
                       'Sequencer option not installed.')
        return -1
1181
1182
    def add_waveform2sequence(self, sequence_name, waveform_name, step, track, repeat):
        """
        Add the waveform 'waveform_name' to position 'step' in the sequence 'sequence_name' and
        repeat it 'repeat' times

        @param str sequence_name: Name of the sequence which should be edited
        @param str waveform_name: Name of the waveform which should be added
        @param int step: Position of the added waveform
        @param int track: track which should be edited
        @param int repeat: number of repetitions of the added waveform

        @return int: 0 on success, -1 if the sequencer option is not installed
        """
        if self.has_sequence_mode():
            # Assign the waveform to the given step and track of the sequence...
            assign_cmd = 'SLIS:SEQ:STEP{0:d}:TASS{1:d}:WAV "{2}", "{3}"'.format(
                step, track, sequence_name, waveform_name)
            self.write(assign_cmd)
            # ...and set the repeat count of that step
            repeat_cmd = 'SLIST:SEQUENCE:STEP{0:d}:RCOUNT "{1}", {2}'.format(
                step, sequence_name, repeat)
            self.write(repeat_cmd)
            return 0

        self.log.error('Direct sequence generation in AWG not possible. '
                       'Sequencer option not installed.')
        return -1
1204
1205
    def make_sequence_continuous(self, sequencename):
        """
        Usually after a run of a sequence the output stops. Many times it is desired that the full
        sequence is repeated many times. This is achieved here by setting the 'go to' target of
        the last step to 'FIRST'. The last step is the sequence length as reported by the device.

        @param sequencename: Name of the sequence which should be made continuous

        @return int last_step: The step number whose 'go to' target has been set to 'FIRST',
                               or -1 if the sequencer option is not installed
        """
        if self.has_sequence_mode():
            length_answer = self.query('SLIS:SEQ:LENG? "{0}"'.format(sequencename))
            last_step = int(length_answer)
            self.write('SLIS:SEQ:STEP{0:d}:GOTO "{1}",  FIRST'.format(last_step, sequencename))
            return last_step

        self.log.error('Direct sequence generation in AWG not possible. '
                       'Sequencer option not installed.')
        return -1
1223
1224
    def force_jump_sequence(self, final_step, channel=1):
        """
        This command forces the sequencer to jump to the specified step per channel. A
        force jump does not require a trigger event to execute the jump.
        For two channel instruments, if both channels are playing the same sequence, then
        both channels jump simultaneously to the same sequence step.

        @param final_step: Step to jump to. Possible options are
            FIRSt - This enables the sequencer to jump to first step in the sequence.
            CURRent - This enables the sequencer to jump to the current sequence step,
            essentially starting the current step over.
            LAST - This enables the sequencer to jump to the last step in the sequence.
            END - This enables the sequencer to go to the end and play 0 V until play is
            stopped.
            <NR1> - This enables the sequencer to jump to the specified step, where the
            value is between 1 and 16383.
        @param int channel: determines the channel number. If omitted, interpreted as 1

        @return None
        """
        jump_command = 'SOURCE{0:d}:JUMP:FORCE {1}'.format(channel, final_step)
        self.write(jump_command)
1245
1246
    def _get_all_channels(self):
        """
        Helper method to return a sorted list of all technically available channel descriptors
        (e.g. ['a_ch1', 'a_ch2', 'd_ch1', 'd_ch2'])

        The channels are taken from the activation configs of the constraints: the config named
        'all' if it exists, otherwise the config containing the most channels (the first one
        found if several configs share the maximum size).

        @return list: Sorted list of channels
        """
        configs = self.get_constraints().activation_config
        if 'all' in configs:
            return sorted(configs['all'])

        # No explicit 'all' config: fall back to the biggest config available.
        candidates = list(configs.values())
        biggest = candidates[0]
        for candidate in candidates[1:]:
            if len(candidate) > len(biggest):
                biggest = candidate
        return sorted(biggest)
1262
1263
    def _get_all_analog_channels(self):
        """
        Helper method to return a sorted list of all technically available analog channel
        descriptors (e.g. ['a_ch1', 'a_ch2'])

        Analog channels are the entries of _get_all_channels() starting with 'a'.

        @return list: Sorted list of analog channels
        """
        all_channels = self._get_all_channels()
        return [descriptor for descriptor in all_channels if descriptor.startswith('a')]
1271
1272
    def _get_all_digital_channels(self):
        """
        Helper method to return a sorted list of all technically available digital channel
        descriptors (e.g. ['d_ch1', 'd_ch2'])

        Digital channels are the entries of _get_all_channels() starting with 'd'.

        @return list: Sorted list of digital channels
        """
        all_channels = self._get_all_channels()
        return [descriptor for descriptor in all_channels if descriptor.startswith('d')]
1280
1281
    def _is_output_on(self):
        """
        Ask the AWG if the output is enabled, i.e. if the AWG is running.

        The run state is queried with 'AWGC:RST?'; every non-zero integer reply counts as
        "output on".

        @return: bool, (True: output on, False: output off)
        """
        run_state = int(self.query('AWGC:RST?'))
        return run_state != 0
1288
1289
    def _get_filenames_on_device(self):
        """
        List the files located in the FTP working directory (self.ftp_working_dir) of the AWG.

        The directory listing is requested via FTP 'LIST'. Entries marked with '<DIR>' are
        skipped, so only plain files are returned.

        @return list: filenames found in the FTP working directory
        """
        filename_list = list()
        with FTP(self._ip_address) as ftp:
            ftp.login(user=self._username, passwd=self._password)
            ftp.cwd(self.ftp_working_dir)

            listing = list()
            ftp.retrlines('LIST', callback=listing.append)
            for entry in listing:
                # directories are of no interest here, only files are collected
                if '<DIR>' in entry:
                    continue
                # A listing entry is expected to look like this:
                #   '05-10-16  05:22PM                  292 SSR aom adjusted.seq'
                # The leading 18 characters carry the date information and are cut off. What
                # remains is the file size followed by the filename.
                size_and_name = entry[18:].lstrip()
                # Split only at the first whitespace (right after the size), because the
                # filename itself may contain whitespaces. Surrounding whitespaces of the
                # filename are removed for safety.
                filename_list.append(size_and_name.split(' ', 1)[1].strip())
        return filename_list
1314
1315
    def _delete_file(self, filename):
        """
        Delete a file from the FTP working directory of the AWG, if it exists there.

        Nothing happens if the file is not listed by _get_filenames_on_device().

        @param str filename: name of the file to delete on the device
        """
        if filename not in self._get_filenames_on_device():
            return

        with FTP(self._ip_address) as ftp:
            ftp.login(user=self._username, passwd=self._password)
            ftp.cwd(self.ftp_working_dir)
            ftp.delete(filename)
1326
1327
    def _send_file(self, filename):
        """
        Upload a file from the temporary work directory to the FTP working directory of the AWG.

        A file of the same name that already exists on the device is deleted before the upload.

        @param filename: str, name of the file inside self._tmp_work_dir to upload
        @return int: error code (0: OK, -1: no filename given or file not found locally)
        """
        # A filename is mandatory
        if not filename:
            self.log.error('No filename provided for file upload to awg!\nCommand will be ignored.')
            return -1

        local_path = os.path.join(self._tmp_work_dir, filename)
        if os.path.isfile(local_path):
            # Get rid of an old file with the same name on the AWG first
            self._delete_file(filename)

            # Upload the file
            with FTP(self._ip_address) as ftp:
                ftp.login(user=self._username, passwd=self._password)
                ftp.cwd(self.ftp_working_dir)
                with open(local_path, 'rb') as local_file:
                    ftp.storbinary('STOR ' + filename, local_file)
            return 0

        self.log.error('No file "{0}" found in "{1}". Unable to upload!'
                       ''.format(filename, self._tmp_work_dir))
        return -1
1354
1355
    def _write_wfmx(self, filename, analog_samples, marker_bytes, is_first_chunk, is_last_chunk,
                    total_number_of_samples):
        """
        Appends a sampled chunk of a whole waveform to a wfmx-file in the temporary work
        directory. Creates the file (including its header) if it is the first chunk.
        If both flags (is_first_chunk, is_last_chunk) are set to TRUE it means
        that the whole ensemble is written as a whole in one big chunk.

        File layout written by this method: header, all analog samples, all marker bytes.
        Since the marker bytes must follow the analog samples of ALL chunks, the marker bytes
        of every chunk but the last are parked in the temporary file 'digital_tmp.bin' and are
        copied to the wfmx-file when the last chunk arrives.

        @param filename: string, name of the wfmx-file. The extension '.wfmx' is appended if
                         it is missing.
        @param analog_samples: bytes-like object (presumably a float32 numpy ndarray - confirm
                               against caller) with the analog samples of this chunk. It is
                               written to the file as is.
        @param marker_bytes: bytes-like object with the marker samples of this chunk or None if
                             the waveform has no markers. It is written to the file as is.
        @param is_first_chunk: bool, indicates if the current chunk is the
                               first write to this file.
        @param is_last_chunk: bool, indicates if the current chunk is the last
                              write to this file.
        @param total_number_of_samples: int, The total number of samples in the
                                        entire waveform. Has to be known in advance.
        """
        # The memory overhead of the tmp file write/read process in bytes. Only used if wfmx file is
        # written in chunks in order to avoid excessive memory usage.
        tmp_bytes_overhead = 16777216  # 16 MB

        if not filename.endswith('.wfmx'):
            filename += '.wfmx'
        wfmx_path = os.path.join(self._tmp_work_dir, filename)
        tmp_path = os.path.join(self._tmp_work_dir, 'digital_tmp.bin')

        # if it is the first chunk, create the .WFMX file with header.
        if is_first_chunk:
            header = self._create_xml_header(total_number_of_samples, marker_bytes is not None)
            # The file is opened in binary mode, hence a text header has to be encoded first
            # (writing a str would raise a TypeError). Binary mode also keeps the '\r\n' line
            # breaks of the header untouched on every platform.
            # NOTE(review): the header is expected to be pure ASCII, so its encoded byte count
            # matches the length stored in its 'offset' attribute - confirm.
            if isinstance(header, str):
                header = header.encode('utf-8')
            with open(wfmx_path, 'wb') as wfmxfile:
                wfmxfile.write(header)
            # Remove a stale tmp digital samples file, e.g. left over from an aborted write.
            if os.path.isfile(tmp_path):
                os.remove(tmp_path)

        # append analog samples in binary format to the .WFMX file.
        with open(wfmx_path, 'ab') as wfmxfile:
            wfmxfile.write(analog_samples)

        # Without markers there is nothing left to do.
        if marker_bytes is None:
            return

        # Chunkwise writing and not the last chunk: park the digital samples in the tmp file.
        if not is_last_chunk:
            with open(tmp_path, 'ab') as tmp_file:
                tmp_file.write(marker_bytes)
            return

        # Last chunk: write digital samples from tmp file to wfmx file (if present) and also
        # append the currently passed digital samples to wfmx file.
        with open(wfmx_path, 'ab') as wfmxfile:
            if os.path.isfile(tmp_path):
                # Read from tmp file in chunks of tmp_bytes_overhead in order to avoid too much
                # memory overhead. Delete tmp file afterwards.
                with open(tmp_path, 'rb') as tmp_file:
                    while True:
                        tmp = tmp_file.read(tmp_bytes_overhead)
                        if not tmp:
                            break
                        wfmxfile.write(tmp)
                os.remove(tmp_path)
            # Append current digital samples array to wfmx file
            wfmxfile.write(marker_bytes)
        return
1429
1430
    def _create_xml_header(self, number_of_samples, markers_active):
        """
        This function creates the xml header for the wfmx-file format using etree.

        The 'offset' attribute of the root element holds the length of the complete header
        (zero-padded to 9 digits).

        @param number_of_samples: number of samples of the waveform, written as integer into
                                  the 'NumberSamples' element
        @param markers_active: bool, determines if 'MarkersIncluded' is 'true' or 'false'

        @return str: the xml header with '\\r\\n' line breaks between adjacent tags
        """
        namespace = 'http://www.tektronix.com'
        # Same width as the zero-padded length replacing it later on, hence the replacement
        # does not change the length of the header.
        placeholder = 'XXXXXXXXX'

        root = ET.Element('DataFile', offset=placeholder, version='0.1')
        collection = ET.SubElement(root, 'DataSetsCollection', xmlns=namespace)
        data_sets = ET.SubElement(collection, 'DataSets', version='1', xmlns=namespace)

        # General description of the sample data: (tag, text)
        description = ET.SubElement(data_sets, 'DataDescription')
        description_fields = (
            ('NumberSamples', str(int(number_of_samples))),
            ('SamplesType', 'AWGWaveformSample'),
            ('MarkersIncluded', 'true' if markers_active else 'false'),
            ('NumberFormat', 'Single'),
            ('Endian', 'Little'),
            ('Timestamp', '2014-10-28T12:59:52.9004865-07:00'),
        )
        for tag, text in description_fields:
            ET.SubElement(description, tag).text = text

        # Device specific part: (tag, attributes, text). A text of None gives an empty element.
        product = ET.SubElement(data_sets, 'ProductSpecific', name='')
        product_fields = (
            ('ReccSamplingRate', {'units': 'Hz'}, str(self.get_sample_rate())),
            ('ReccAmplitude', {'units': 'Volts'}, '0.5'),
            ('ReccOffset', {'units': 'Volts'}, '0'),
            ('SerialNumber', {}, None),
            ('SoftwareVersion', {}, '4.0.0075'),
            ('UserNotes', {}, None),
            ('OriginalBitDepth', {}, 'Floating'),
            ('Thumbnail', {}, None),
            ('CreatorProperties', {'name': 'Basic Waveform'}, None),
        )
        for tag, attributes, text in product_fields:
            ET.SubElement(product, tag, attributes).text = text

        ET.SubElement(root, 'Setup')

        # Serialize and put every tag on its own line
        serialized = ET.tostring(root, encoding='unicode').replace('><', '>\r\n<')

        # Replace the placeholder with the actual length of the header
        return serialized.replace(placeholder, str(len(serialized)).zfill(9))
1474