Code Duplication    Length = 17-23 lines in 2 locations

plumd/plugins/read.py 1 location

@@ 83-105 (lines=23) @@
80
            return
81
82
83
    def poll(self):
        """Poll the wrapped plugin and hand back a usable result set.

        The value returned by ``self.pobj.poll()`` is passed through only
        when it is a :class:`plumd.ResultSet`.  Anything else (including
        ``None``) is logged as an error and replaced by a newly constructed
        :class:`plumd.ResultSet`.  If the plugin raises, the exception and
        its traceback are logged, ``self.stop_evt`` is set and a newly
        constructed :class:`plumd.ResultSet` is returned instead.

        :rtype: plumd.ResultSet
        """
        # Plugin code is third party and may raise anything at all, hence
        # the deliberately broad handler around the whole exchange.
        try:
            result = self.pobj.poll()
            if result is not None and isinstance(result, plumd.ResultSet):
                return result
            # Not a ResultSet: report what we got and substitute a new one.
            template = "read: invalid result set from reader {0}: {1}"
            self.log.error(template.format(self._name, result))
            return plumd.ResultSet()
        except Exception as err:
            # Record the traceback, then set the stop event; per the log
            # message below this is what disables the reader.
            trace = traceback.format_exc()
            template = "read: disabling reader {0} due to exception in poll: {1} : {2}"
            self.log.error(template.format(self._name, err, trace))
            self.stop_evt.set()
            return plumd.ResultSet()
106
107
108
    def write(self, rset, lint):

plumd/plugins/write.py 1 location

@@ 65-81 (lines=17) @@
62
        return self._name
63
64
65
    def flush(self):
        """Drain ``self.queue`` by handing each item to ``self.pobj.push()``.

        Stops as soon as the queue reports empty.  Stops early when a push
        raises (the exception and traceback are logged at debug level and
        the item already taken from the queue is not re-queued), or when
        more than one second has elapsed since the flush began (logged at
        error level); items still queued at that point are left in place.
        """
        began = time.time()
        while True:
            if self.queue.empty():
                return
            item = self.queue.get()
            # Plugin push() is third-party code and may raise anything.
            try:
                self.pobj.push(item)
            except Exception as err:
                trace = traceback.format_exc()
                template = "write: writer {0}: exception in flush: {1}: {2}"
                self.log.debug(template.format(self._name, err, trace))
                return
            # Enforce the one second budget after each successful push.
            elapsed = time.time() - began
            if elapsed > 1:
                self.log.error(
                    "write: flush: flush taking too long, dropping metrics"
                )
                return
82
83
84
    def dequeue(self):