awips.dataaccess.PyNotification   A
last analyzed

Complexity

Total Complexity 8

Size/Duplication

Total Lines 86
Duplicated Lines 72.09 %

Importance

Changes 0
Metric Value
wmc 8
eloc 36
dl 62
loc 86
rs 10
c 0
b 0
f 0

7 Methods

Rating   Name   Duplication   Size   Complexity  
A PyNotification.subscribed() 4 4 1
A PyNotification.subscribe() 11 11 1
A PyNotification.getData() 12 12 1
A PyNotification.close() 3 3 2
A PyNotification.messageReceived() 7 7 1
A PyNotification.getDataTime() 3 3 1
A PyNotification.__init__() 9 9 1

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
#
2
# Implements IData for use by native Python clients to the Data Access
3
# Framework.
4
#
5
#
6
#     SOFTWARE HISTORY
7
#
8
#    Date            Ticket#       Engineer      Description
9
#    ------------    ----------    -----------   --------------------------
10
#    Jun 22, 2016    2416          rjpeter       Initial creation
11
#    Jul 22, 2016    2416          tgurney       Finish implementation
12
#    Sep 07, 2017    6175          tgurney       Override messageReceived in subclasses
13
#
14
15
from six import with_metaclass
16
import abc
17
18
from awips.dataaccess import DataAccessLayer
19
from awips.dataaccess import INotificationSubscriber
20
from awips.QpidSubscriber import QpidSubscriber
21
from dynamicserialize.dstypes.com.raytheon.uf.common.time import DataTime
22
23
24 View Code Duplication
class PyNotification(with_metaclass(abc.ABCMeta, INotificationSubscriber)):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
25
    """
26
    Receives notifications for new data and retrieves the data that meets
27
    specified filtering criteria.
28
    """
29
30
    def __init__(self, request, notificationFilter, host='localhost',
31
                 port=5672, requestHost='localhost'):
32
        self.DAL = DataAccessLayer
33
        self.DAL.changeEDEXHost(requestHost)
34
        self.request = request
35
        self.notificationFilter = notificationFilter
36
        self.__topicSubscriber = QpidSubscriber(host, port, decompress=True)
37
        self.__topicName = "edex.alerts"
38
        self.callback = None
39
40
    def subscribe(self, callback):
41
        """
42
        Start listening for notifications.
43
44
        Args:
45
            callback: Function to call with a list of received data objects.
46
              Will be called once for each request made for data.
47
        """
48
        assert hasattr(callback, '__call__'), 'callback arg must be callable'
49
        self.callback = callback
50
        self.__topicSubscriber.topicSubscribe(self.__topicName, self.messageReceived)
51
        # Blocks here
52
53
    def close(self):
54
        if self.__topicSubscriber.subscribed:
55
            self.__topicSubscriber.close()
56
57
    def getDataTime(self, dataURI):
58
        dataTimeStr = dataURI.split('/')[2]
59
        return DataTime(dataTimeStr)
60
61
    @abc.abstractmethod
62
    def messageReceived(self, msg):
63
        """Called when a message is received from QpidSubscriber.
64
65
        This method must call self.callback once for each request made for data
66
        """
67
        pass
68
69
    @abc.abstractmethod
70
    def getData(self, request, dataTimes):
71
        """
72
        Retrieve and return data
73
74
        Args:
75
            request: IDataRequest to send to the server
76
            dataTimes: list of data times
77
        Returns:
78
            list of IData
79
        """
80
        pass
81
82
    @property
83
    def subscribed(self):
84
        """True if currently subscribed to notifications."""
85
        return self.__topicSubscriber.queueStarted
86