Completed
Pull Request — master (#2357)
by Manas
07:53
created

_apply_after()   A

Complexity

Conditions 2

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 9
rs 9.6667
cc 2
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import json
17
import six
18
19
from st2common.constants import action as action_constants
20
from st2common import log as logging
21
from st2common.persistence import action as action_access
22
from st2common.policies import base
23
from st2common.services import action as action_service
24
from st2common.services import coordination
25
26
27
LOG = logging.getLogger(__name__)
28
29
30
class ConcurrencyByAttributeApplicator(base.ResourcePolicyApplicator):
31
32
    def __init__(self, policy_ref, policy_type, *args, **kwargs):
33
        super(ConcurrencyByAttributeApplicator, self).__init__(policy_ref, policy_type,
34
                                                               *args, **kwargs)
35
        self.coordinator = coordination.get_coordinator()
36
        self.threshold = kwargs.get('threshold', 0)
37
        self.attributes = kwargs.get('attributes', [])
38
39
    def _get_lock_uid(self, target):
40
        meta = {
41
            'policy_type': self._policy_type,
42
            'action': target.action,
43
            'attributes': self.attributes
44
        }
45
46
        return json.dumps(meta)
47
48
    def _get_filters(self, target):
49
        filters = {('parameters__%s' % k): v
50
                   for k, v in six.iteritems(target.parameters)
51
                   if k in self.attributes}
52
53
        filters['action'] = target.action
54
        filters['status'] = None
55
56
        return filters
57
58
    def _apply_before(self, target):
59
        # Get the count of scheduled and running instances of the action.
60
        filters = self._get_filters(target)
61
62
        # Get the count of scheduled instances of the action.
63
        filters['status'] = action_constants.LIVEACTION_STATUS_SCHEDULED
64
        scheduled = action_access.LiveAction.count(**filters)
65
66
        # Get the count of running instances of the action.
67
        filters['status'] = action_constants.LIVEACTION_STATUS_RUNNING
68
        running = action_access.LiveAction.count(**filters)
69
70
        count = scheduled + running
71
72
        # Mark the execution as scheduled if threshold is not reached or delayed otherwise.
73
        if count < self.threshold:
74
            LOG.debug('There are %s instances of %s in scheduled or running status. '
75
                      'Threshold of %s is not reached. Action execution will be scheduled.',
76
                      count, target.action, self._policy_ref)
77
            status = action_constants.LIVEACTION_STATUS_SCHEDULED
78
        else:
79
            LOG.debug('There are %s instances of %s in scheduled or running status. '
80
                      'Threshold of %s is reached. Action execution will be delayed.',
81
                      count, target.action, self._policy_ref)
82
            status = action_constants.LIVEACTION_STATUS_DELAYED
83
84
        # Update the status in the database but do not publish.
85
        target = action_service.update_status(target, status, publish=False)
86
87
        return target
88
89
    def apply_before(self, target):
90
        # Exit if target not in schedulable state.
91
        if target.status != action_constants.LIVEACTION_STATUS_REQUESTED:
92
            LOG.debug('The live action is not schedulable therefore the policy '
93
                      '"%s" cannot be applied. %s', self._policy_ref, target)
94
            return target
95
96
        # Warn users that the coordination service is not configured.
97
        if not coordination.configured():
98
            LOG.warn('Coordination service is not configured. Policy enforcement is best effort.')
99
100
        # Acquire a distributed lock before querying the database to make sure that only one
101
        # scheduler is scheduling execution for this action. Even if the coordination service
102
        # is not configured, the fake driver using zake or the file driver can still acquire
103
        # a lock for the local process or server respectively.
104
        lock_uid = self._get_lock_uid(target)
105
        LOG.debug('%s is attempting to acquire lock "%s".', self.__class__.__name__, lock_uid)
106
        with self.coordinator.get_lock(lock_uid):
107
            target = self._apply_before(target)
108
109
        return target
110
111
    def _apply_after(self, target):
112
        # Schedule the oldest delayed executions.
113
        filters = self._get_filters(target)
114
        filters['status'] = action_constants.LIVEACTION_STATUS_DELAYED
115
        requests = action_access.LiveAction.query(order_by=['start_timestamp'], limit=1, **filters)
116
117
        if requests:
118
            action_service.update_status(
119
                requests[0], action_constants.LIVEACTION_STATUS_REQUESTED, publish=True)
120
121
    def apply_after(self, target):
122
        # Warn users that the coordination service is not configured.
123
        if not coordination.configured():
124
            LOG.warn('Coordination service is not configured. Policy enforcement is best effort.')
125
126
        # Acquire a distributed lock before querying the database to make sure that only one
127
        # scheduler is scheduling execution for this action. Even if the coordination service
128
        # is not configured, the fake driver using zake or the file driver can still acquire
129
        # a lock for the local process or server respectively.
130
        lock_uid = self._get_lock_uid(target)
131
        LOG.debug('%s is attempting to acquire lock "%s".', self.__class__.__name__, lock_uid)
132
        with self.coordinator.get_lock(lock_uid):
133
            self._apply_after(target)
134
135
        return target
136