#
# Published interface for retrieving data updates via awips.dataaccess package
#
#
#     SOFTWARE HISTORY
#
#    Date            Ticket#       Engineer       Description
#    ------------    ----------    -----------    --------------------------
#    May 26, 2016    2416          rjpeter        Initial Creation.
#    Aug 1, 2016     2416          tgurney        Finish implementation
#
#
13 | |||
14 | """ |
||
15 | Interface for the DAF's data notification feature, which allows continuous |
||
16 | retrieval of new data as it is coming into the system. |
||
17 | |||
18 | There are two ways to access this feature: |
||
19 | |||
20 | 1. The DataQueue module (awips.dataaccess.DataQueue) offers a collection that |
||
21 | automatically fills up with new data as it receives notifications. See that |
||
22 | module for more information. |
||
23 | |||
24 | 2. Depending on the type of data you want, use either getGridDataUpdates() or |
||
25 | getGeometryDataUpdates() in this module. Either one will give you back an |
||
26 | object that will retrieve new data for you and will call a function you specify |
||
27 | each time new data is received. |
||
28 | |||
29 | Example code follows. This example prints temperature as observed from KOMA |
||
30 | each time a METAR is received from there. |
||
31 | |||
    from awips.dataaccess import DataAccessLayer as DAL
    from awips.dataaccess import DataNotificationLayer as DNL

    def process_obs(list_of_data):
        for item in list_of_data:
            print(item.getNumber('temperature'))

    request = DAL.newDataRequest('obs')
    request.setParameters('temperature')
    request.setLocationNames('KOMA')

    notifier = DNL.getGeometryDataUpdates(request)
    notifier.subscribe(process_obs)
    # process_obs will be called with a list of data each time new data comes in
||
46 | |||
47 | """ |
||
48 | |||
49 | import re |
||
50 | import sys |
||
51 | from awips.dataaccess.PyGeometryNotification import PyGeometryNotification |
||
52 | from awips.dataaccess.PyGridNotification import PyGridNotification |
||
53 | |||
54 | |||
# Host name handed to the Thrift router and to the notification objects
# created in this module; changeEDEXHost() rebinds it.
THRIFT_HOST = "edex"

# Stays False unless the native Thrift router is selected below.
USING_NATIVE_THRIFT = False

# Captures host and port from a connection string of the form
# tcp://<host>:<port>
JMS_HOST_PATTERN = re.compile('tcp://([^:]+):([0-9]+)')

if 'jep' not in sys.modules:
    # Not running under jep: route requests through the Thrift client.
    from awips.dataaccess import ThriftClientRouter
    router = ThriftClientRouter.ThriftClientRouter(THRIFT_HOST)
    USING_NATIVE_THRIFT = True
else:
    # Running from within Java. The import is intentionally unguarded: if it
    # fails we want the misconfiguration to be obvious instead of allowing
    # false confidence and fallback behavior.
    import JepRouter
    router = JepRouter
||
71 | |||
72 | |||
def _getJmsConnectionInfo(notifFilterResponse):
    """
    Extract the JMS host and port from a notification filter response.

    Args:
            notifFilterResponse: response object whose getJmsConnectionInfo()
            returns a connection string beginning with tcp://<host>:<port>

    Returns:
            a dict with the keys 'host' and 'port', holding the text matched
            for each part of the connection string

    Raises:
            RuntimeError: if the connection info is missing, is not a string,
            or does not begin with tcp://<host>:<port>
    """
    serverString = notifFilterResponse.getJmsConnectionInfo()
    try:
        match = JMS_HOST_PATTERN.match(serverString)
    except TypeError:
        # Not a string at all (e.g. None): report it like any other bad
        # value instead of leaking a TypeError to the caller.
        match = None
    if match is None:
        # %s formatting keeps the message buildable for non-string values
        raise RuntimeError('Got bad JMS connection info from server: %s'
                           % (serverString,))
    host, port = match.groups()
    return {'host': host, 'port': port}
||
80 | |||
81 | |||
def getGridDataUpdates(request):
    """
    Build a notification object that receives updates to grid data.

    Args:
            request: the IDataRequest specifying the data you want to receive

    Returns:
            a PyGridNotification; listen for updates by calling its
            subscribe() method
    """
    # Ask the router for the filter first; the same response also carries
    # the JMS connection info.
    filterResponse = router.getNotificationFilter(request)
    gridFilter = filterResponse.getNotificationFilter()
    jmsEndpoint = _getJmsConnectionInfo(filterResponse)
    return PyGridNotification(request, gridFilter, requestHost=THRIFT_HOST,
                              **jmsEndpoint)
||
99 | |||
100 | |||
def getGeometryDataUpdates(request):
    """
    Build a notification object that receives updates to geometry data.

    Args:
            request: the IDataRequest specifying the data you want to receive

    Returns:
            a PyGeometryNotification; listen for updates by calling its
            subscribe() method
    """
    # Ask the router for the filter first; the same response also carries
    # the JMS connection info.
    filterResponse = router.getNotificationFilter(request)
    geometryFilter = filterResponse.getNotificationFilter()
    jmsEndpoint = _getJmsConnectionInfo(filterResponse)
    return PyGeometryNotification(request, geometryFilter,
                                  requestHost=THRIFT_HOST, **jmsEndpoint)
||
118 | |||
119 | |||
def changeEDEXHost(newHostName):
    """
    Point the Data Access Framework at a different EDEX host.

    Only supported with the native Python client implementation; when the
    module is using JepRouter a TypeError is raised instead.

    Args:
            newHostName: the EDEX host to connect to

    Raises:
            TypeError: if the native Thrift client is not in use
    """
    global THRIFT_HOST, router

    if not USING_NATIVE_THRIFT:
        raise TypeError("Cannot call changeEDEXHost when using JepRouter.")

    # Record the new host before building the router, which is constructed
    # from the updated module-level value.
    THRIFT_HOST = newHostName
    router = ThriftClientRouter.ThriftClientRouter(THRIFT_HOST)
||
136 |