Test Failed
Pull Request — master (#4197)
by W
03:53
created

ConcurrencyByAttributeApplicator._apply_before()   A

Complexity

Conditions 3

Size

Total Lines 35

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
dl 0
loc 35
rs 9.0399
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
import json
18
import six
19
20
from st2common.constants import action as action_constants
21
from st2common import log as logging
22
from st2common.persistence import action as action_access
23
from st2common.services import action as action_service
24
from st2common.policies.concurrency import BaseConcurrencyApplicator
25
from st2common.services import coordination
26
27
# Names exported by ``from <module> import *``.
__all__ = ['ConcurrencyByAttributeApplicator']

# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
32
33
34
class ConcurrencyByAttributeApplicator(BaseConcurrencyApplicator):
    """Concurrency policy keyed on an action plus a subset of its parameter values.

    Live actions are counted together when they reference the same action and carry
    the same values for the parameters listed in ``attributes``. ``apply_before``
    decides whether a requested live action may be scheduled; ``apply_after``
    re-requests the oldest delayed live action matching the same filters.
    """

    def __init__(self, policy_ref, policy_type, threshold=0, action='delay', attributes=None):
        """Initialize the applicator.

        :param policy_ref: Forwarded unchanged to the base applicator.
        :param policy_type: Forwarded unchanged to the base applicator.
        :param threshold: Forwarded to the base applicator; ``_apply_before`` compares
                          the number of scheduled + running live actions against
                          ``self.threshold``. With the default of 0 the comparison
                          ``count < threshold`` is never true.
        :param action: Forwarded to the base applicator. ``_apply_before`` later reads
                       ``self.policy_action`` (presumably set from this value by the
                       base class -- confirm) and treats ``'delay'`` specially.
        :param attributes: Names of the action parameters used to group live actions.
                           ``None`` is treated as an empty list.
        """
        super(ConcurrencyByAttributeApplicator, self).__init__(policy_ref=policy_ref,
                                                               policy_type=policy_type,
                                                               threshold=threshold,
                                                               action=action)
        self.attributes = attributes or []

    def _get_lock_uid(self, target):
        """Return the name of the lock guarding policy evaluation for ``target``.

        The name is the JSON serialization of the policy type, the target's action
        and the configured attribute names.
        """
        meta = {
            'policy_type': self._policy_type,
            'action': target.action,
            'attributes': self.attributes
        }

        # sort_keys keeps the serialized name identical in every process regardless
        # of dict iteration order, so all schedulers contend on the same lock.
        return json.dumps(meta, sort_keys=True)

    def _get_filters(self, target):
        """Build the query filters selecting live actions grouped with ``target``.

        One ``parameters__<name>`` filter is added for every parameter of ``target``
        whose name is listed in ``self.attributes``, plus the target's action. The
        ``status`` key is present but set to ``None``; callers fill it in before
        querying.
        """
        filters = {('parameters__%s' % k): v
                   for k, v in six.iteritems(target.parameters)
                   if k in self.attributes}

        filters['action'] = target.action
        filters['status'] = None

        return filters

    def _get_target_lock(self, target):
        """Return the lock for ``target``, warning first if coordination is not configured.

        Shared by ``apply_before`` and ``apply_after`` so both contend on the same
        lock name and emit the same log messages.
        """
        # Warn users that the coordination service is not configured.
        if not coordination.configured():
            LOG.warning('Coordination service is not configured. '
                        'Policy enforcement is best effort.')

        lock_uid = self._get_lock_uid(target)
        LOG.debug('%s is attempting to acquire lock "%s".', self.__class__.__name__, lock_uid)

        return self.coordinator.get_lock(lock_uid)

    def _apply_before(self, target):
        """Decide the next status of ``target`` and persist it.

        Called by ``apply_before`` while the lock is held. Returns the object
        returned by ``action_service.update_status``.
        """
        filters = self._get_filters(target)

        # Count the scheduled and the running instances with one query per status.
        count = 0
        for live_status in (action_constants.LIVEACTION_STATUS_SCHEDULED,
                            action_constants.LIVEACTION_STATUS_RUNNING):
            filters['status'] = live_status
            count += action_access.LiveAction.count(**filters)

        # Mark the execution as scheduled if threshold is not reached or delayed otherwise.
        if count < self.threshold:
            LOG.debug('There are %s instances of %s in scheduled or running status. '
                      'Threshold of %s is not reached. Action execution will be scheduled.',
                      count, target.action, self._policy_ref)
            status = action_constants.LIVEACTION_STATUS_SCHEDULED
        else:
            outcome = 'delayed' if self.policy_action == 'delay' else 'canceled'
            LOG.debug('There are %s instances of %s in scheduled or running status. '
                      'Threshold of %s is reached. Action execution will be %s.',
                      count, target.action, self._policy_ref, outcome)
            status = self._get_status_for_policy_action(action=self.policy_action)

        # Update the status in the database. Publish status for cancellation so the
        # appropriate runner can cancel the execution. Other statuses are not published
        # because they will be picked up by the worker(s) to be processed again,
        # leading to duplicate action executions.
        publish = (status == action_constants.LIVEACTION_STATUS_CANCELING)

        return action_service.update_status(target, status, publish=publish)

    def apply_before(self, target):
        """Apply the policy to a requested live action before it is scheduled.

        Returns ``target`` untouched when its status is not "requested"; otherwise
        returns the live action as updated by ``_apply_before``.
        """
        # Exit if target not in schedulable state.
        if target.status != action_constants.LIVEACTION_STATUS_REQUESTED:
            LOG.debug('The live action is not schedulable therefore the policy '
                      '"%s" cannot be applied. %s', self._policy_ref, target)
            return target

        # Acquire a distributed lock before querying the database to make sure that only one
        # scheduler is scheduling execution for this action. Even if the coordination service
        # is not configured, the fake driver using zake or the file driver can still acquire
        # a lock for the local process or server respectively.
        with self._get_target_lock(target):
            target = self._apply_before(target)

        return target

    def _apply_after(self, target):
        """Re-request the oldest delayed live action grouped with ``target``, if any.

        Called by ``apply_after`` while the lock is held.
        """
        # Schedule the oldest delayed executions.
        filters = self._get_filters(target)
        filters['status'] = action_constants.LIVEACTION_STATUS_DELAYED
        requests = action_access.LiveAction.query(order_by=['start_timestamp'], limit=1, **filters)

        if requests:
            action_service.update_status(
                requests[0], action_constants.LIVEACTION_STATUS_REQUESTED, publish=True)

    def apply_after(self, target):
        """Apply the policy after ``target`` has been processed; returns ``target`` unchanged."""
        # Acquire a distributed lock before querying the database to make sure that only one
        # scheduler is scheduling execution for this action. Even if the coordination service
        # is not configured, the fake driver using zake or the file driver can still acquire
        # a lock for the local process or server respectively.
        with self._get_target_lock(target):
            self._apply_after(target)

        return target
145