Completed
Branch master (0a6e76)
by Fox
01:05
created

consumer()   B

Complexity

Conditions 2

Size

Total Lines 24

Duplication

Lines 24
Ratio 100 %

Importance

Changes 5
Bugs 0 Features 0
Metric Value
cc 2
dl 24
loc 24
rs 8.9713
c 5
b 0
f 0
1
import hmac
2
import hashlib
3
4
from rest_framework.response import Response
5
from rest_framework.decorators import api_view
6
7
from django_th.models import TriggerService
8
from django_th.services import default_provider
9
10
from th_taiga.models import Taiga
11
12
13
def epic(taiga_obj, action, data):
14
    """
15
16
    :param taiga_obj: taiga object
17
    :param action: craete/change/delete
18
    :param data: data to return
19
    :return: data
20
    """
21
    if action == 'create' and taiga_obj.notify_epic_create:
22
        data['type_action'] = 'New Epic created'
23
    elif action == 'change' and taiga_obj.notify_epic_change:
24
        data['type_action'] = 'Changed Epic'
25
    elif action == 'delete' and taiga_obj.notify_epic_delete:
26
        data['type_action'] = 'Deleted Epic'
27
    return data
28
29
30
def issue(taiga_obj, action, data):
31
    """
32
33
    :param taiga_obj: taiga object
34
    :param action: craete/change/delete
35
    :param data: data to return
36
    :return: data
37
    """
38
    if action == 'create' and taiga_obj.notify_issue_create:
39
        data['type_action'] = 'New Issue created'
40
    elif action == 'change' and taiga_obj.notify_issue_change:
41
        data['type_action'] = 'Changed Issue'
42
    elif action == 'delete' and taiga_obj.notify_issue_delete:
43
        data['type_action'] = 'Deleted Issue'
44
    return data
45
46
47
def userstory(taiga_obj, action, data):
48
    """
49
50
    :param taiga_obj: taiga object
51
    :param action: craete/change/delete
52
    :param data: data to return
53
    :return: data
54
    """
55
    if action == 'create' and taiga_obj.notify_userstory_create:
56
        data['type_action'] = 'New Userstory created'
57
    elif action == 'change' and taiga_obj.notify_userstory_change:
58
        data['type_action'] = 'Changed Userstory'
59
    elif action == 'delete' and taiga_obj.notify_userstory_delete:
60
        data['type_action'] = 'Deleted Userstory'
61
    return data
62
63
64
def task(taiga_obj, action, data):
65
    """
66
67
    :param taiga_obj: taiga object
68
    :param action: craete/change/delete
69
    :param data: data to return
70
    :return: data
71
    """
72
    if action == 'create' and taiga_obj.notify_task_create:
73
        data['type_action'] = 'New Task created'
74
    elif action == 'change' and taiga_obj.notify_task_change:
75
        data['type_action'] = 'Changed Task'
76
    elif action == 'delete' and taiga_obj.notify_task_delete:
77
        data['type_action'] = 'Deleted Task'
78
    return data
79
80
81
def wikipage(taiga_obj, action, data):
82
    """
83
84
    :param taiga_obj: taiga object
85
    :param action: craete/change/delete
86
    :param data: data to return
87
    :return: data
88
    """
89
    if action == 'create' and taiga_obj.notify_wikipage_create:
90
        data['type_action'] = 'New Wikipage created'
91
    elif action == 'change' and taiga_obj.notify_wikipage_change:
92
        data['type_action'] = 'Changed Wikipage'
93
    elif action == 'delete' and taiga_obj.notify_wikipage_delete:
94
        data['type_action'] = 'Deleted Wikipage'
95
    return data
96
97
98
def relateduserstory(taiga_obj, action, data):
99
    """
100
101
    :param taiga_obj: taiga object
102
    :param action: craete/change/delete
103
    :param data: data to return
104
    :return: data
105
    """
106
    if action == 'create' and taiga_obj.notify_relateduserstory_create:
107
        data['type_action'] = 'New Related Userstory created'
108
    elif action == 'delete' and taiga_obj.notify_relateduserstory_delete:
109
        data['type_action'] = 'Deleted Related Userstory'
110
    return data
111
112
113
def data_filter(trigger_id, **data):
114
    """
115
    check if we want to track event for a given action
116
    :param trigger_id:
117
    :param data:
118
    :return:
119
    """
120
    taiga_obj = Taiga.objects.get(trigger_id=trigger_id)
121
122
    action = data.get('action')
123
    domain = data.get('type')
124
    data = data.get('data')
125
126
    if domain == 'epic':
127
        data = epic(taiga_obj, action, data)
128
    elif domain == 'issue':
129
        data = issue(taiga_obj, action, data)
130
    elif domain == 'userstory':
131
        data = userstory(taiga_obj, action, data)
132
    elif domain == 'task':
133
        data = task(taiga_obj, action, data)
134
    elif domain == 'wikipage':
135
        data = wikipage(taiga_obj, action, data)
136
    elif domain == 'relateduserstory':
137
        data = relateduserstory(taiga_obj, action, data)
138
    else:
139
        data = []
140
141
    return data
142
143
144 View Code Duplication
def consumer(trigger_id, data):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
145
    """
146
        call the consumer and handle the data
147
        :param trigger_id:
148
        :param data:
149
        :return:
150
    """
151
    status = True
152
    # consumer - the service which uses the data
153
    default_provider.load_services()
154
    service = TriggerService.objects.get(id=trigger_id)
155
156
    service_consumer = default_provider.get_service(
157
        str(service.consumer.name.name))
158
    kwargs = {'user': service.user}
159
160
    data = data_filter(trigger_id, **data)
161
    if len(data) > 0:
162
163
        getattr(service_consumer, '__init__')(service.consumer.token,
164
                                              **kwargs)
165
        status = getattr(service_consumer, 'save_data')(service.id, **data)
166
167
    return status
168
169
170
def verify_signature(data, key, signature):
171
    mac = hmac.new(key.encode("utf-8"), msg=data, digestmod=hashlib.sha1)
172
    return mac.hexdigest() == signature
173
174
175
@api_view(['POST'])
176
def taiga(request, trigger_id, key):
177
    signature = request.META.get('HTTP_X_TAIGA_WEBHOOK_SIGNATURE')
178
    # check that the data are ok with the provided signature
179
    if verify_signature(request._request.body, key, signature):
180
        status = consumer(trigger_id, request.data)
181
        if status:
182
            return Response({"message": "Success"})
183
        else:
184
            return Response({"message": "Failed!"})
185