1 | # =============================================================================== |
||
2 | # qpidingest.py |
||
3 | # |
||
4 | # @author: Aaron Anderson |
||
5 | # @organization: NOAA/WDTB OU/CIMMS |
||
6 | # @version: 1.0 02/19/2010 |
||
7 | # @requires: QPID Python Client available from http://qpid.apache.org/download.html |
||
8 | # The Python Client is located under Single Component Package/Client |
||
9 | # |
||
10 | # From the README.txt Installation Instructions |
||
11 | # = INSTALLATION = |
||
12 | # Extract the release archive into a directory of your choice and set |
||
13 | # your PYTHONPATH accordingly: |
||
14 | # |
||
15 | # tar -xzf qpid-python-<version>.tar.gz -C <install-prefix> |
||
16 | # export PYTHONPATH=<install-prefix>/qpid-<version>/python |
||
17 | # |
||
18 | # ***EDEX and QPID must be running for this module to work*** |
||
19 | # |
||
20 | # DESCRIPTION: |
||
21 | # This module is used to connect to QPID and send messages to the external.dropbox queue |
||
22 | # which tells EDEX to ingest a data file from a specified path. This avoids having to copy |
||
23 | # a data file into an endpoint. Each message also contains a header which is used to determine |
||
24 | # which plugin should be used to decode the file. Each plugin has an xml file located in |
||
25 | # $EDEX_HOME/data/utility/edex_static/base/distribution that contains regular expressions |
||
26 | # that the header is compared to. When the header matches one of these regular expressions |
||
27 | # the file is decoded with that plugin. If you make changes to one of these xml files you |
||
28 | # must restart EDEX for the changes to take effect. |
||
29 | # |
||
30 | # NOTE: If the message is being sent but you do not see it being ingested in the EDEX log |
||
31 | # check the xml files to make sure the header you are passing matches one of the regular |
||
32 | # expressions. Beware of spaces, some regular expressions require spaces while others use |
||
33 | # a wildcard character so a space is optional. It seems you are better off having the space |
||
34 | # as this will be matched to both patterns. For the file in the example below, |
||
35 | # 20100218_185755_SAUS46KLOX.metar, I use SAUS46 KLOX as the header to make sure it matches. |
||
36 | # |
||
37 | # |
||
38 | # EXAMPLE: |
||
39 | # Simple example program: |
||
40 | # |
||
41 | # ------------------------------------------------------------------------------ |
||
42 | # import qpidingest |
||
43 | # #Tell EDEX to ingest a metar file from data_store. The filepath is |
||
44 | # #/data_store/20100218/metar/18/standard/20100218_185755_SAUS46KLOX.metar |
||
45 | # |
||
46 | # conn=qpidingest.IngestViaQPID() #defaults to localhost port 5672 |
||
47 | # |
||
48 | # #If EDEX is not on the local machine you can make the connection as follows |
||
49 | # #conn=qpidingest.IngestViaQPID(host='<MACHINE NAME>',port=<PORT NUMBER>) |
||
50 | # |
||
51 | # conn.sendmessage('/data_store/20100218/metar/18/standard/20100218_185755_SAUS46KLOX.metar','SAUS46 KLOX') |
||
52 | # conn.close() |
||
53 | # ------------------------------------------------------------------------------- |
||
54 | # |
||
55 | # SOFTWARE HISTORY |
||
56 | # |
||
57 | # Date Ticket# Engineer Description |
||
58 | # ------------ ---------- ----------- -------------------------- |
||
59 | # .... |
||
60 | # 06/13/2013 DR 16242 D. Friedman Add Qpid authentication info |
||
61 | # 03/06/2014 DR 17907 D. Friedman Workaround for issue QPID-5569 |
||
62 | # 02/16/2017 DR 6084 bsteffen Support ssl connections |
||
63 | # |
||
64 | # =============================================================================== |
||
65 | |||
66 | import os |
||
67 | import os.path |
||
68 | |||
69 | import qpid |
||
70 | from qpid.util import connect |
||
71 | from qpid.connection import Connection |
||
72 | from qpid.datatypes import Message, uuid4 |
||
73 | |||
# Credentials passed to qpid.connection.Connection when connecting to the broker.
# QPID_USERNAME is also the fallback client-certificate name (when
# QPID_SSL_CERT_NAME is not set) and the user_id stamped on outgoing messages.
74 | QPID_USERNAME = 'guest' |
||
75 | QPID_PASSWORD = 'guest' |
||
76 | |||
77 | |||
class IngestViaQPID:
    """
    Client that tells EDEX to ingest data files by sending messages through the
    amq.direct exchange to the external.dropbox queue.

    Attributes:
        connection: the qpid Connection, or None if it was never created
        session: the qpid session used to send messages, or None if the
                 connection attempt failed before a session was created
    """

    def __init__(self, host='localhost', port=5672, ssl=None):
        """
        Connect to QPID and make bindings to route message to external.dropbox queue
        @param host: string hostname of computer running EDEX and QPID (default localhost)
        @param port: integer port used to connect to QPID (default 5672)
        @param ssl: boolean to determine whether ssl is used, default value of None will use
                    ssl only if a client certificate is found.

        The certificate directory is taken from the QPID_SSL_CERT_DB environment
        variable (default ~/.qpid/) and the certificate name from QPID_SSL_CERT_NAME
        (default QPID_USERNAME). A ValueError raised while connecting is reported
        with a printed message instead of being propagated; any other exception
        propagates to the caller.
        """
        # Define both attributes before attempting to connect so that a failed
        # attempt leaves a well-formed object that close() can handle.
        self.connection = None
        self.session = None

        try:
            sock = connect(host, port)

            certdb = os.environ.get("QPID_SSL_CERT_DB", os.path.expanduser("~/.qpid/"))
            certname = os.environ.get("QPID_SSL_CERT_NAME", QPID_USERNAME)
            certfile = os.path.join(certdb, certname + ".crt")

            # ssl=None means "auto": use ssl only when the client certificate exists.
            if ssl or (ssl is None and os.path.exists(certfile)):
                keyfile = os.path.join(certdb, certname + ".key")
                trustfile = os.path.join(certdb, "root.crt")
                sock = qpid.util.ssl(sock, keyfile=keyfile, certfile=certfile, ca_certs=trustfile)

            self.connection = Connection(sock=sock, username=QPID_USERNAME, password=QPID_PASSWORD)
            self.connection.start()
            self.session = self.connection.session(str(uuid4()))
            self.session.exchange_bind(exchange='amq.direct', queue='external.dropbox',
                                       binding_key='external.dropbox')
            print('Connected to Qpid')
        except ValueError:
            print('Unable to connect to Qpid')

    def sendmessage(self, filepath, header):
        """
        This function sends a message to the external.dropbox queue providing the path
        to the file to be ingested and a header to determine the plugin to be used to
        decode the file.
        @param filepath: string full path to file to be ingested
        @param header: string header used to determine plugin decoder to use
        @raise AttributeError: if no session exists because the constructor could not connect
        """
        props = self.session.delivery_properties(routing_key='external.dropbox')
        head = self.session.message_properties(application_headers={'header': header},
                                               user_id=QPID_USERNAME)
        # The message body is the file path itself; the file contents are not sent.
        self.session.message_transfer(destination='amq.direct', message=Message(props, head, filepath))

    def close(self):
        """
        After all messages are sent call this function to close the session and the
        connection and make sure there are no threads left open.

        Does nothing when the constructor never managed to connect.
        """
        if self.session is None and self.connection is None:
            return

        try:
            if self.session is not None:
                self.session.close(timeout=10)
        finally:
            # Close the connection even if closing the session raised, so the
            # underlying socket is not left open.
            if self.connection is not None:
                self.connection.close(timeout=10)
        print('Connection to Qpid closed')
||
132 |