Issues (227)

awips/dataaccess/DataQueue.py (1 issue)

1
#
2
#   Convenience class for using the DAF's notifications feature. This is a
3
#   collection that, once connected to EDEX by calling start(), fills with
4
#   data as notifications come in. Runs on a separate thread to allow
5
#   non-blocking data retrieval.
6
#
7
#
8
#
9
#     SOFTWARE HISTORY
10
#
11
#    Date            Ticket#       Engineer       Description
12
#    ------------    ----------    -----------    --------------------------
13
#    07/29/16        2416          tgurney        Initial creation
14
#
15
16
from awips.dataaccess import DataNotificationLayer as DNL
17
18
import time
19
from threading import Thread
20
import sys
21
22
23
if sys.version_info.major == 2:
24
    from Queue import Queue, Empty
25
else:  # Python 3 module renamed to 'queue'
26
    from queue import Queue, Empty
27
28
# Used to indicate a DataQueue that will produce geometry data.
29
GEOMETRY = object()
30
31
# Used to indicate a DataQueue that will produce grid data.
32
GRID = object()
33
34
# Default maximum queue size.
35
_DEFAULT_MAXSIZE = 100
36
37
38
class Closed(Exception):
39
    """Raised when attempting to get data from a closed queue."""
40
    pass
41
42
43 View Code Duplication
class DataQueue(object):
0 ignored issues
show
This code seems to be duplicated in your project.
Loading history...
44
45
    """
46
    Convenience class for using the DAF's notifications feature. This is a
47
    collection that, once connected to EDEX by calling start(), fills with
48
    data as notifications come in.
49
50
    Example for getting obs data:
51
52
      from DataQueue import DataQueue, GEOMETRY
53
      request = DataAccessLayer.newDataRequest('obs')
54
      request.setParameters('temperature')
55
      request.setLocationNames('KOMA')
56
      q = DataQueue(GEOMETRY, request)
57
      q.start()
58
      for item in q:
59
          print(item.getNumber('temperature'))
60
    """
61
62
    def __init__(self, dtype, request, maxsize=_DEFAULT_MAXSIZE):
63
        """
64
        Create a new DataQueue.
65
66
        Args:
67
            dtype: Either GRID or GEOMETRY; must match the type of data
68
              requested.
69
            request: IDataRequest describing the data you want. It must at
70
              least have datatype set. All data produced will satisfy the
71
              constraints you specify.
72
            maxsize: Maximum number of data objects the queue can hold at
73
              one time. If the limit is reached, any data coming in after
74
              that will not appear until one or more items are removed using
75
              DataQueue.get().
76
        """
77
        assert maxsize > 0
78
        assert dtype in (GEOMETRY, GRID)
79
        self._maxsize = maxsize
80
        self._queue = Queue(maxsize=maxsize)
81
        self._thread = None
82
        if dtype is GEOMETRY:
83
            self._notifier = DNL.getGeometryDataUpdates(request)
84
        elif dtype is GRID:
85
            self._notifier = DNL.getGridDataUpdates(request)
86
87
    def start(self):
88
        """Start listening for notifications and requesting data."""
89
        if self._thread is not None:
90
            # Already started
91
            return
92
        kwargs = {'callback': self._data_received}
93
        self._thread = Thread(target=self._notifier.subscribe, kwargs=kwargs)
94
        self._thread.daemon = True
95
        self._thread.start()
96
        timer = 0
97
        while not self._notifier.subscribed:
98
            time.sleep(0.1)
99
            timer += 1
100
            if timer >= 100:  # ten seconds
101
                raise RuntimeError('timed out when attempting to subscribe')
102
103
    def _data_received(self, data):
104
        for d in data:
105
            if not isinstance(d, list):
106
                d = [d]
107
            for item in d:
108
                self._queue.put(item)
109
110
    def get(self, block=True, timeout=None):
111
        """
112
        Get and return the next available data object. By default, if there is
113
        no data yet available, this method will not return until data becomes
114
        available.
115
116
        Args:
117
            block: Specifies behavior when the queue is empty. If True, wait
118
              until an item is available before returning (the default). If
119
              False, return None immediately if the queue is empty.
120
            timeout: If block is True, wait this many seconds, and return None
121
              if data is not received in that time.
122
        Returns:
123
            IData
124
        """
125
        if self.closed:
126
            raise Closed
127
        try:
128
            return self._queue.get(block, timeout)
129
        except Empty:
130
            return None
131
132
    def get_all(self):
133
        """
134
        Get all data waiting for processing, in a single list. Always returns
135
        immediately. Returns an empty list if no data has arrived yet.
136
137
        Returns:
138
            List of IData
139
        """
140
        data = []
141
        for _ in range(self._maxsize):
142
            next_item = self.get(False)
143
            if next_item is None:
144
                break
145
            data.append(next_item)
146
        return data
147
148
    def close(self):
149
        """Close the queue. May not be re-opened after closing."""
150
        if not self.closed:
151
            self._notifier.close()
152
        self._thread.join()
153
154
    def qsize(self):
155
        """Return number of items in the queue."""
156
        return self._queue.qsize()
157
158
    def empty(self):
159
        """Return True if the queue is empty."""
160
        return self._queue.empty()
161
162
    def full(self):
163
        """Return True if the queue is full."""
164
        return self._queue.full()
165
166
    @property
167
    def closed(self):
168
        """True if the queue has been closed."""
169
        return not self._notifier.subscribed
170
171
    @property
172
    def maxsize(self):
173
        """
174
        Maximum number of data objects the queue can hold at one time.
175
        If this limit is reached, any data coming in after that will not appear
176
        until one or more items are removed using get().
177
        """
178
        return self._maxsize
179
180
    def __iter__(self):
181
        if self._thread is not None:
182
            while not self.closed:
183
                yield self.get()
184
185
    def __enter__(self):
186
        self.start()
187
        return self
188
189
    def __exit__(self, *unused):
190
        self.close()
191