Completed
Push — master ( d761f8...93bcf6 )
by Fabian
06:30 queued 06:27
created

ecs_deploy.slack   A

Complexity

Total Complexity 32

Size/Duplication

Total Lines 131
Duplicated Lines 38.17 %

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
eloc 86
dl 50
loc 131
ccs 74
cts 74
cp 1
rs 9.84
c 0
b 0
f 0
wmc 32

5 Methods

Rating   Name   Duplication   Size   Complexity  
A SlackNotification.__init__() 0 4 1
B SlackNotification.notify_success() 24 24 7
F SlackNotification.notify_start() 0 40 15
A SlackNotification.get_payload() 0 22 2
B SlackNotification.notify_failure() 26 26 7

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

1 1
import re
2 1
import requests
3 1
from datetime import datetime
4
5
6 1
class SlackException(Exception):
    """Raised when a Slack notification request is not answered with HTTP 200."""
8
9
10 1
class SlackNotification(object):
    """Send deployment status messages to Slack via HTTP POST.

    A notification is only sent when a URL was configured and the name of
    the deployed service (or scheduled-task rule) matches ``service_match``.
    """

    def __init__(self, url, service_match):
        """Store the notification settings and record the start time.

        :param url: URL the payload is POSTed to; a falsy value disables
            every notification.
        :param service_match: regular expression searched for in the service
            or rule name; a falsy value is compiled as the empty pattern,
            which matches any name.
        """
        self.__url = url
        self.__service_match_re = re.compile(service_match or '')
        # Reference point for the "Duration" field of the final notification.
        self.__timestamp_start = datetime.utcnow()

    def get_payload(self, title, messages, color=None):
        """Build the message body that is sent as JSON.

        :param title: text used as the attachment's ``pretext``.
        :param messages: iterable of indexable items; index 0 becomes the
            field title and index 1 the field value.
        :param color: value of the attachment's ``color`` key.
        :returns: dict with ``username`` and a one-element ``attachments``
            list.
        """
        fields = [
            {'title': message[0], 'value': message[1], 'short': True}
            for message in messages
        ]
        return {
            "username": "ECS Deploy",
            "attachments": [
                {
                    "pretext": title,
                    "color": color,
                    "fields": fields,
                },
            ],
        }

    def _should_notify(self, service, rule):
        """Return True when a URL is set and the service/rule name matches."""
        if not self.__url:
            return False
        # Fall back to '' so a call with neither service nor rule is matched
        # against the empty string instead of raising TypeError in re.
        target = service or rule or ''
        return self.__service_match_re.search(target) is not None

    def _base_messages(self, cluster, service, rule):
        """Return the fields shared by all notifications."""
        messages = [('Cluster', cluster)]
        if service:
            messages.append(('Service', service))
        if rule:
            messages.append(('Scheduled Task', rule))
        return messages

    def _post(self, payload):
        """POST ``payload`` as JSON; raise SlackException unless HTTP 200."""
        # NOTE(review): no timeout is passed, so an unresponsive endpoint
        # blocks the caller; consider adding one.
        response = requests.post(self.__url, json=payload)
        if response.status_code != 200:
            raise SlackException('Notifying deployment failed')
        return response

    def notify_start(self, cluster, tag, task_definition, comment, user,
                     service=None, rule=None):
        """Announce that a deployment has started.

        :param cluster: value of the "Cluster" field.
        :param tag: image tag; shown as "Tag" when truthy and used to hide
            image diffs that merely end with ``:<tag>``.
        :param task_definition: object whose ``diff`` attribute is iterated;
            every item must expose ``field`` and ``value``.
        :param comment: shown as "Comment" when truthy.
        :param user: shown as "User" when truthy.
        :param service: service name, matched against ``service_match``.
        :param rule: scheduled-task rule name, used when ``service`` is falsy.
        :returns: the response of the POST request, or ``None`` when no
            notification was sent.
        :raises SlackException: if the response status code is not 200.
        """
        if not self._should_notify(service, rule):
            return None

        messages = self._base_messages(cluster, service, rule)
        optional = (('Tag', tag), ('User', user), ('Comment', comment))
        messages.extend(item for item in optional if item[1])

        for diff in task_definition.diff:
            # The tag is already reported in its own field.
            if tag and diff.field == 'image' and diff.value.endswith(':' + tag):
                continue
            if diff.field == 'environment':
                # Environment values are not disclosed in the message.
                messages.append(('Environment', '_sensitive (therefore hidden)_'))
                continue
            messages.append((diff.field, diff.value))

        return self._post(self.get_payload('Deployment has started', messages))

    def notify_success(self, cluster, revision, service=None, rule=None):
        """Announce that a deployment finished successfully.

        :param cluster: value of the "Cluster" field.
        :param revision: value of the "Revision" field.
        :param service: service name, matched against ``service_match``.
        :param rule: scheduled-task rule name, used when ``service`` is falsy.
        :returns: the response of the POST request, or ``None`` when no
            notification was sent.
        :raises SlackException: if the response status code is not 200.
        """
        if not self._should_notify(service, rule):
            return None

        duration = datetime.utcnow() - self.__timestamp_start

        messages = self._base_messages(cluster, service, rule)
        messages.append(('Revision', revision))
        messages.append(('Duration', str(duration)))

        payload = self.get_payload('Deployment finished successfully', messages, 'good')
        return self._post(payload)

    def notify_failure(self, cluster, error, service=None, rule=None):
        """Announce that a deployment failed.

        :param cluster: value of the "Cluster" field.
        :param error: value of the "Error" field.
        :param service: service name, matched against ``service_match``.
        :param rule: scheduled-task rule name, used when ``service`` is falsy.
        :returns: the response of the POST request, or ``None`` when no
            notification was sent.
        :raises SlackException: if the response status code is not 200.
        """
        if not self._should_notify(service, rule):
            return None

        duration = datetime.utcnow() - self.__timestamp_start

        messages = self._base_messages(cluster, service, rule)
        messages.append(('Duration', str(duration)))
        messages.append(('Error', error))

        payload = self.get_payload('Deployment failed', messages, 'danger')
        return self._post(payload)
131