Test Failed
Push — master ( e380d0...f5671d )
by W
02:58
created

st2common/st2common/persistence/base.py (2 issues)

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
import abc
18
19
import six
20
21
from st2common import log as logging
22
from st2common.exceptions.db import StackStormDBObjectConflictError
23
from st2common.models.system.common import ResourceReference
24
25
26
__all__ = [
27
    'Access',
28
29
    'ContentPackResource',
30
    'StatusBasedResource'
31
]
32
33
LOG = logging.getLogger(__name__)
34
35
36
@six.add_metaclass(abc.ABCMeta)
37
class Access(object):
38
    impl = None
39
    publisher = None
40
    dispatcher = None
41
42
    # ModelAPI class for this resource
43
    api_model_cls = None
44
45
    # A list of operations for which we should dispatch a trigger
46
    dispatch_trigger_for_operations = []
47
48
    # Maps model operation name (e.g. create, update, delete) to the trigger reference which is
49
    # used when dispatching a trigger
50
    operation_to_trigger_ref_map = {}
51
52
    @classmethod
53
    @abc.abstractmethod
54
    def _get_impl(cls):
55
        pass
56
57
    @classmethod
58
    @abc.abstractmethod
59
    def _get_publisher(cls):
60
        return None
61
62
    @classmethod
63
    def _get_dispatcher(cls):
64
        """
65
        Return a dispatcher class which is used for dispatching triggers.
66
        """
67
        # Late import to avoid very expensive in-direct jsonschema import (~1 second) when this
68
        # function is not called / used
69
        from st2common.transport.reactor import TriggerDispatcher
70
71
        if not cls.dispatcher:
72
            cls.dispatcher = TriggerDispatcher(LOG)
73
74
        return cls.dispatcher
75
76
    @classmethod
77
    @abc.abstractmethod
78
    def _get_by_object(cls, object):
0 ignored issues
show
Bug Best Practice introduced by
This seems to re-define the built-in object.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

Loading history...
79
        return None
80
81
    @classmethod
82
    def get_by_name(cls, value):
83
        return cls._get_impl().get_by_name(value)
84
85
    @classmethod
86
    def get_by_id(cls, value):
87
        return cls._get_impl().get_by_id(value)
88
89
    @classmethod
90
    def get_by_uid(cls, value):
91
        return cls._get_impl().get_by_uid(value)
92
93
    @classmethod
94
    def get_by_ref(cls, value):
95
        return cls._get_impl().get_by_ref(value)
96
97
    @classmethod
98
    def get_by_pack(cls, value):
99
        return cls._get_impl().get_by_pack(value)
100
101
    @classmethod
102
    def get(cls, *args, **kwargs):
103
        return cls._get_impl().get(*args, **kwargs)
104
105
    @classmethod
106
    def get_all(cls, *args, **kwargs):
107
        return cls._get_impl().get_all(*args, **kwargs)
108
109
    @classmethod
110
    def count(cls, *args, **kwargs):
111
        return cls._get_impl().count(*args, **kwargs)
112
113
    @classmethod
114
    def query(cls, *args, **kwargs):
115
        return cls._get_impl().query(*args, **kwargs)
116
117
    @classmethod
118
    def distinct(cls, *args, **kwargs):
119
        return cls._get_impl().distinct(*args, **kwargs)
120
121
    @classmethod
122
    def aggregate(cls, *args, **kwargs):
123
        return cls._get_impl().aggregate(*args, **kwargs)
124
125
    @classmethod
126
    def insert(cls, model_object, publish=True, dispatch_trigger=True,
127
               log_not_unique_error_as_debug=False):
128
        # Late import to avoid very expensive in-direct import (~1 second) when this function
129
        # is not called / used
130
        from mongoengine import NotUniqueError
131
132
        if model_object.id:
133
            raise ValueError('id for object %s was unexpected.' % model_object)
134
        try:
135
            model_object = cls._get_impl().insert(model_object)
136
        except NotUniqueError as e:
137
            if log_not_unique_error_as_debug:
138
                LOG.debug('Conflict while trying to save in DB: %s.', str(e))
139
            else:
140
                LOG.exception('Conflict while trying to save in DB.')
141
            # On a conflict determine the conflicting object and return its id in
142
            # the raised exception.
143
            conflict_object = cls._get_by_object(model_object)
144
            conflict_id = str(conflict_object.id) if conflict_object else None
145
            message = str(e)
146
            raise StackStormDBObjectConflictError(message=message, conflict_id=conflict_id,
147
                                                  model_object=model_object)
148
149
        # Publish internal event on the message bus
150
        if publish:
151
            try:
152
                cls.publish_create(model_object)
153
            except:
154
                LOG.exception('Publish failed.')
155
156
        # Dispatch trigger
157
        if dispatch_trigger:
158
            try:
159
                cls.dispatch_create_trigger(model_object)
160
            except:
161
                LOG.exception('Trigger dispatch failed.')
162
163
        return model_object
164
165
    @classmethod
166
    def add_or_update(cls, model_object, publish=True, dispatch_trigger=True,
167
                      log_not_unique_error_as_debug=False):
168
        # Late import to avoid very expensive in-direct import (~1 second) when this function
169
        # is not called / used
170
        from mongoengine import NotUniqueError
171
172
        pre_persist_id = model_object.id
173
        try:
174
            model_object = cls._get_impl().add_or_update(model_object)
175
        except NotUniqueError as e:
176
            if log_not_unique_error_as_debug:
177
                LOG.debug('Conflict while trying to save in DB: %s.', str(e))
178
            else:
179
                LOG.exception('Conflict while trying to save in DB.')
180
            # On a conflict determine the conflicting object and return its id in
181
            # the raised exception.
182
            conflict_object = cls._get_by_object(model_object)
183
            conflict_id = str(conflict_object.id) if conflict_object else None
184
            message = str(e)
185
            raise StackStormDBObjectConflictError(message=message, conflict_id=conflict_id,
186
                                                  model_object=model_object)
187
188
        is_update = str(pre_persist_id) == str(model_object.id)
189
190
        # Publish internal event on the message bus
191
        if publish:
192
            try:
193
                if is_update:
194
                    cls.publish_update(model_object)
195
                else:
196
                    cls.publish_create(model_object)
197
            except:
198
                LOG.exception('Publish failed.')
199
200
        # Dispatch trigger
201
        if dispatch_trigger:
202
            try:
203
                if is_update:
204
                    cls.dispatch_update_trigger(model_object)
205
                else:
206
                    cls.dispatch_create_trigger(model_object)
207
            except:
208
                LOG.exception('Trigger dispatch failed.')
209
210
        return model_object
211
212
    @classmethod
213
    def update(cls, model_object, publish=True, dispatch_trigger=True, **kwargs):
214
        """
215
        Use this method when -
216
        * upsert=False is desired
217
        * special operators like push, push_all are to be used.
218
        """
219
        cls._get_impl().update(model_object, **kwargs)
220
        # update does not return the object but a flag; likely success/fail but docs
221
        # are not very good on this one so ignoring. Explicitly get the object from
222
        # DB abd return.
223
        model_object = cls.get_by_id(model_object.id)
224
225
        # Publish internal event on the message bus
226
        if publish:
227
            try:
228
                cls.publish_update(model_object)
229
            except:
230
                LOG.exception('Publish failed.')
231
232
        # Dispatch trigger
233
        if dispatch_trigger:
234
            try:
235
                cls.dispatch_update_trigger(model_object)
236
            except:
237
                LOG.exception('Trigger dispatch failed.')
238
239
        return model_object
240
241
    @classmethod
242
    def delete(cls, model_object, publish=True, dispatch_trigger=True):
243
        persisted_object = cls._get_impl().delete(model_object)
244
245
        # Publish internal event on the message bus
246
        if publish:
247
            try:
248
                cls.publish_delete(model_object)
249
            except Exception:
250
                LOG.exception('Publish failed.')
251
252
        # Dispatch trigger
253
        if dispatch_trigger:
254
            try:
255
                cls.dispatch_delete_trigger(model_object)
256
            except Exception:
257
                LOG.exception('Trigger dispatch failed.')
258
259
        return persisted_object
260
261
    ####################################################
262
    # Internal event bus message publish related methods
263
    ####################################################
264
265
    @classmethod
266
    def publish_create(cls, model_object):
267
        publisher = cls._get_publisher()
268
        if publisher:
269
            publisher.publish_create(model_object)
270
271
    @classmethod
272
    def publish_update(cls, model_object):
273
        publisher = cls._get_publisher()
274
        if publisher:
275
            publisher.publish_update(model_object)
276
277
    @classmethod
278
    def publish_delete(cls, model_object):
279
        publisher = cls._get_publisher()
280
        if publisher:
281
            publisher.publish_delete(model_object)
282
283
    ############################################
284
    # Internal trigger dispatch related methods
285
    ###########################################
286
287
    @classmethod
288
    def dispatch_create_trigger(cls, model_object):
289
        """
290
        Dispatch a resource-specific trigger which indicates a new resource has been created.
291
        """
292
        return cls._dispatch_operation_trigger(operation='create', model_object=model_object)
293
294
    @classmethod
295
    def dispatch_update_trigger(cls, model_object):
296
        """
297
        Dispatch a resource-specific trigger which indicates an existing resource has been updated.
298
        """
299
        return cls._dispatch_operation_trigger(operation='update', model_object=model_object)
300
301
    @classmethod
302
    def dispatch_delete_trigger(cls, model_object):
303
        """
304
        Dispatch a resource-specific trigger which indicates an existing resource has been
305
        deleted.
306
        """
307
        return cls._dispatch_operation_trigger(operation='delete', model_object=model_object)
308
309
    @classmethod
310
    def _get_trigger_ref_for_operation(cls, operation):
311
        trigger_ref = cls.operation_to_trigger_ref_map.get(operation, None)
312
313
        if not trigger_ref:
314
            raise ValueError('Trigger ref not specified for operation: %s' % (operation))
315
316
        return trigger_ref
317
318
    @classmethod
319
    def _dispatch_operation_trigger(cls, operation, model_object):
320
        if operation not in cls.dispatch_trigger_for_operations:
321
            return
322
323
        trigger = cls._get_trigger_ref_for_operation(operation=operation)
324
325
        object_payload = cls.api_model_cls.from_model(model_object, mask_secrets=True).__json__()
326
        payload = {
327
            'object': object_payload
328
        }
329
        return cls._dispatch_trigger(operation=operation, trigger=trigger, payload=payload)
330
331
    @classmethod
332
    def _dispatch_trigger(cls, operation, trigger, payload):
333
        if operation not in cls.dispatch_trigger_for_operations:
334
            return
335
336
        dispatcher = cls._get_dispatcher()
337
        return dispatcher.dispatch(trigger=trigger, payload=payload)
338
339
340
class ContentPackResource(Access):
341
342
    @classmethod
343
    def get_by_ref(cls, ref):
344
        if not ref:
345
            return None
346
347
        ref_obj = ResourceReference.from_string_reference(ref=ref)
348
        result = cls.query(name=ref_obj.name,
349
                           pack=ref_obj.pack).first()
350
        return result
351
352
    @classmethod
353
    def _get_by_object(cls, object):
0 ignored issues
show
Bug Best Practice introduced by
This seems to re-define the built-in object.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

Loading history...
354
        # For an object with a resourcepack pack.name is unique.
355
        name = getattr(object, 'name', '')
356
        pack = getattr(object, 'pack', '')
357
        return cls.get_by_ref(ResourceReference.to_string_reference(pack=pack, name=name))
358
359
360
class StatusBasedResource(Access):
361
    """Persistence layer for models that needs to publish status to the message queue."""
362
363
    @classmethod
364
    def publish_status(cls, model_object):
365
        """Publish the object status to the message queue.
366
367
        Publish the instance of the model as payload with the status
368
        as routing key to the message queue via the StatePublisher.
369
370
        :param model_object: An instance of the model.
371
        :type model_object: ``object``
372
        """
373
        publisher = cls._get_publisher()
374
        if publisher:
375
            publisher.publish_state(model_object, getattr(model_object, 'status', None))
376