Completed
Push — master ( fe66bc...2a24eb )
by Manas
18:11
created

st2actions.ActionExecutionScheduler   A

Complexity

Total Complexity 8

Size/Duplication

Total Lines 55
Duplicated Lines 0 %
Metric Value
wmc 8
dl 0
loc 55
rs 10
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from kombu import Connection
17
18
from st2common import log as logging
19
from st2common.constants import action as action_constants
20
from st2common.exceptions.db import StackStormDBObjectNotFoundError
21
from st2common.models.db.liveaction import LiveActionDB
22
from st2common.services import action as action_service
23
from st2common.persistence.liveaction import LiveAction
24
from st2common.persistence.policy import Policy
25
from st2common import policies
26
from st2common.transport import consumers, liveaction
27
from st2common.transport import utils as transport_utils
28
from st2common.util import action_db as action_utils
29
30
# Names exported from this module.
__all__ = ['get_scheduler']


# Module-level logger named after this module.
LOG = logging.getLogger(__name__)


# Status management queue named 'st2.actionrunner.req', requested with the
# "requested" liveaction status as its routing key.
_REQUESTED_ROUTING_KEY = action_constants.LIVEACTION_STATUS_REQUESTED
ACTIONRUNNER_REQUEST_Q = liveaction.get_status_management_queue(
    'st2.actionrunner.req',
    routing_key=_REQUESTED_ROUTING_KEY)
39
40
41
class ActionExecutionScheduler(consumers.MessageHandler):
    """Message handler that schedules requested LiveActions.

    Handles ``LiveActionDB`` messages: applies the policies found for the
    liveaction's action, moves a still-"requested" liveaction to "scheduled"
    and publishes the resulting status.
    """

    # Type of the message payload this handler processes.
    message_type = LiveActionDB

    def process(self, request):
        """Schedules the LiveAction and publishes the request
        to the appropriate action runner(s).

        LiveAction in statuses other than "requested" are ignored.

        :param request: Action execution request.
        :type request: ``st2common.models.db.liveaction.LiveActionDB``

        :raises StackStormDBObjectNotFoundError: If the liveaction referenced
            by the request cannot be found in the database (the error is
            logged and re-raised).
        """

        if request.status != action_constants.LIVEACTION_STATUS_REQUESTED:
            LOG.info('%s is ignoring %s (id=%s) with "%s" status.',
                     self.__class__.__name__, type(request), request.id, request.status)
            return

        # Reload the liveaction from the database by id instead of relying on
        # the copy carried in the message.
        try:
            liveaction_db = action_utils.get_liveaction_by_id(request.id)
        except StackStormDBObjectNotFoundError:
            LOG.exception('Failed to find liveaction %s in the database.', request.id)
            raise

        # Apply policies defined for the action.
        for policy_db in Policy.query(resource_ref=liveaction_db.action):
            driver = policies.get_driver(policy_db.ref,
                                         policy_db.policy_type,
                                         **policy_db.parameters)

            try:
                liveaction_db = driver.apply_before(liveaction_db)
            except Exception:
                # Best effort: a failing policy is logged and the liveaction is
                # left as it was before this policy. Only ``Exception`` is
                # caught so that SystemExit, KeyboardInterrupt and other
                # BaseException-derived signals are not swallowed here.
                LOG.exception('An exception occurred while applying policy "%s".', policy_db.ref)

            # Stop applying further policies once one of them delays the liveaction.
            if liveaction_db.status == action_constants.LIVEACTION_STATUS_DELAYED:
                break

        # Exit if the status of the request is no longer runnable.
        # The status could have been changed by one of the policies.
        if liveaction_db.status not in [action_constants.LIVEACTION_STATUS_REQUESTED,
                                        action_constants.LIVEACTION_STATUS_SCHEDULED]:
            LOG.info('%s is ignoring %s (id=%s) with "%s" status after policies are applied.',
                     self.__class__.__name__, type(request), request.id, liveaction_db.status)
            return

        # Update liveaction status to "scheduled".
        if liveaction_db.status == action_constants.LIVEACTION_STATUS_REQUESTED:
            liveaction_db = action_service.update_status(
                liveaction_db, action_constants.LIVEACTION_STATUS_SCHEDULED, publish=False)

        # Publish the "scheduled" status here manually. Otherwise, there could be a
        # race condition with the update of the action_execution_db if the execution
        # of the liveaction completes first.
        LiveAction.publish_status(liveaction_db)
96
97
98
def get_scheduler():
    """Create the ActionExecutionScheduler for the action runner request queue.

    Opens a kombu ``Connection`` from the URLs returned by
    ``transport_utils.get_messaging_urls()`` and hands it, together with
    ``ACTIONRUNNER_REQUEST_Q``, to a new ``ActionExecutionScheduler``.

    NOTE(review): the scheduler is returned from inside the ``with`` block, so
    the connection's context manager exits as this function returns.
    Presumably the handler establishes its own connection later - confirm.

    :return: The newly constructed scheduler.
    :rtype: ``ActionExecutionScheduler``
    """
    messaging_urls = transport_utils.get_messaging_urls()
    with Connection(messaging_urls) as connection:
        request_queues = [ACTIONRUNNER_REQUEST_Q]
        return ActionExecutionScheduler(connection, request_queues)
101