1 | # Licensed to the StackStorm, Inc ('StackStorm') under one or more |
||
2 | # contributor license agreements. See the NOTICE file distributed with |
||
3 | # this work for additional information regarding copyright ownership. |
||
4 | # The ASF licenses this file to You under the Apache License, Version 2.0 |
||
5 | # (the "License"); you may not use this file except in compliance with |
||
6 | # the License. You may obtain a copy of the License at |
||
7 | # |
||
8 | # http://www.apache.org/licenses/LICENSE-2.0 |
||
9 | # |
||
10 | # Unless required by applicable law or agreed to in writing, software |
||
11 | # distributed under the License is distributed on an "AS IS" BASIS, |
||
12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||
13 | # See the License for the specific language governing permissions and |
||
14 | # limitations under the License. |
||
15 | |||
16 | import itertools |
||
17 | |||
18 | import six |
||
19 | |||
20 | from st2api.controllers.resource import ResourceController |
||
21 | from st2common import log as logging |
||
22 | from st2common.constants import action as action_constants |
||
23 | from st2common.models.db.execution import ActionExecutionOutputDB |
||
24 | from st2common.models.api.execution import ActionExecutionAPI |
||
25 | from st2common.models.api.execution import ActionExecutionOutputAPI |
||
26 | from st2common.persistence.execution import ActionExecution |
||
27 | from st2common.persistence.execution import ActionExecutionOutput |
||
28 | from st2common.router import Response |
||
29 | from st2common.util.jsonify import json_encode |
||
30 | from st2common.rbac.types import PermissionType |
||
31 | from st2common.stream.listener import get_listener |
||
32 | |||
# Names exported by this module.
__all__ = ['ActionExecutionOutputStreamController']

# Module-level logger.
LOG = logging.getLogger(__name__)

# Final event written to the stream endpoint right before the connection is closed. It tells the
# client that no more data will be produced.
NO_MORE_DATA_EVENT = 'event: EOF\ndata: \'\'\n\n'
||
42 | |||
43 | |||
class ActionExecutionOutputStreamController(ResourceController):
    """
    Controller which streams the output of an action execution as ``text/event-stream`` events.
    """
    model = ActionExecutionAPI
    access = ActionExecution

    supported_filters = {
        'output_type': 'output_type'
    }

    # Execution statuses for which the stream is terminated with NO_MORE_DATA_EVENT instead of
    # waiting for more output.
    CLOSE_STREAM_LIVEACTION_STATES = action_constants.LIVEACTION_COMPLETED_STATES + [
        action_constants.LIVEACTION_STATUS_PAUSING,
        action_constants.LIVEACTION_STATUS_RESUMING
    ]

    def get_one(self, id, output_type='all', requester_user=None):
        """
        Build a streaming response with the output of a single execution.

        The response body first contains all the output objects which are already stored for the
        execution. If the execution status is not one of ``CLOSE_STREAM_LIVEACTION_STATES``, it
        then continues with output received from the ``execution_output`` listener until the
        execution reaches one of those statuses. In both cases the stream is terminated with
        ``NO_MORE_DATA_EVENT``.

        :param id: Execution id or the special value ``last`` which resolves to the execution
                   with the highest id. Note: the name shadows a built-in, but it is part of the
                   public signature so it can't be renamed.
        :param output_type: Only include output of this type. Falsy value or ``all`` disables
                            the filtering.
        :param requester_user: User performing the request. Passed as-is to ``_get_one_by_id``
                               together with the ``EXECUTION_VIEW`` permission type.

        :raises ValueError: If ``id`` is ``last`` and there are no executions, or if an output
                            object of an unsupported type is encountered while streaming.

        :return: Response with ``text/event-stream`` content type and a lazy body iterator.
        """
        # Special case for id == "last"
        if id == 'last':
            last_execution_db = ActionExecution.query().order_by('-id').limit(1).first()

            if not last_execution_db:
                raise ValueError('No executions found in the database')

            id = str(last_execution_db.id)

        execution_db = self._get_one_by_id(id=id, requester_user=requester_user,
                                           permission_type=PermissionType.EXECUTION_VIEW)
        execution_id = str(execution_db.id)

        # The same predicate is used for the database query and for the live messages
        filter_by_type = bool(output_type and output_type != 'all')
        query_filters = {'output_type': output_type} if filter_by_type else {}

        close_stream_states = self.CLOSE_STREAM_LIVEACTION_STATES

        def format_output_object(output_db_or_api):
            # Serialize a single output object (DB or API model) into an event string
            if isinstance(output_db_or_api, ActionExecutionOutputDB):
                data = ActionExecutionOutputAPI.from_model(output_db_or_api)
            elif isinstance(output_db_or_api, ActionExecutionOutputAPI):
                data = output_db_or_api
            else:
                raise ValueError('Unsupported format: %s' % (type(output_db_or_api)))

            event = 'st2.execution.output__create'
            return 'event: %s\ndata: %s\n\n' % (event, json_encode(data, indent=None))

        def existing_output_iter():
            # Consume and return all of the existing lines
            output_dbs = ActionExecutionOutput.query(execution_id=execution_id, **query_filters)

            # Note: We return all at once instead of yield line by line to avoid multiple socket
            # writes and to achieve better performance
            output = ''.join(format_output_object(output_db) for output_db in output_dbs)
            yield six.binary_type(output.encode('utf-8'))

        def stream_new_output(packs):
            # Translate messages produced by the listener into encoded events. Stops after
            # emitting NO_MORE_DATA_EVENT once the execution reaches one of the close states.
            for pack in packs:
                if not pack:
                    continue

                (_, model_api) = pack

                # Note: gunicorn wsgi handler expect bytes, not unicode
                # pylint: disable=no-member
                if isinstance(model_api, ActionExecutionOutputAPI):
                    if filter_by_type and model_api.output_type != output_type:
                        continue

                    output = format_output_object(model_api).encode('utf-8')
                    yield six.binary_type(output)
                elif isinstance(model_api, ActionExecutionAPI):
                    if model_api.status in close_stream_states:
                        yield six.binary_type(NO_MORE_DATA_EVENT.encode('utf-8'))
                        break
                else:
                    # Lazy formatting - also safe if the message happens to be a tuple
                    LOG.debug('Unrecognized message type: %s', model_api)

        def new_output_iter():
            # Bail out if execution has already completed / been paused
            if execution_db.status in close_stream_states:
                return iter((six.binary_type(NO_MORE_DATA_EVENT.encode('utf-8')),))

            # Wait for and return any new line which may come in
            listener = get_listener(name='execution_output')  # pylint: disable=no-member
            packs = listener.generator(execution_ids=[execution_id])
            return stream_new_output(packs)

        # Note: new_output_iter() is a regular function, so the status check and the listener
        # lookup run right here, before any of the existing output is read.
        app_iter = itertools.chain(existing_output_iter(), new_output_iter())
        return Response(content_type='text/event-stream', app_iter=app_iter)


# Module-level controller instance.
action_execution_output_controller = ActionExecutionOutputStreamController()
||
146 |
It is generally discouraged to redefine built-ins as this makes code very hard to read.