1
|
|
|
# coding: utf-8 |
2
|
|
|
from __future__ import unicode_literals |
3
|
|
|
from __future__ import absolute_import |
4
|
|
|
|
5
|
|
|
import arrow |
6
|
|
|
|
7
|
|
|
# django |
8
|
|
|
from django.conf import settings |
9
|
|
|
from logging import getLogger |
10
|
|
|
|
11
|
|
|
# trigger happy |
12
|
|
|
from django_th.services import default_provider |
13
|
|
|
from django_th.models import TriggerService, update_result |
14
|
|
|
|
15
|
|
|
|
16
|
|
|
logger = getLogger('django_th.trigger_happy') |
17
|
|
|
|
18
|
|
|
default_provider.load_services() |
19
|
|
|
|
20
|
|
|
|
21
|
|
|
class Pub(object): |
22
|
|
|
""" |
23
|
|
|
Publishing data to any service |
24
|
|
|
""" |
25
|
|
|
|
26
|
|
|
def update_trigger(self, service): |
27
|
|
|
""" |
28
|
|
|
update the date when occurs the trigger |
29
|
|
|
:param service: service object to update |
30
|
|
|
""" |
31
|
|
|
now = arrow.utcnow().to(settings.TIME_ZONE).format( |
32
|
|
|
'YYYY-MM-DD HH:mm:ssZZ') |
33
|
|
|
TriggerService.objects.filter(id=service.id).update(date_triggered=now, |
34
|
|
|
consumer_failed=0, |
35
|
|
|
provider_failed=0, |
36
|
|
|
) |
37
|
|
|
|
38
|
|
|
def log_update(self, service, to_update, status, count): |
39
|
|
|
""" |
40
|
|
|
lets log everything at the end |
41
|
|
|
:param service: service object |
42
|
|
|
:param to_update: boolean to check if we have to update |
43
|
|
|
:param status: is everything worked fine ? |
44
|
|
|
:param count: number of data to update |
45
|
|
|
:type service: service object |
46
|
|
|
:type to_update: boolean |
47
|
|
|
:type status: boolean |
48
|
|
|
:type count: interger |
49
|
|
|
""" |
50
|
|
|
if to_update: |
51
|
|
|
if status: |
52
|
|
|
msg = "{} - {} new data".format(service, count) |
53
|
|
|
update_result(service.id, msg="OK", status=status) |
54
|
|
|
logger.info(msg) |
55
|
|
|
else: |
56
|
|
|
msg = "{} AN ERROR OCCURS ".format(service) |
57
|
|
|
update_result(service.id, msg=msg, status=status) |
58
|
|
|
logger.warn(msg) |
59
|
|
|
else: |
60
|
|
|
logger.debug("{} nothing new ".format(service)) |
61
|
|
|
|
62
|
|
|
def provider(self, service): |
63
|
|
|
""" |
64
|
|
|
get the data from (the cache of) the service provider |
65
|
|
|
:param service: |
66
|
|
|
:return: data |
67
|
|
|
""" |
68
|
|
|
service_provider = default_provider.get_service( |
69
|
|
|
str(service.provider.name.name)) |
70
|
|
|
|
71
|
|
|
# 1) get the data from the provider service |
72
|
|
|
module_name = 'th_' + \ |
73
|
|
|
service.provider.name.name.split('Service')[1].lower() |
74
|
|
|
kwargs = {'trigger_id': str(service.id), 'cache_stack': module_name} |
75
|
|
|
return getattr(service_provider, 'process_data')(**kwargs) |
76
|
|
|
|
77
|
|
|
def consumer(self, service, data, to_update, status): |
78
|
|
|
""" |
79
|
|
|
call the consumer and handle the data |
80
|
|
|
:param service: |
81
|
|
|
:param data: |
82
|
|
|
:param to_update: |
83
|
|
|
:param status: |
84
|
|
|
:return: status |
85
|
|
|
""" |
86
|
|
|
# consumer - the service which uses the data |
87
|
|
|
service_consumer = default_provider.get_service( |
88
|
|
|
str(service.consumer.name.name)) |
89
|
|
|
kwargs = {'user': service.user} |
90
|
|
|
getattr(service_consumer, '__init__')(service.consumer.token, |
91
|
|
|
**kwargs) |
92
|
|
|
instance = getattr(service_consumer, 'save_data') |
93
|
|
|
|
94
|
|
|
# 2) for each one |
95
|
|
|
for d in data: |
96
|
|
|
d['userservice_id'] = service.consumer.id |
97
|
|
|
# the consumer will save the data and return if success or not |
98
|
|
|
status = instance(service.id, **d) |
99
|
|
|
|
100
|
|
|
to_update = True |
101
|
|
|
|
102
|
|
|
return to_update, status |
103
|
|
|
|
104
|
|
|
def publishing(self, service): |
105
|
|
|
""" |
106
|
|
|
the purpose of this tasks is to get the data from the cache |
107
|
|
|
then publish them |
108
|
|
|
:param service: service object where we will publish |
109
|
|
|
:type service: object |
110
|
|
|
""" |
111
|
|
|
# flag to know if we have to update |
112
|
|
|
to_update = False |
113
|
|
|
# flag to get the status of a service |
114
|
|
|
status = False |
115
|
|
|
# provider - the service that offer data |
116
|
|
|
# check if the service has already been triggered |
117
|
|
|
# if date_triggered is None, then it's the first run |
118
|
|
|
if service.date_triggered is None: |
119
|
|
|
logger.debug("first run {}".format(service)) |
120
|
|
|
to_update = True |
121
|
|
|
status = True |
122
|
|
|
# run run run |
123
|
|
|
data = self.provider(service) |
124
|
|
|
count_new_data = len(data) if data else 0 |
125
|
|
|
if count_new_data > 0: |
126
|
|
|
to_update, status = self.consumer(service, data, to_update, status) |
127
|
|
|
# let's log |
128
|
|
|
self.log_update(service, to_update, status, count_new_data) |
129
|
|
|
# let's update |
130
|
|
|
if to_update and status: |
131
|
|
|
self.update_trigger(service) |
132
|
|
|
|