1 | # |
||
2 | # Convenience class for using the DAF's notifications feature. This is a |
||
3 | # collection that, once connected to EDEX by calling start(), fills with |
||
4 | # data as notifications come in. Runs on a separate thread to allow |
||
5 | # non-blocking data retrieval. |
||
6 | # |
||
7 | # |
||
8 | # |
||
9 | # SOFTWARE HISTORY |
||
10 | # |
||
11 | # Date Ticket# Engineer Description |
||
12 | # ------------ ---------- ----------- -------------------------- |
||
13 | # 07/29/16 2416 tgurney Initial creation |
||
14 | # |
||
15 | |||
16 | from awips.dataaccess import DataNotificationLayer as DNL |
||
17 | |||
18 | import time |
||
19 | from threading import Thread |
||
20 | import sys |
||
21 | |||
22 | |||
23 | if sys.version_info.major == 2: |
||
24 | from Queue import Queue, Empty |
||
25 | else: # Python 3 module renamed to 'queue' |
||
26 | from queue import Queue, Empty |
||
27 | |||
28 | # Used to indicate a DataQueue that will produce geometry data. |
||
29 | GEOMETRY = object() |
||
30 | |||
31 | # Used to indicate a DataQueue that will produce grid data. |
||
32 | GRID = object() |
||
33 | |||
34 | # Default maximum queue size. |
||
35 | _DEFAULT_MAXSIZE = 100 |
||
36 | |||
37 | |||
38 | class Closed(Exception): |
||
39 | """Raised when attempting to get data from a closed queue.""" |
||
40 | pass |
||
41 | |||
42 | |||
43 | View Code Duplication | class DataQueue(object): |
|
0 ignored issues
–
show
Duplication
introduced
by
![]() |
|||
44 | |||
45 | """ |
||
46 | Convenience class for using the DAF's notifications feature. This is a |
||
47 | collection that, once connected to EDEX by calling start(), fills with |
||
48 | data as notifications come in. |
||
49 | |||
50 | Example for getting obs data: |
||
51 | |||
52 | from DataQueue import DataQueue, GEOMETRY |
||
53 | request = DataAccessLayer.newDataRequest('obs') |
||
54 | request.setParameters('temperature') |
||
55 | request.setLocationNames('KOMA') |
||
56 | q = DataQueue(GEOMETRY, request) |
||
57 | q.start() |
||
58 | for item in q: |
||
59 | print(item.getNumber('temperature')) |
||
60 | """ |
||
61 | |||
62 | def __init__(self, dtype, request, maxsize=_DEFAULT_MAXSIZE): |
||
63 | """ |
||
64 | Create a new DataQueue. |
||
65 | |||
66 | Args: |
||
67 | dtype: Either GRID or GEOMETRY; must match the type of data |
||
68 | requested. |
||
69 | request: IDataRequest describing the data you want. It must at |
||
70 | least have datatype set. All data produced will satisfy the |
||
71 | constraints you specify. |
||
72 | maxsize: Maximum number of data objects the queue can hold at |
||
73 | one time. If the limit is reached, any data coming in after |
||
74 | that will not appear until one or more items are removed using |
||
75 | DataQueue.get(). |
||
76 | """ |
||
77 | assert maxsize > 0 |
||
78 | assert dtype in (GEOMETRY, GRID) |
||
79 | self._maxsize = maxsize |
||
80 | self._queue = Queue(maxsize=maxsize) |
||
81 | self._thread = None |
||
82 | if dtype is GEOMETRY: |
||
83 | self._notifier = DNL.getGeometryDataUpdates(request) |
||
84 | elif dtype is GRID: |
||
85 | self._notifier = DNL.getGridDataUpdates(request) |
||
86 | |||
87 | def start(self): |
||
88 | """Start listening for notifications and requesting data.""" |
||
89 | if self._thread is not None: |
||
90 | # Already started |
||
91 | return |
||
92 | kwargs = {'callback': self._data_received} |
||
93 | self._thread = Thread(target=self._notifier.subscribe, kwargs=kwargs) |
||
94 | self._thread.daemon = True |
||
95 | self._thread.start() |
||
96 | timer = 0 |
||
97 | while not self._notifier.subscribed: |
||
98 | time.sleep(0.1) |
||
99 | timer += 1 |
||
100 | if timer >= 100: # ten seconds |
||
101 | raise RuntimeError('timed out when attempting to subscribe') |
||
102 | |||
103 | def _data_received(self, data): |
||
104 | for d in data: |
||
105 | if not isinstance(d, list): |
||
106 | d = [d] |
||
107 | for item in d: |
||
108 | self._queue.put(item) |
||
109 | |||
110 | def get(self, block=True, timeout=None): |
||
111 | """ |
||
112 | Get and return the next available data object. By default, if there is |
||
113 | no data yet available, this method will not return until data becomes |
||
114 | available. |
||
115 | |||
116 | Args: |
||
117 | block: Specifies behavior when the queue is empty. If True, wait |
||
118 | until an item is available before returning (the default). If |
||
119 | False, return None immediately if the queue is empty. |
||
120 | timeout: If block is True, wait this many seconds, and return None |
||
121 | if data is not received in that time. |
||
122 | Returns: |
||
123 | IData |
||
124 | """ |
||
125 | if self.closed: |
||
126 | raise Closed |
||
127 | try: |
||
128 | return self._queue.get(block, timeout) |
||
129 | except Empty: |
||
130 | return None |
||
131 | |||
132 | def get_all(self): |
||
133 | """ |
||
134 | Get all data waiting for processing, in a single list. Always returns |
||
135 | immediately. Returns an empty list if no data has arrived yet. |
||
136 | |||
137 | Returns: |
||
138 | List of IData |
||
139 | """ |
||
140 | data = [] |
||
141 | for _ in range(self._maxsize): |
||
142 | next_item = self.get(False) |
||
143 | if next_item is None: |
||
144 | break |
||
145 | data.append(next_item) |
||
146 | return data |
||
147 | |||
148 | def close(self): |
||
149 | """Close the queue. May not be re-opened after closing.""" |
||
150 | if not self.closed: |
||
151 | self._notifier.close() |
||
152 | self._thread.join() |
||
153 | |||
154 | def qsize(self): |
||
155 | """Return number of items in the queue.""" |
||
156 | return self._queue.qsize() |
||
157 | |||
158 | def empty(self): |
||
159 | """Return True if the queue is empty.""" |
||
160 | return self._queue.empty() |
||
161 | |||
162 | def full(self): |
||
163 | """Return True if the queue is full.""" |
||
164 | return self._queue.full() |
||
165 | |||
166 | @property |
||
167 | def closed(self): |
||
168 | """True if the queue has been closed.""" |
||
169 | return not self._notifier.subscribed |
||
170 | |||
171 | @property |
||
172 | def maxsize(self): |
||
173 | """ |
||
174 | Maximum number of data objects the queue can hold at one time. |
||
175 | If this limit is reached, any data coming in after that will not appear |
||
176 | until one or more items are removed using get(). |
||
177 | """ |
||
178 | return self._maxsize |
||
179 | |||
180 | def __iter__(self): |
||
181 | if self._thread is not None: |
||
182 | while not self.closed: |
||
183 | yield self.get() |
||
184 | |||
185 | def __enter__(self): |
||
186 | self.start() |
||
187 | return self |
||
188 | |||
189 | def __exit__(self, *unused): |
||
190 | self.close() |
||
191 |