Test Failed
Push — master ( 0496d3...626f66 )
by W
01:28
created

st2common/st2common/runners/base.py (11 issues)

# Licensed to the StackStorm, Inc ('StackStorm') under one or more
There seems to be a cyclic import (st2common.models.db.action -> st2common.models.db.liveaction -> st2common.util.action_db -> st2common.persistence.action).

Cyclic imports may cause partly loaded modules to be returned. This might lead to unexpected runtime behavior which is hard to debug.

There seems to be a cyclic import (st2common.models.api.base -> st2common.util.schema -> st2common.util.action_db -> st2common.persistence.action -> st2common.persistence.executionstate -> st2common.transport -> st2common.transport.reactor -> st2common.models.api.trace).

There seems to be a cyclic import (st2common.models.api.base -> st2common.util.schema -> st2common.util.action_db -> st2common.persistence.action -> st2common.persistence.liveaction -> st2common.transport -> st2common.transport.reactor -> st2common.models.api.trace).

There seems to be a cyclic import (st2common.models.api.base -> st2common.util.schema -> st2common.util.action_db -> st2common.persistence.action -> st2common.persistence.runner -> st2common.persistence.base -> st2common.transport.reactor -> st2common.models.api.trace).

There seems to be a cyclic import (st2common.models.api.base -> st2common.util.schema -> st2common.util.action_db -> st2common.persistence.liveaction -> st2common.transport -> st2common.transport.reactor -> st2common.models.api.trace).

There seems to be a cyclic import (st2common.models.db.liveaction -> st2common.util.action_db -> st2common.persistence.liveaction).

There seems to be a cyclic import (st2common.models.db.liveaction -> st2common.util.action_db -> st2common.persistence.action -> st2common.persistence.liveaction).

There seems to be a cyclic import (st2common.models.api.base -> st2common.util.schema -> st2common.util.action_db -> st2common.persistence.action -> st2common.persistence.actionalias -> st2common.persistence.base -> st2common.transport.reactor -> st2common.models.api.trace).

There seems to be a cyclic import (st2common.models.api.base -> st2common.util.schema -> st2common.util.action_db -> st2common.persistence.action -> st2common.persistence.execution -> st2common.transport -> st2common.transport.reactor -> st2common.models.api.trace).

There seems to be a cyclic import (st2common.models.api.base -> st2common.util.schema -> st2common.util.action_db -> st2common.persistence.action -> st2common.persistence.liveaction -> st2common.persistence.base -> st2common.transport.reactor -> st2common.models.api.trace).

There seems to be a cyclic import (st2common.models.api.base -> st2common.util.schema -> st2common.util.action_db -> st2common.persistence.action -> st2common.persistence.executionstate -> st2common.persistence.base -> st2common.transport.reactor -> st2common.models.api.trace).
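
One common way to break cycles like the ones reported above is to defer an import until call time, which this file already does for st2common.util.action_db inside _get_script_args. The sketch below reproduces the failure mode with two throwaway modules and shows the deferred import avoiding it. The names cyc_a, cyc_b, helper and try_import are hypothetical and exist only for this illustration; they are not part of st2common.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Hypothetical module cyc_a: imports cyc_b at module level, then defines helper().
MODULE_A = """
import cyc_b

def helper():
    return 'ok'
"""

# Eager variant of cyc_b: pulls a name from cyc_a while cyc_a is still loading.
EAGER_B = """
from cyc_a import helper

def use():
    return helper()
"""

# Lazy variant of cyc_b: the import runs only when use() is called.
LAZY_B = """
def use():
    from cyc_a import helper
    return helper()
"""


def try_import(b_source):
    """Write both modules to a temp dir, import cyc_a and call cyc_b.use()."""
    with tempfile.TemporaryDirectory() as tmp:
        Path(tmp, 'cyc_a.py').write_text(MODULE_A)
        Path(tmp, 'cyc_b.py').write_text(b_source)
        sys.path.insert(0, tmp)
        try:
            importlib.invalidate_caches()
            module_a = importlib.import_module('cyc_a')
            return module_a.cyc_b.use()
        except ImportError:
            # cyc_a was only partly initialised when cyc_b asked for helper
            return 'ImportError'
        finally:
            sys.path.remove(tmp)
            for name in ('cyc_a', 'cyc_b'):
                sys.modules.pop(name, None)


print('eager:', try_import(EAGER_B))
print('lazy: ', try_import(LAZY_B))
```

Deferring the import trades a small per-call cost for a module that can always finish loading. Whether that is the right fix for each reported chain depends on how the st2common modules are actually used.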

# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

import os
import abc
import shutil
import tempfile
from subprocess import list2cmdline

import six
import yaml
from oslo_config import cfg
from eventlet.green import subprocess

from st2common import log as logging
from st2common.constants import action as action_constants
from st2common.constants import pack as pack_constants
from st2common.content.utils import get_pack_directory
from st2common.content.utils import get_pack_base_path
from st2common.exceptions import actionrunner as exc
from st2common.util.loader import register_runner
from st2common.util.loader import register_callback_module
from st2common.util.api import get_full_public_api_url
from st2common.util.deprecation import deprecated
from st2common.util.green.shell import run_command

__all__ = [
    'ActionRunner',
    'AsyncActionRunner',
    'PollingAsyncActionRunner',
    'GitWorktreeActionRunner',
    'ShellRunnerMixin',

    'get_runner',
    'get_metadata'
]


LOG = logging.getLogger(__name__)

# constants to lookup in runner_parameters
RUNNER_COMMAND = 'cmd'
RUNNER_CONTENT_VERSION = 'content_version'
RUNNER_DEBUG = 'debug'


def get_runner(package_name, module_name, config=None):
    """
    Load the module and return an instance of the runner.
    """

    if not package_name:
        # Backward compatibility for Pre 2.7.0 where package name always equaled module name
        package_name = module_name

    LOG.debug('Runner loading Python module: %s.%s', package_name, module_name)

    try:
        # TODO: Explore modifying this to support register_plugin
        module = register_runner(package_name=package_name, module_name=module_name)
    except Exception as e:
        msg = ('Failed to import runner module %s.%s' % (package_name, module_name))
        LOG.exception(msg)

        raise exc.ActionRunnerCreateError('%s\n\n%s' % (msg, str(e)))

    LOG.debug('Instance of runner module: %s', module)

    if config:
        runner_kwargs = {'config': config}
    else:
        runner_kwargs = {}

    runner = module.get_runner(**runner_kwargs)
    LOG.debug('Instance of runner: %s', runner)
    return runner


def get_metadata(package_name):
    """
    Return runner related metadata for the provided runner package name.

    :rtype: ``list`` of ``dict``
    """
    import pkg_resources

    file_path = pkg_resources.resource_filename(package_name, 'runner.yaml')

    with open(file_path, 'r') as fp:
        content = fp.read()

    metadata = yaml.safe_load(content)
    return metadata


@six.add_metaclass(abc.ABCMeta)
class ActionRunner(object):
    """
    The interface that must be implemented by each StackStorm
    Action Runner implementation.
    """

    def __init__(self, runner_id):
        """
        :param runner_id: Runner id.
        :type runner_id: ``str``
        """
        self.runner_id = runner_id
        self.runner_type = None
        self.runner_parameters = None
        self.action = None
        self.action_name = None
        self.liveaction = None
        self.liveaction_id = None
        self.execution = None
        self.execution_id = None
        self.entry_point = None
        self.libs_dir_path = None
        self.context = None
        self.callback = None
        self.auth_token = None
        self.rerun_ex_ref = None

    def pre_run(self):
        # Handle runner "enabled" attribute
        runner_enabled = getattr(self.runner_type, 'enabled', True)
        runner_name = getattr(self.runner_type, 'name', 'unknown')

        if not runner_enabled:
            msg = 'Runner "%s" has been disabled by the administrator.' % runner_name
            raise ValueError(msg)

        runner_parameters = getattr(self, 'runner_parameters', {}) or {}
        self._debug = runner_parameters.get(RUNNER_DEBUG, False)

    # Run will need to take an action argument
    # Run may need result data argument
    @abc.abstractmethod
    def run(self, action_parameters):
        raise NotImplementedError()

    def pause(self):
        runner_name = getattr(self.runner_type, 'name', 'unknown')
        raise NotImplementedError('Pause is not supported for runner %s.' % runner_name)

    def resume(self):
        runner_name = getattr(self.runner_type, 'name', 'unknown')
        raise NotImplementedError('Resume is not supported for runner %s.' % runner_name)

    def cancel(self):
        return (
            action_constants.LIVEACTION_STATUS_CANCELED,
            self.liveaction.result,
            self.liveaction.context
        )

    def post_run(self, status, result):
        if self.callback and isinstance(self.callback, dict) and 'source' in self.callback:
            callback_module = register_callback_module(self.callback['source'])
            callback_handler = callback_module.get_instance()
            callback_handler.callback(self.liveaction)

    @deprecated
    def get_pack_name(self):
        return self.get_pack_ref()

    def get_pack_ref(self):
        """
        Retrieve the pack name for the action that is currently being executed.

        :rtype: ``str``
        """
        if self.action:
            return self.action.pack

        return pack_constants.DEFAULT_PACK_NAME

    def get_user(self):
        """
        Retrieve the name of the user who triggered this action execution.

        :rtype: ``str``
        """
        context = getattr(self, 'context', {}) or {}
        user = context.get('user', cfg.CONF.system_user.user)

        return user

    def _get_common_action_env_variables(self):
        """
        Retrieve common ST2_ACTION_ environment variables which will be available to the action.

        Note: Environment variables are prefixed with ST2_ACTION_* so they don't clash with CLI
        environment variables.

        :rtype: ``dict``
        """
        result = {}
        result['ST2_ACTION_PACK_NAME'] = self.get_pack_ref()
        result['ST2_ACTION_EXECUTION_ID'] = str(self.execution_id)
        result['ST2_ACTION_API_URL'] = get_full_public_api_url()

        if self.auth_token:
            result['ST2_ACTION_AUTH_TOKEN'] = self.auth_token.token

        return result

    def __str__(self):
        attrs = ', '.join(['%s=%s' % (k, v) for k, v in six.iteritems(self.__dict__)])
        return '%s@%s(%s)' % (self.__class__.__name__, str(id(self)), attrs)


@six.add_metaclass(abc.ABCMeta)
class AsyncActionRunner(ActionRunner):
    pass


class PollingAsyncActionRunner(AsyncActionRunner):

    @classmethod
    def is_polling_enabled(cls):
        return True


@six.add_metaclass(abc.ABCMeta)
class GitWorktreeActionRunner(ActionRunner):
    """
    Base class for runners which work with files (e.g. Python runner, Local script runner)
    and support git worktree functionality - ability to use a file under a specific git revision
    from a git repository.

    This revision is specified using "content_version" runner parameter.
    """

    WORKTREE_DIRECTORY_PREFIX = 'st2-git-worktree-'

    def __init__(self, runner_id):
        super(GitWorktreeActionRunner, self).__init__(runner_id=runner_id)

        # Git work tree related attributes
        self.git_worktree_path = None
        self.git_worktree_revision = None

    def pre_run(self):
        super(GitWorktreeActionRunner, self).pre_run()

        # Handle git worktree creation
        self._content_version = self.runner_parameters.get(RUNNER_CONTENT_VERSION, None)

        if self._content_version:
            self.create_git_worktree(content_version=self._content_version)

            # Override entry_point so it points to git worktree directory
            # (get_pack_ref is used directly because get_pack_name is deprecated)
            pack_name = self.get_pack_ref()
            entry_point = self._get_entry_point_for_worktree_path(
                pack_name=pack_name,
                entry_point=self.entry_point,
                worktree_path=self.git_worktree_path)

            assert entry_point.startswith(self.git_worktree_path)

            self.entry_point = entry_point

    def post_run(self, status, result):
        super(GitWorktreeActionRunner, self).post_run(status=status, result=result)

        # Remove git worktree directories (if used and available)
        if self.git_worktree_path and self.git_worktree_revision:
            pack_name = self.get_pack_ref()
            self.cleanup_git_worktree(worktree_path=self.git_worktree_path,
                                      content_version=self.git_worktree_revision,
                                      pack_name=pack_name)

    def create_git_worktree(self, content_version):
        """
        Create a git worktree for the provided git content version.

        :return: Path to the created git worktree directory.
        :rtype: ``str``
        """
        pack_name = self.get_pack_ref()
        pack_directory = get_pack_directory(pack_name=pack_name)
        worktree_path = tempfile.mkdtemp(prefix=self.WORKTREE_DIRECTORY_PREFIX)

        # Set instance attributes
        self.git_worktree_revision = content_version
        self.git_worktree_path = worktree_path

        extra = {
            'pack_name': pack_name,
            'pack_directory': pack_directory,
            'content_version': content_version,
            'worktree_path': worktree_path
        }

        if not os.path.isdir(pack_directory):
            msg = ('Failed to create git worktree for pack "%s". Pack directory "%s" doesn\'t '
                   'exist.' % (pack_name, pack_directory))
            raise ValueError(msg)

        args = [
            'git',
            '-C',
            pack_directory,
            'worktree',
            'add',
            worktree_path,
            content_version
        ]
        cmd = list2cmdline(args)

        LOG.debug('Creating git worktree for pack "%s", content version "%s" and execution '
                  'id "%s" in "%s"', pack_name, content_version, self.execution_id,
                  worktree_path, extra=extra)
        LOG.debug('Command: %s', cmd)
        exit_code, stdout, stderr, timed_out = run_command(cmd=cmd,
                                                           cwd=pack_directory,
                                                           stdout=subprocess.PIPE,
                                                           stderr=subprocess.PIPE,
                                                           shell=True)

        if exit_code != 0:
            self._handle_git_worktree_error(pack_name=pack_name, pack_directory=pack_directory,
                                            content_version=content_version,
                                            exit_code=exit_code, stdout=stdout, stderr=stderr)
        else:
            LOG.debug('Git worktree created in "%s"', worktree_path, extra=extra)

        # Make sure system / action runner user can access that directory
        args = [
            'chmod',
            '777',
            worktree_path
        ]
        cmd = list2cmdline(args)
        run_command(cmd=cmd, shell=True)

        return worktree_path

    def cleanup_git_worktree(self, worktree_path, pack_name, content_version):
        """
        Remove / cleanup the provided git worktree directory.

        :rtype: ``bool``
        """
        # Safety check to make sure we don't remove something outside /tmp
        assert worktree_path.startswith('/tmp')
        assert worktree_path.startswith('/tmp/%s' % (self.WORKTREE_DIRECTORY_PREFIX))

        if self._debug:
            LOG.debug('Not removing git worktree "%s" because debug mode is enabled',
                      worktree_path)
        else:
            LOG.debug('Removing git worktree "%s" for pack "%s" and content version "%s"',
                      worktree_path, pack_name, content_version)

            try:
                shutil.rmtree(worktree_path, ignore_errors=True)
            except Exception:
                pass

        return True

    def _handle_git_worktree_error(self, pack_name, pack_directory, content_version, exit_code,
                                   stdout, stderr):
        """
        Handle "git worktree" related errors and throw a more user-friendly exception.
        """
        error_prefix = 'Failed to create git worktree for pack "%s": ' % (pack_name)

        if isinstance(stdout, six.binary_type):
            stdout = stdout.decode('utf-8')

        if isinstance(stderr, six.binary_type):
            stderr = stderr.decode('utf-8')

        # 1. Installed version of git which doesn't support worktree command
        if "git: 'worktree' is not a git command." in stderr:
            msg = ('Installed git version doesn\'t support git worktree command. '
                   'To be able to utilize this functionality you need to use git '
                   '>= 2.5.0.')
            raise ValueError(error_prefix + msg)

        # 2. Provided pack directory is not a git repository
        if "Not a git repository" in stderr:
            msg = ('Pack directory "%s" is not a git repository. To utilize this functionality, '
                   'pack directory needs to be a git repository.' % (pack_directory))
            raise ValueError(error_prefix + msg)

        # 3. Invalid revision provided
        if "invalid reference" in stderr:
            msg = ('Invalid content_version "%s" provided. Make sure that git repository is up '
                   'to date and contains that revision.' % (content_version))
            raise ValueError(error_prefix + msg)

        # Note: any other git failure is not translated here and falls through without raising

    def _get_entry_point_for_worktree_path(self, pack_name, entry_point, worktree_path):
        """
        Method which returns path to an action entry point which is located inside the git
        worktree directory.

        :rtype: ``str``
        """
        pack_base_path = get_pack_base_path(pack_name=pack_name)

        new_entry_point = entry_point.replace(pack_base_path, '')

        # Remove leading slash (if any)
        if new_entry_point.startswith('/'):
            new_entry_point = new_entry_point[1:]

        new_entry_point = os.path.join(worktree_path, new_entry_point)

        # Check to prevent directory traversal
        common_prefix = os.path.commonprefix([worktree_path, new_entry_point])
        if common_prefix != worktree_path:
            raise ValueError('entry_point is not located inside the pack directory')

        return new_entry_point
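
The traversal check in _get_entry_point_for_worktree_path relies on os.path.commonprefix, which compares strings character by character and does not resolve ".." segments. The sketch below contrasts it with a component-wise check. It assumes Python 3 (os.path.commonpath is not available on Python 2) and POSIX-style paths; is_inside and the sample paths are hypothetical and not part of st2common.

```python
import os


def is_inside(base_dir, candidate):
    """Return True if candidate resolves to base_dir or a path below it."""
    base = os.path.realpath(base_dir)
    target = os.path.realpath(candidate)
    # commonpath compares whole path components rather than characters
    return os.path.commonpath([base, target]) == base


# Hypothetical sample paths
worktree = '/tmp/st2-git-worktree-abc'
inside = os.path.join(worktree, 'actions/run.py')
traversal = os.path.join(worktree, '../../etc/passwd')
sibling = '/tmp/st2-git-worktree-abcdef/run.py'

for path in (inside, traversal, sibling):
    prefix_ok = os.path.commonprefix([worktree, path]) == worktree
    print('%s prefix_check=%s is_inside=%s' % (path, prefix_ok, is_inside(worktree, path)))
```

With these sample values the commonprefix comparison accepts all three paths, while is_inside accepts only the first. Whether the stricter check is needed depends on how much the entry_point value can be trusted.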


class ShellRunnerMixin(object):
    """
    Class which contains utility functions to be used by shell runners.
    """

    def _transform_named_args(self, named_args):
        """
        Transform named arguments to the final form.

        :param named_args: Named arguments.
        :type named_args: ``dict``

        :rtype: ``dict``
        """
        if named_args:
            return {self._kwarg_op + k: v for (k, v) in six.iteritems(named_args)}
        return None

    def _get_script_args(self, action_parameters):
        """
        :param action_parameters: Action parameters.
        :type action_parameters: ``dict``

        :return: (positional_args, named_args)
        :rtype: (``str``, ``dict``)
        """
        # Lazy import to speed up import of this module
        from st2common.util import action_db as action_utils

        # TODO: return list for positional args, command classes should escape it
        # and convert it to string

        is_script_run_as_cmd = self.runner_parameters.get(RUNNER_COMMAND, None)

        pos_args = ''
        named_args = {}

        if is_script_run_as_cmd:
            pos_args = self.runner_parameters.get(RUNNER_COMMAND, '')
            named_args = action_parameters
        else:
            pos_args, named_args = action_utils.get_args(action_parameters, self.action)

        return pos_args, named_args
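
create_git_worktree builds its command string with subprocess.list2cmdline and runs it with shell=True. list2cmdline follows the Microsoft C runtime quoting rules and only quotes arguments that are empty or contain whitespace, so shell metacharacters in a value such as content_version are not escaped for a POSIX shell. The sketch below shows the difference, using shlex.quote as one POSIX-oriented alternative. The argument values are made up for illustration, and whether run_command could take an argument list instead is not shown in this file.

```python
import shlex
from subprocess import list2cmdline

# Hypothetical arguments; the last one stands in for a user-supplied content_version
args = ['git', 'worktree', 'add', '/tmp/wt', 'v1.0;id']

# Quoting as done in create_git_worktree (Windows-style rules)
windows_style = list2cmdline(args)

# POSIX shell quoting applied to each argument individually
posix_style = ' '.join(shlex.quote(arg) for arg in args)

print(windows_style)
print(posix_style)

# Splitting the POSIX-quoted string recovers the original argument list
print(shlex.split(posix_style) == args)
```

In the first string the ";id" suffix is left unquoted, so a POSIX shell would treat it as a second command. In the second string the whole value is wrapped in single quotes and reaches git as one argument.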