Completed
Push — master ( be542d...29973a )
by Arma
08:49 queued 01:31
created

st2actions.notifier.CloudSlangRunner   A

Complexity

Total Complexity 17

Size/Duplication

Total Lines 106
Duplicated Lines 0 %
Metric Value
wmc 17
dl 0
loc 106
rs 10

6 Methods

Rating   Name   Duplication   Size   Complexity  
B _run_cli_command() 0 24 4
A pre_run() 0 7 1
A __init__() 0 2 1
B _write_inputs_to_a_temp_file() 0 25 3
B run() 0 21 5
A _prepare_command() 0 16 3
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import datetime
17
18
from oslo_config import cfg
19
from apscheduler.schedulers.background import BlockingScheduler
20
from apscheduler.triggers.interval import IntervalTrigger
21
import apscheduler.util as aps_utils
22
23
from st2common import log as logging
24
from st2common.constants import action as action_constants
25
from st2common.services import coordination
26
from st2common.util import date as date_utils
27
from st2common.services import action as action_service
28
from st2common.persistence.liveaction import LiveAction
29
30
# Names exported by this module.
__all__ = [
31
    'get_rescheduler',
32
    'recover_delayed_executions'
33
]
34
35
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
36
37
38
def get_rescheduler():
    """Build the scheduler that periodically recovers delayed executions.

    A ``BlockingScheduler`` is created and a single interval job running
    ``recover_delayed_executions`` is registered on it. The interval (in
    seconds) is read from ``cfg.CONF.scheduler.rescheduling_interval`` and
    the trigger uses the UTC timezone. The first run is requested for the
    current UTC time rather than after one full interval.

    The scheduler is returned without being started.
    """
    scheduler = BlockingScheduler()

    interval_trigger = IntervalTrigger(
        seconds=cfg.CONF.scheduler.rescheduling_interval,
        timezone=aps_utils.astimezone('UTC'),
    )

    job_options = {
        'trigger': interval_trigger,
        'max_instances': 1,
        'misfire_grace_time': 60,
        'next_run_time': date_utils.get_datetime_utc_now(),
        'replace_existing': True,
    }
    scheduler.add_job(recover_delayed_executions, **job_options)

    return scheduler
54
55
56
def recover_delayed_executions():
    """Re-request liveactions that have been stuck in the delayed status.

    Liveactions whose status is ``LIVEACTION_STATUS_DELAYED`` and whose
    ``start_timestamp`` is older than
    ``cfg.CONF.scheduler.delayed_execution_recovery`` seconds are moved back
    to ``LIVEACTION_STATUS_REQUESTED`` and the status change is published.

    The query and the updates run while holding the coordinator lock
    ``st2-rescheduling-delayed-executions``. A failure to update one
    liveaction is logged and does not stop processing of the others.
    """
    coordinator = coordination.get_coordinator()
    dt_now = date_utils.get_datetime_utc_now()
    recovery_seconds = cfg.CONF.scheduler.delayed_execution_recovery
    dt_timeout = dt_now - datetime.timedelta(seconds=recovery_seconds)

    with coordinator.get_lock('st2-rescheduling-delayed-executions'):
        liveactions = LiveAction.query(status=action_constants.LIVEACTION_STATUS_DELAYED,
                                       start_timestamp__lte=dt_timeout,
                                       order_by=['start_timestamp'])

        if not liveactions:
            return

        total = len(liveactions)
        LOG.info('There are %d liveactions that have been delayed for longer than %d seconds.',
                 total, recovery_seconds)

        # Update status to requested and publish the status for each liveaction.
        rescheduled = 0
        for instance in liveactions:
            try:
                action_service.update_status(instance,
                                             action_constants.LIVEACTION_STATUS_REQUESTED,
                                             publish=True)
                rescheduled += 1
            except Exception:
                # Best effort: keep going with the remaining liveactions, but
                # do not swallow KeyboardInterrupt / SystemExit and friends.
                LOG.exception('Unable to reschedule liveaction. <LiveAction.id=%s>', instance.id)

        # Argument order matches the message: "<rescheduled> out of <total>".
        LOG.info('Rescheduled %d out of %d delayed liveactions.', rescheduled, total)
85