Passed
Push — master ( f1fe9e...5c5de8 )
by
unknown
03:44
created

read_and_store_stream()   C

Complexity

Conditions 7

Size

Total Lines 21

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 7
c 1
b 0
f 0
dl 0
loc 21
rs 6.4705
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import eventlet
17
18
import logging as stdlib_logging
19
20
from oslo_config import cfg
21
22
from st2common.constants.action import ACTION_OUTPUT_RESULT_DELIMITER
23
from st2common import log as logging
24
from st2common.runners import base as runners
25
from st2common.util import action_db as action_db_utils
26
from st2actions.container.service import RunnerContainerService
27
28
29
__all__ = [
30
    'get_logger_for_python_runner_action',
31
    'get_action_class_instance',
32
33
    'make_read_and_store_stream_func'
34
]
35
36
LOG = logging.getLogger(__name__)
37
38
39
def get_logger_for_python_runner_action(action_name):
40
    """
41
    Set up a logger which logs all the messages with level DEBUG and above to stderr.
42
    """
43
    logger_name = 'actions.python.%s' % (action_name)
44
    logger = logging.getLogger(logger_name)
45
46
    console = stdlib_logging.StreamHandler()
47
    console.setLevel(stdlib_logging.DEBUG)
48
49
    formatter = stdlib_logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
50
    console.setFormatter(formatter)
51
    logger.addHandler(console)
52
    logger.setLevel(stdlib_logging.DEBUG)
53
54
    return logger
55
56
57
def get_action_class_instance(action_cls, config=None, action_service=None):
58
    """
59
    Instantiate and return Action class instance.
60
61
    :param action_cls: Action class to instantiate.
62
    :type action_cls: ``class``
63
64
    :param config: Config to pass to the action class.
65
    :type config: ``dict``
66
67
    :param action_service: ActionService instance to pass to the class.
68
    :type action_service: :class:`ActionService`
69
    """
70
    kwargs = {}
71
    kwargs['config'] = config
72
    kwargs['action_service'] = action_service
73
74
    # Note: This is done for backward compatibility reasons. We first try to pass
75
    # "action_service" argument to the action class constructor, but if that doesn't work (e.g. old
76
    # action which hasn't been updated yet), we resort to late assignment post class instantiation.
77
    # TODO: Remove in next major version once all the affected actions have been updated.
78
    try:
79
        action_instance = action_cls(**kwargs)
80
    except TypeError as e:
81
        if 'unexpected keyword argument \'action_service\'' not in str(e):
82
            raise e
83
84
        LOG.debug('Action class (%s) constructor doesn\'t take "action_service" argument, '
0 ignored issues
show
Coding Style Best Practice introduced by
Specify string format arguments as logging function parameters
Loading history...
85
                  'falling back to late assignment...' % (action_cls.__class__.__name__))
86
87
        action_service = kwargs.pop('action_service', None)
88
        action_instance = action_cls(**kwargs)
89
        action_instance.action_service = action_service
90
91
    return action_instance
92
93
94
def invoke_post_run(liveaction_db, action_db=None):
95
    LOG.info('Invoking post run for action execution %s.', liveaction_db.id)
96
97
    # Identify action and runner.
98
    if not action_db:
99
        action_db = action_db_utils.get_action_by_ref(liveaction_db.action)
100
101
    if not action_db:
102
        LOG.exception('Unable to invoke post run. Action %s no longer exists.',
103
                      liveaction_db.action)
104
        return
105
106
    LOG.info('Action execution %s runs %s of runner type %s.',
107
             liveaction_db.id, action_db.name, action_db.runner_type['name'])
108
109
    # Get an instance of the action runner.
110
    runnertype_db = action_db_utils.get_runnertype_by_name(action_db.runner_type['name'])
111
    runner = runners.get_runner(runnertype_db.runner_module)
112
113
    # Configure the action runner.
114
    runner.container_service = RunnerContainerService()
115
    runner.action = action_db
116
    runner.action_name = action_db.name
117
    runner.action_execution_id = str(liveaction_db.id)
118
    runner.entry_point = RunnerContainerService.get_entry_point_abs_path(
119
        pack=action_db.pack, entry_point=action_db.entry_point)
120
    runner.context = getattr(liveaction_db, 'context', dict())
121
    runner.callback = getattr(liveaction_db, 'callback', dict())
122
    runner.libs_dir_path = RunnerContainerService.get_action_libs_abs_path(
123
        pack=action_db.pack, entry_point=action_db.entry_point)
124
125
    # Invoke the post_run method.
126
    runner.post_run(liveaction_db.status, liveaction_db.result)
127
128
129
def make_read_and_store_stream_func(execution_db, action_db, store_data_func):
130
    """
131
    Factory function which returns a function for reading from a stream (stdout / stderr).
132
133
    This function writes read data into a buffer and stores it in a database.
134
    """
135
    def read_and_store_stream(stream, buff):
136
        try:
137
            while not stream.closed:
138
                line = stream.readline()
139
                if not line:
140
                    break
141
142
                buff.write(line)
143
144
                # Filter out result delimiter lines
145
                if ACTION_OUTPUT_RESULT_DELIMITER in line:
146
                    continue
147
148
                if cfg.CONF.actionrunner.stream_output:
149
                    store_data_func(execution_db=execution_db, action_db=action_db, data=line)
150
        except RuntimeError:
151
            # process was terminated abruptly
152
            pass
153
        except eventlet.support.greenlets.GreenletExit:
154
            # Green thread exited / was killed
155
            pass
156
157
    return read_and_store_stream
158