1 | # Licensed to the StackStorm, Inc ('StackStorm') under one or more |
||
2 | # contributor license agreements. See the NOTICE file distributed with |
||
3 | # this work for additional information regarding copyright ownership. |
||
4 | # The ASF licenses this file to You under the Apache License, Version 2.0 |
||
5 | # (the "License"); you may not use this file except in compliance with |
||
6 | # the License. You may obtain a copy of the License at |
||
7 | # |
||
8 | # http://www.apache.org/licenses/LICENSE-2.0 |
||
9 | # |
||
10 | # Unless required by applicable law or agreed to in writing, software |
||
11 | # distributed under the License is distributed on an "AS IS" BASIS, |
||
12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||
13 | # See the License for the specific language governing permissions and |
||
14 | # limitations under the License. |
||
15 | |||
16 | from __future__ import absolute_import |
||
17 | import abc |
||
18 | |||
19 | import six |
||
20 | |||
21 | from st2common import log as logging |
||
22 | from st2common.exceptions.db import StackStormDBObjectConflictError |
||
23 | from st2common.models.system.common import ResourceReference |
||
24 | |||
25 | |||
26 | __all__ = [ |
||
27 | 'Access', |
||
28 | |||
29 | 'ContentPackResource', |
||
30 | 'StatusBasedResource' |
||
31 | ] |
||
32 | |||
33 | LOG = logging.getLogger(__name__) |
||
34 | |||
35 | |||
36 | @six.add_metaclass(abc.ABCMeta) |
||
37 | class Access(object): |
||
38 | impl = None |
||
39 | publisher = None |
||
40 | dispatcher = None |
||
41 | |||
42 | # ModelAPI class for this resource |
||
43 | api_model_cls = None |
||
44 | |||
45 | # A list of operations for which we should dispatch a trigger |
||
46 | dispatch_trigger_for_operations = [] |
||
47 | |||
48 | # Maps model operation name (e.g. create, update, delete) to the trigger reference which is |
||
49 | # used when dispatching a trigger |
||
50 | operation_to_trigger_ref_map = {} |
||
51 | |||
52 | @classmethod |
||
53 | @abc.abstractmethod |
||
54 | def _get_impl(cls): |
||
55 | pass |
||
56 | |||
57 | @classmethod |
||
58 | @abc.abstractmethod |
||
59 | def _get_publisher(cls): |
||
60 | return None |
||
61 | |||
62 | @classmethod |
||
63 | def _get_dispatcher(cls): |
||
64 | """ |
||
65 | Return a dispatcher class which is used for dispatching triggers. |
||
66 | """ |
||
67 | # Late import to avoid very expensive in-direct jsonschema import (~1 second) when this |
||
68 | # function is not called / used |
||
69 | from st2common.transport.reactor import TriggerDispatcher |
||
70 | |||
71 | if not cls.dispatcher: |
||
72 | cls.dispatcher = TriggerDispatcher(LOG) |
||
73 | |||
74 | return cls.dispatcher |
||
75 | |||
76 | @classmethod |
||
77 | @abc.abstractmethod |
||
78 | def _get_by_object(cls, object): |
||
0 ignored issues
–
show
|
|||
79 | return None |
||
80 | |||
81 | @classmethod |
||
82 | def get_by_name(cls, value): |
||
83 | return cls._get_impl().get_by_name(value) |
||
84 | |||
85 | @classmethod |
||
86 | def get_by_id(cls, value): |
||
87 | return cls._get_impl().get_by_id(value) |
||
88 | |||
89 | @classmethod |
||
90 | def get_by_uid(cls, value): |
||
91 | return cls._get_impl().get_by_uid(value) |
||
92 | |||
93 | @classmethod |
||
94 | def get_by_ref(cls, value): |
||
95 | return cls._get_impl().get_by_ref(value) |
||
96 | |||
97 | @classmethod |
||
98 | def get_by_pack(cls, value): |
||
99 | return cls._get_impl().get_by_pack(value) |
||
100 | |||
101 | @classmethod |
||
102 | def get(cls, *args, **kwargs): |
||
103 | return cls._get_impl().get(*args, **kwargs) |
||
104 | |||
105 | @classmethod |
||
106 | def get_all(cls, *args, **kwargs): |
||
107 | return cls._get_impl().get_all(*args, **kwargs) |
||
108 | |||
109 | @classmethod |
||
110 | def count(cls, *args, **kwargs): |
||
111 | return cls._get_impl().count(*args, **kwargs) |
||
112 | |||
113 | @classmethod |
||
114 | def query(cls, *args, **kwargs): |
||
115 | return cls._get_impl().query(*args, **kwargs) |
||
116 | |||
117 | @classmethod |
||
118 | def distinct(cls, *args, **kwargs): |
||
119 | return cls._get_impl().distinct(*args, **kwargs) |
||
120 | |||
121 | @classmethod |
||
122 | def aggregate(cls, *args, **kwargs): |
||
123 | return cls._get_impl().aggregate(*args, **kwargs) |
||
124 | |||
125 | @classmethod |
||
126 | def insert(cls, model_object, publish=True, dispatch_trigger=True, |
||
127 | log_not_unique_error_as_debug=False): |
||
128 | # Late import to avoid very expensive in-direct import (~1 second) when this function |
||
129 | # is not called / used |
||
130 | from mongoengine import NotUniqueError |
||
131 | |||
132 | if model_object.id: |
||
133 | raise ValueError('id for object %s was unexpected.' % model_object) |
||
134 | try: |
||
135 | model_object = cls._get_impl().insert(model_object) |
||
136 | except NotUniqueError as e: |
||
137 | if log_not_unique_error_as_debug: |
||
138 | LOG.debug('Conflict while trying to save in DB: %s.', str(e)) |
||
139 | else: |
||
140 | LOG.exception('Conflict while trying to save in DB.') |
||
141 | # On a conflict determine the conflicting object and return its id in |
||
142 | # the raised exception. |
||
143 | conflict_object = cls._get_by_object(model_object) |
||
144 | conflict_id = str(conflict_object.id) if conflict_object else None |
||
145 | message = str(e) |
||
146 | raise StackStormDBObjectConflictError(message=message, conflict_id=conflict_id, |
||
147 | model_object=model_object) |
||
148 | |||
149 | # Publish internal event on the message bus |
||
150 | if publish: |
||
151 | try: |
||
152 | cls.publish_create(model_object) |
||
153 | except: |
||
154 | LOG.exception('Publish failed.') |
||
155 | |||
156 | # Dispatch trigger |
||
157 | if dispatch_trigger: |
||
158 | try: |
||
159 | cls.dispatch_create_trigger(model_object) |
||
160 | except: |
||
161 | LOG.exception('Trigger dispatch failed.') |
||
162 | |||
163 | return model_object |
||
164 | |||
165 | @classmethod |
||
166 | def add_or_update(cls, model_object, publish=True, dispatch_trigger=True, |
||
167 | log_not_unique_error_as_debug=False): |
||
168 | # Late import to avoid very expensive in-direct import (~1 second) when this function |
||
169 | # is not called / used |
||
170 | from mongoengine import NotUniqueError |
||
171 | |||
172 | pre_persist_id = model_object.id |
||
173 | try: |
||
174 | model_object = cls._get_impl().add_or_update(model_object) |
||
175 | except NotUniqueError as e: |
||
176 | if log_not_unique_error_as_debug: |
||
177 | LOG.debug('Conflict while trying to save in DB: %s.', str(e)) |
||
178 | else: |
||
179 | LOG.exception('Conflict while trying to save in DB.') |
||
180 | # On a conflict determine the conflicting object and return its id in |
||
181 | # the raised exception. |
||
182 | conflict_object = cls._get_by_object(model_object) |
||
183 | conflict_id = str(conflict_object.id) if conflict_object else None |
||
184 | message = str(e) |
||
185 | raise StackStormDBObjectConflictError(message=message, conflict_id=conflict_id, |
||
186 | model_object=model_object) |
||
187 | |||
188 | is_update = str(pre_persist_id) == str(model_object.id) |
||
189 | |||
190 | # Publish internal event on the message bus |
||
191 | if publish: |
||
192 | try: |
||
193 | if is_update: |
||
194 | cls.publish_update(model_object) |
||
195 | else: |
||
196 | cls.publish_create(model_object) |
||
197 | except: |
||
198 | LOG.exception('Publish failed.') |
||
199 | |||
200 | # Dispatch trigger |
||
201 | if dispatch_trigger: |
||
202 | try: |
||
203 | if is_update: |
||
204 | cls.dispatch_update_trigger(model_object) |
||
205 | else: |
||
206 | cls.dispatch_create_trigger(model_object) |
||
207 | except: |
||
208 | LOG.exception('Trigger dispatch failed.') |
||
209 | |||
210 | return model_object |
||
211 | |||
212 | @classmethod |
||
213 | def update(cls, model_object, publish=True, dispatch_trigger=True, **kwargs): |
||
214 | """ |
||
215 | Use this method when - |
||
216 | * upsert=False is desired |
||
217 | * special operators like push, push_all are to be used. |
||
218 | """ |
||
219 | cls._get_impl().update(model_object, **kwargs) |
||
220 | # update does not return the object but a flag; likely success/fail but docs |
||
221 | # are not very good on this one so ignoring. Explicitly get the object from |
||
222 | # DB abd return. |
||
223 | model_object = cls.get_by_id(model_object.id) |
||
224 | |||
225 | # Publish internal event on the message bus |
||
226 | if publish: |
||
227 | try: |
||
228 | cls.publish_update(model_object) |
||
229 | except: |
||
230 | LOG.exception('Publish failed.') |
||
231 | |||
232 | # Dispatch trigger |
||
233 | if dispatch_trigger: |
||
234 | try: |
||
235 | cls.dispatch_update_trigger(model_object) |
||
236 | except: |
||
237 | LOG.exception('Trigger dispatch failed.') |
||
238 | |||
239 | return model_object |
||
240 | |||
241 | @classmethod |
||
242 | def delete(cls, model_object, publish=True, dispatch_trigger=True): |
||
243 | persisted_object = cls._get_impl().delete(model_object) |
||
244 | |||
245 | # Publish internal event on the message bus |
||
246 | if publish: |
||
247 | try: |
||
248 | cls.publish_delete(model_object) |
||
249 | except Exception: |
||
250 | LOG.exception('Publish failed.') |
||
251 | |||
252 | # Dispatch trigger |
||
253 | if dispatch_trigger: |
||
254 | try: |
||
255 | cls.dispatch_delete_trigger(model_object) |
||
256 | except Exception: |
||
257 | LOG.exception('Trigger dispatch failed.') |
||
258 | |||
259 | return persisted_object |
||
260 | |||
261 | #################################################### |
||
262 | # Internal event bus message publish related methods |
||
263 | #################################################### |
||
264 | |||
265 | @classmethod |
||
266 | def publish_create(cls, model_object): |
||
267 | publisher = cls._get_publisher() |
||
268 | if publisher: |
||
269 | publisher.publish_create(model_object) |
||
270 | |||
271 | @classmethod |
||
272 | def publish_update(cls, model_object): |
||
273 | publisher = cls._get_publisher() |
||
274 | if publisher: |
||
275 | publisher.publish_update(model_object) |
||
276 | |||
277 | @classmethod |
||
278 | def publish_delete(cls, model_object): |
||
279 | publisher = cls._get_publisher() |
||
280 | if publisher: |
||
281 | publisher.publish_delete(model_object) |
||
282 | |||
283 | ############################################ |
||
284 | # Internal trigger dispatch related methods |
||
285 | ########################################### |
||
286 | |||
287 | @classmethod |
||
288 | def dispatch_create_trigger(cls, model_object): |
||
289 | """ |
||
290 | Dispatch a resource-specific trigger which indicates a new resource has been created. |
||
291 | """ |
||
292 | return cls._dispatch_operation_trigger(operation='create', model_object=model_object) |
||
293 | |||
294 | @classmethod |
||
295 | def dispatch_update_trigger(cls, model_object): |
||
296 | """ |
||
297 | Dispatch a resource-specific trigger which indicates an existing resource has been updated. |
||
298 | """ |
||
299 | return cls._dispatch_operation_trigger(operation='update', model_object=model_object) |
||
300 | |||
301 | @classmethod |
||
302 | def dispatch_delete_trigger(cls, model_object): |
||
303 | """ |
||
304 | Dispatch a resource-specific trigger which indicates an existing resource has been |
||
305 | deleted. |
||
306 | """ |
||
307 | return cls._dispatch_operation_trigger(operation='delete', model_object=model_object) |
||
308 | |||
309 | @classmethod |
||
310 | def _get_trigger_ref_for_operation(cls, operation): |
||
311 | trigger_ref = cls.operation_to_trigger_ref_map.get(operation, None) |
||
312 | |||
313 | if not trigger_ref: |
||
314 | raise ValueError('Trigger ref not specified for operation: %s' % (operation)) |
||
315 | |||
316 | return trigger_ref |
||
317 | |||
318 | @classmethod |
||
319 | def _dispatch_operation_trigger(cls, operation, model_object): |
||
320 | if operation not in cls.dispatch_trigger_for_operations: |
||
321 | return |
||
322 | |||
323 | trigger = cls._get_trigger_ref_for_operation(operation=operation) |
||
324 | |||
325 | object_payload = cls.api_model_cls.from_model(model_object, mask_secrets=True).__json__() |
||
326 | payload = { |
||
327 | 'object': object_payload |
||
328 | } |
||
329 | return cls._dispatch_trigger(operation=operation, trigger=trigger, payload=payload) |
||
330 | |||
331 | @classmethod |
||
332 | def _dispatch_trigger(cls, operation, trigger, payload): |
||
333 | if operation not in cls.dispatch_trigger_for_operations: |
||
334 | return |
||
335 | |||
336 | dispatcher = cls._get_dispatcher() |
||
337 | return dispatcher.dispatch(trigger=trigger, payload=payload) |
||
338 | |||
339 | |||
340 | class ContentPackResource(Access): |
||
341 | |||
342 | @classmethod |
||
343 | def get_by_ref(cls, ref): |
||
344 | if not ref: |
||
345 | return None |
||
346 | |||
347 | ref_obj = ResourceReference.from_string_reference(ref=ref) |
||
348 | result = cls.query(name=ref_obj.name, |
||
349 | pack=ref_obj.pack).first() |
||
350 | return result |
||
351 | |||
352 | @classmethod |
||
353 | def _get_by_object(cls, object): |
||
0 ignored issues
–
show
|
|||
354 | # For an object with a resourcepack pack.name is unique. |
||
355 | name = getattr(object, 'name', '') |
||
356 | pack = getattr(object, 'pack', '') |
||
357 | return cls.get_by_ref(ResourceReference.to_string_reference(pack=pack, name=name)) |
||
358 | |||
359 | |||
360 | class StatusBasedResource(Access): |
||
361 | """Persistence layer for models that needs to publish status to the message queue.""" |
||
362 | |||
363 | @classmethod |
||
364 | def publish_status(cls, model_object): |
||
365 | """Publish the object status to the message queue. |
||
366 | |||
367 | Publish the instance of the model as payload with the status |
||
368 | as routing key to the message queue via the StatePublisher. |
||
369 | |||
370 | :param model_object: An instance of the model. |
||
371 | :type model_object: ``object`` |
||
372 | """ |
||
373 | publisher = cls._get_publisher() |
||
374 | if publisher: |
||
375 | publisher.publish_state(model_object, getattr(model_object, 'status', None)) |
||
376 |
It is generally discouraged to redefine built-ins as this makes code very hard to read.