Test Failed
Push — master ( e380d0...f5671d )
by W
02:58
created

st2stream/st2stream/controllers/v1/executions.py (2 issues)

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import itertools
17
18
import six
19
20
from st2api.controllers.resource import ResourceController
21
from st2common import log as logging
22
from st2common.constants import action as action_constants
23
from st2common.models.db.execution import ActionExecutionOutputDB
24
from st2common.models.api.execution import ActionExecutionAPI
25
from st2common.models.api.execution import ActionExecutionOutputAPI
26
from st2common.persistence.execution import ActionExecution
27
from st2common.persistence.execution import ActionExecutionOutput
28
from st2common.router import Response
29
from st2common.util.jsonify import json_encode
30
from st2common.rbac.types import PermissionType
31
from st2common.stream.listener import get_listener
32
33
__all__ = [
34
    'ActionExecutionOutputStreamController'
35
]
36
37
LOG = logging.getLogger(__name__)
38
39
# Event which is returned when no more data will be produced on this stream endpoint before closing
40
# the connection.
41
NO_MORE_DATA_EVENT = 'event: EOF\ndata: \'\'\n\n'
42
43
44
class ActionExecutionOutputStreamController(ResourceController):
45
    model = ActionExecutionAPI
46
    access = ActionExecution
47
48
    supported_filters = {
49
        'output_type': 'output_type'
50
    }
51
52
    CLOSE_STREAM_LIVEACTION_STATES = action_constants.LIVEACTION_COMPLETED_STATES + [
53
        action_constants.LIVEACTION_STATUS_PAUSING,
54
        action_constants.LIVEACTION_STATUS_RESUMING
55
    ]
56
57
    def get_one(self, id, output_type='all', requester_user=None):
0 ignored issues
show
Bug Best Practice introduced by
This seems to re-define the built-in id.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

Loading history...
58
        # Special case for id == "last"
59
        if id == 'last':
60
            execution_db = ActionExecution.query().order_by('-id').limit(1).first()
61
62
            if not execution_db:
63
                raise ValueError('No executions found in the database')
64
65
            id = str(execution_db.id)
66
67
        execution_db = self._get_one_by_id(id=id, requester_user=requester_user,
68
                                           permission_type=PermissionType.EXECUTION_VIEW)
69
        execution_id = str(execution_db.id)
70
71
        query_filters = {}
72
        if output_type and output_type != 'all':
73
            query_filters['output_type'] = output_type
74
75
        def format_output_object(output_db_or_api):
76
            if isinstance(output_db_or_api, ActionExecutionOutputDB):
77
                data = ActionExecutionOutputAPI.from_model(output_db_or_api)
78
            elif isinstance(output_db_or_api, ActionExecutionOutputAPI):
79
                data = output_db_or_api
80
            else:
81
                raise ValueError('Unsupported format: %s' % (type(output_db_or_api)))
82
83
            event = 'st2.execution.output__create'
84
            result = 'event: %s\ndata: %s\n\n' % (event, json_encode(data, indent=None))
85
            return result
86
87
        def existing_output_iter():
88
            # Consume and return all of the existing lines
89
            output_dbs = ActionExecutionOutput.query(execution_id=execution_id, **query_filters)
90
91
            # Note: We return all at once instead of yield line by line to avoid multiple socket
92
            # writes and to achieve better performance
93
            output = [format_output_object(output_db) for output_db in output_dbs]
94
            output = ''.join(output)
95
            yield six.binary_type(output.encode('utf-8'))
96
97
        def new_output_iter():
98
            def noop_gen():
99
                yield six.binary_type(NO_MORE_DATA_EVENT.encode('utf-8'))
100
101
            # Bail out if execution has already completed / been paused
102
            if execution_db.status in self.CLOSE_STREAM_LIVEACTION_STATES:
103
                return noop_gen()
104
105
            # Wait for and return any new line which may come in
106
            execution_ids = [execution_id]
107
            listener = get_listener(name='execution_output')  # pylint: disable=no-member
108
            gen = listener.generator(execution_ids=execution_ids)
109
110
            def format(gen):
0 ignored issues
show
Bug Best Practice introduced by
This seems to re-define the built-in format.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

Loading history...
111
                for pack in gen:
112
                    if not pack:
113
                        continue
114
                    else:
115
                        (_, model_api) = pack
116
117
                        # Note: gunicorn wsgi handler expect bytes, not unicode
118
                        # pylint: disable=no-member
119
                        if isinstance(model_api, ActionExecutionOutputAPI):
120
                            if output_type and output_type != 'all' and \
121
                               model_api.output_type != output_type:
122
                                continue
123
124
                            output = format_output_object(model_api).encode('utf-8')
125
                            yield six.binary_type(output)
126
                        elif isinstance(model_api, ActionExecutionAPI):
127
                            if model_api.status in self.CLOSE_STREAM_LIVEACTION_STATES:
128
                                yield six.binary_type(NO_MORE_DATA_EVENT.encode('utf-8'))
129
                                break
130
                        else:
131
                            LOG.debug('Unrecognized message type: %s' % (model_api))
132
133
            gen = format(gen)
134
            return gen
135
136
        def make_response():
137
            app_iter = itertools.chain(existing_output_iter(), new_output_iter())
138
            res = Response(content_type='text/event-stream', app_iter=app_iter)
139
            return res
140
141
        res = make_response()
142
        return res
143
144
145
action_execution_output_controller = ActionExecutionOutputStreamController()
146