Issues (227)

awips/qpidingest.py (1 issue)

1
# ===============================================================================
2
# qpidingest.py
3
#
4
# @author: Aaron Anderson
5
# @organization: NOAA/WDTB OU/CIMMS
6
# @version: 1.0 02/19/2010
7
# @requires: QPID Python Client available from http://qpid.apache.org/download.html
8
#            The Python Client is located under Single Component Package/Client
9
#
10
#            From the README.txt Installation Instructions
11
#                = INSTALLATION =
12
#                Extract the release archive into a directory of your choice and set
13
#                your PYTHONPATH accordingly:
14
#
15
#                tar -xzf qpid-python-<version>.tar.gz -C <install-prefix>
16
#                export PYTHONPATH=<install-prefix>/qpid-<version>/python
17
#
18
#       ***EDEX and QPID must be running for this module to work***
19
#
20
# DESCRIPTION:
21
# This module is used to connect to QPID and send messages to the external.dropbox queue
22
# which tells EDEX to ingest a data file from a specified path. This avoids having to copy
23
# a data file into an endpoint. Each message also contains a header which is used to determine
24
# which plugin should be used to decode the file. Each plugin has an xml file located in
25
# $EDEX_HOME/data/utility/edex_static/base/distribution that contains regular expressions
26
# that the header is compared to. When the header matches one of these regular expressions
27
# the file is decoded with that plugin. If you make changes to one of these xml files you
28
# must restart EDEX for the changes to take effect.
29
#
30
# NOTE: If the message is being sent but you do not see it being ingested in the EDEX log
31
# check the xml files to make sure the header you are passing matches one of the regular
32
# expressions. Beware of spaces, some regular expressions require spaces while others use
33
# a wildcard character so a space is optional. It seems you are better off having the space
34
# as this will be matched to both patterns. For the file in the example below,
35
# 20100218_185755_SAUS46KLOX.metar, I use SAUS46 KLOX as the header to make sure it matches.
36
#
37
#
38
# EXAMPLE:
39
# Simple example program:
40
#
41
# ------------------------------------------------------------------------------
42
# import qpidingest
43
# #Tell EDEX to ingest a metar file from data_store. The filepath is
44
# #/data_store/20100218/metar/18/standard/20100218_185755_SAUS46KLOX.metar
45
#
46
# conn=qpidingest.IngestViaQPID() #defaults to localhost port 5672
47
#
48
# #If EDEX is not on the local machine you can make the connection as follows
49
# #conn=qpidingest.IngestViaQPID(host='<MACHINE NAME>',port=<PORT NUMBER>)
50
#
51
# conn.sendmessage('/data_store/20100218/metar/18/standard/20100218_185755_SAUS46KLOX.metar','SAUS46 KLOX')
52
# conn.close()
53
# -------------------------------------------------------------------------------
54
#
55
#    SOFTWARE HISTORY
56
#
57
#    Date            Ticket#       Engineer       Description
58
#    ------------    ----------    -----------    --------------------------
59
#    ....
60
#    06/13/2013      DR 16242      D. Friedman    Add Qpid authentication info
61
#    03/06/2014      DR 17907      D. Friedman    Workaround for issue QPID-5569
62
#    02/16/2017      DR 6084       bsteffen       Support ssl connections
63
#
64
# ===============================================================================
65
66
import os
67
import os.path
68
69
import qpid
70
from qpid.util import connect
71
from qpid.connection import Connection
72
from qpid.datatypes import Message, uuid4
73
74
# Qpid account name: used to authenticate the connection, as the user_id of
# each message sent, and as the default client-certificate base name.
QPID_USERNAME = 'guest'
75
# Password presented to the broker together with QPID_USERNAME.
QPID_PASSWORD = 'guest'
76
77
78 View Code Duplication
class IngestViaQPID:
    """
    Client that posts ingest requests to the external.dropbox queue through the
    amq.direct exchange of a Qpid broker.

    Typical use: construct an instance, call sendmessage() once per file, then
    call close().
    """

    def __init__(self, host='localhost', port=5672, ssl=None):
        """
        Connect to QPID and make bindings to route message to external.dropbox queue

        If the connection attempt raises ValueError, 'Unable to connect to Qpid' is
        printed and the instance is left unconnected; any other exception propagates
        to the caller.

        @param host: string hostname of computer running EDEX and QPID (default localhost)
        @param port: integer port used to connect to QPID (default 5672)
        @param ssl: boolean to determine whether ssl is used, default value of None will use
            ssl only if a client certificate is found.
        """
        # Define both attributes before connecting so that a failed attempt leaves
        # the instance in a known state and close() can still be called safely.
        self.connection = None
        self.session = None

        try:
            sock = connect(host, port)

            # Certificate directory and base name can be overridden through the
            # environment; otherwise fall back to ~/.qpid/ and the Qpid user name.
            certdb = os.environ.get("QPID_SSL_CERT_DB", os.path.expanduser("~/.qpid/"))
            certname = os.environ.get("QPID_SSL_CERT_NAME", QPID_USERNAME)
            certfile = os.path.join(certdb, certname + ".crt")

            # ssl=None means "auto": only wrap the socket when a client certificate exists.
            if ssl or (ssl is None and os.path.exists(certfile)):
                keyfile = os.path.join(certdb, certname + ".key")
                trustfile = os.path.join(certdb, "root.crt")
                sock = qpid.util.ssl(sock, keyfile=keyfile, certfile=certfile, ca_certs=trustfile)

            self.connection = Connection(sock=sock, username=QPID_USERNAME, password=QPID_PASSWORD)
            self.connection.start()
            self.session = self.connection.session(str(uuid4()))
            self.session.exchange_bind(exchange='amq.direct', queue='external.dropbox',
                                       binding_key='external.dropbox')
            print('Connected to Qpid')
        except ValueError:
            print('Unable to connect to Qpid')

    def sendmessage(self, filepath, header):
        """
        This function sends a message to the external.dropbox queue providing the path
        to the file to be ingested and a header to determine the plugin to be used to
        decode the file.

        Raises AttributeError if the constructor did not establish a session.

        @param filepath: string full path to file to be ingested
        @param header: string header used to determine plugin decoder to use
        """
        delivery = self.session.delivery_properties(routing_key='external.dropbox')
        properties = self.session.message_properties(application_headers={'header': header},
                                                     user_id=QPID_USERNAME)
        message = Message(delivery, properties, filepath)
        self.session.message_transfer(destination='amq.direct', message=message)

    def close(self):
        """
        After all messages are sent call this function to close the session and the
        underlying connection and make sure there are no threads left open.

        Does nothing if the constructor never managed to connect.
        """
        if self.session is None and self.connection is None:
            # Nothing was opened, so there is nothing to release or report.
            return

        try:
            if self.session is not None:
                self.session.close(timeout=10)
        finally:
            # Close the connection even when closing the session fails, otherwise
            # the socket and the connection's worker thread are left behind.
            if self.connection is not None:
                self.connection.close(timeout=10)
        print('Connection to Qpid closed')
132