Completed
Pull Request — master (#2357)
by Manas
07:53
created

st2reactor.timer.BaseSensor   A

Complexity

Total Complexity 7

Size/Duplication

Total Lines 58
Duplicated Lines 0 %
Metric Value
wmc 7
dl 0
loc 58
rs 10

5 Methods

Rating   Name   Duplication   Size   Complexity  
A st2reactor.timer.St2Timer.start() 0 4 1
A st2reactor.timer.St2Timer.add_trigger() 0 2 1
A st2reactor.timer.St2Timer.cleanup() 0 2 1
A st2reactor.timer.St2Timer.update_trigger() 0 3 1
A st2reactor.timer.St2Timer.__init__() 0 12 1
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import uuid
17
18
from apscheduler.schedulers.background import BlockingScheduler
19
from apscheduler.triggers.cron import CronTrigger
20
from apscheduler.triggers.date import DateTrigger
21
from apscheduler.triggers.interval import IntervalTrigger
22
import apscheduler.util as aps_utils
23
import dateutil.parser as date_parser
24
import jsonschema
25
26
from st2common import log as logging
27
from st2common.constants.triggers import TIMER_TRIGGER_TYPES
28
from st2common.models.api.trace import TraceContext
29
import st2common.services.triggers as trigger_services
30
from st2common.services.triggerwatcher import TriggerWatcher
31
from st2common.transport.reactor import TriggerDispatcher
32
from st2common.util import date as date_utils
33
from st2common.util import schema as util_schema
34
35
LOG = logging.getLogger(__name__)
36
37
38
class St2Timer(object):
39
    """
40
    A timer interface that uses APScheduler 3.0.
41
    """
42
    def __init__(self, local_timezone=None):
43
        self._timezone = local_timezone
44
        self._scheduler = BlockingScheduler(timezone=self._timezone)
45
        self._jobs = {}
46
        self._trigger_types = TIMER_TRIGGER_TYPES.keys()
47
        self._trigger_watcher = TriggerWatcher(create_handler=self._handle_create_trigger,
48
                                               update_handler=self._handle_update_trigger,
49
                                               delete_handler=self._handle_delete_trigger,
50
                                               trigger_types=self._trigger_types,
51
                                               queue_suffix=self.__class__.__name__,
52
                                               exclusive=True)
53
        self._trigger_dispatcher = TriggerDispatcher(LOG)
54
55
    def start(self):
56
        self._register_timer_trigger_types()
57
        self._trigger_watcher.start()
58
        self._scheduler.start()
59
60
    def cleanup(self):
61
        self._scheduler.shutdown(wait=True)
62
63
    def add_trigger(self, trigger):
64
        self._add_job_to_scheduler(trigger)
65
66
    def update_trigger(self, trigger):
67
        self.remove_trigger(trigger)
68
        self.add_trigger(trigger)
69
70
    def remove_trigger(self, trigger):
71
        trigger_id = trigger['id']
72
73
        try:
74
            job_id = self._jobs[trigger_id]
75
        except KeyError:
76
            LOG.info('Job not found: %s', trigger_id)
77
            return
78
79
        self._scheduler.remove_job(job_id)
80
        del self._jobs[trigger_id]
81
82
    def _add_job_to_scheduler(self, trigger):
83
        trigger_type_ref = trigger['type']
84
        trigger_type = TIMER_TRIGGER_TYPES[trigger_type_ref]
85
        try:
86
            util_schema.validate(instance=trigger['parameters'],
87
                                 schema=trigger_type['parameters_schema'],
88
                                 cls=util_schema.CustomValidator,
89
                                 use_default=True,
90
                                 allow_default_none=True)
91
        except jsonschema.ValidationError as e:
92
            LOG.error('Exception scheduling timer: %s, %s',
93
                      trigger['parameters'], e, exc_info=True)
94
            raise  # Or should we just return?
95
96
        time_spec = trigger['parameters']
97
        time_zone = aps_utils.astimezone(trigger['parameters'].get('timezone'))
98
99
        time_type = None
100
101
        if trigger_type['name'] == 'st2.IntervalTimer':
102
            unit = time_spec.get('unit', None)
103
            value = time_spec.get('delta', None)
104
            time_type = IntervalTrigger(**{unit: value, 'timezone': time_zone})
105
        elif trigger_type['name'] == 'st2.DateTimer':
106
            # Raises an exception if date string isn't a valid one.
107
            dat = date_parser.parse(time_spec.get('date', None))
108
            time_type = DateTrigger(dat, timezone=time_zone)
109
        elif trigger_type['name'] == 'st2.CronTimer':
110
            cron = time_spec.copy()
111
            cron['timezone'] = time_zone
112
113
            time_type = CronTrigger(**cron)
114
115
        utc_now = date_utils.get_datetime_utc_now()
116
        if hasattr(time_type, 'run_date') and utc_now > time_type.run_date:
117
            LOG.warning('Not scheduling expired timer: %s : %s',
118
                        trigger['parameters'], time_type.run_date)
119
        else:
120
            self._add_job(trigger, time_type)
121
        return time_type
122
123
    def _add_job(self, trigger, time_type, replace=True):
124
        try:
125
            job = self._scheduler.add_job(self._emit_trigger_instance,
126
                                          trigger=time_type,
127
                                          args=[trigger],
128
                                          replace_existing=replace)
129
            LOG.info('Job %s scheduled.', job.id)
130
            self._jobs[trigger['id']] = job.id
131
        except Exception as e:
132
            LOG.error('Exception scheduling timer: %s, %s',
133
                      trigger['parameters'], e, exc_info=True)
134
135
    def _emit_trigger_instance(self, trigger):
136
        utc_now = date_utils.get_datetime_utc_now()
137
        # debug logging is reasonable for this one. A high resolution timer will end up
138
        # trashing standard logs.
139
        LOG.debug('Timer fired at: %s. Trigger: %s', str(utc_now), trigger)
140
141
        payload = {
142
            'executed_at': str(utc_now),
143
            'schedule': trigger['parameters'].get('time')
144
        }
145
146
        trace_context = TraceContext(trace_tag='%s-%s' % (self._get_trigger_type_name(trigger),
147
                                                          trigger.get('name', uuid.uuid4().hex)))
148
        self._trigger_dispatcher.dispatch(trigger, payload, trace_context=trace_context)
149
150
    def _get_trigger_type_name(self, trigger):
151
        trigger_type_ref = trigger['type']
152
        trigger_type = TIMER_TRIGGER_TYPES[trigger_type_ref]
153
        return trigger_type['name']
154
155
    def _register_timer_trigger_types(self):
156
        return trigger_services.add_trigger_models(TIMER_TRIGGER_TYPES.values())
157
158
    ##############################################
159
    # Event handler methods for the trigger events
160
    ##############################################
161
162
    def _handle_create_trigger(self, trigger):
163
        LOG.debug('Calling "add_trigger" method (trigger.type=%s)' % (trigger.type))
164
        trigger = self._sanitize_trigger(trigger=trigger)
165
        self.add_trigger(trigger=trigger)
166
167
    def _handle_update_trigger(self, trigger):
168
        LOG.debug('Calling "update_trigger" method (trigger.type=%s)' % (trigger.type))
169
        trigger = self._sanitize_trigger(trigger=trigger)
170
        self.update_trigger(trigger=trigger)
171
172
    def _handle_delete_trigger(self, trigger):
173
        LOG.debug('Calling "remove_trigger" method (trigger.type=%s)' % (trigger.type))
174
        trigger = self._sanitize_trigger(trigger=trigger)
175
        self.remove_trigger(trigger=trigger)
176
177
    def _sanitize_trigger(self, trigger):
178
        sanitized = trigger._data
0 ignored issues
show
Coding Style Best Practice introduced by
It seems like _data was declared protected and should not be accessed from this context.

Prefixing a member variable _ is usually regarded as the equivalent of declaring it with protected visibility that exists in other languages. Consequentially, such a member should only be accessed from the same class or a child class:

class MyParent:
    def __init__(self):
        self._x = 1;
        self.y = 2;

class MyChild(MyParent):
    def some_method(self):
        return self._x    # Ok, since accessed from a child class

class AnotherClass:
    def some_method(self, instance_of_my_child):
        return instance_of_my_child._x   # Would be flagged as AnotherClass is not
                                         # a child class of MyParent
Loading history...
179
        if 'id' in sanitized:
180
            # Friendly objectid rather than the MongoEngine representation.
181
            sanitized['id'] = str(sanitized['id'])
182
        return sanitized
183