Passed
Push — develop ( 7df2da...9e7253 )
by Plexxi
06:53 queued 03:27
created

st2reactor.cmd._run_worker()   B

Complexity

Conditions 6

Size

Total Lines 30

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 6
dl 0
loc 30
rs 7.5384
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import os
17
import sys
18
19
import eventlet
20
from oslo_config import cfg
21
22
from st2common import log as logging
23
from st2common.constants.timer import TIMER_ENABLED_LOG_LINE, TIMER_DISABLED_LOG_LINE
24
from st2common.logging.misc import get_logger_name_for_module
25
from st2common.service_setup import setup as common_setup
26
from st2common.service_setup import teardown as common_teardown
27
from st2common.util.monkey_patch import monkey_patch
28
from st2reactor.rules import config
29
from st2reactor.rules import worker
30
from st2reactor.timer.base import St2Timer
31
32
# Runs at import time, right after the imports and before the module logger
# below is created.
# NOTE(review): presumably applies eventlet monkey patching - confirm against
# st2common.util.monkey_patch.
monkey_patch()
33
34
35
# Logger name computed from this module's own entry in sys.modules.
LOGGER_NAME = get_logger_name_for_module(sys.modules[__name__])
36
# Module-level logger used by the functions in this file.
LOG = logging.getLogger(LOGGER_NAME)
37
38
39
def _setup():
    """Run the common service setup for the ``rulesengine`` service.

    Delegates to ``common_setup`` with the rules engine ``config`` module and
    requests database setup, MQ exchange registration and signal handler
    registration.
    """
    setup_options = {
        'service': 'rulesengine',
        'config': config,
        'setup_db': True,
        'register_mq_exchanges': True,
        'register_signal_handlers': True,
    }
    common_setup(**setup_options)
42
43
44
def _teardown():
    """Run the common service teardown by delegating to ``common_teardown``."""
    common_teardown()
46
47
48
def _kickoff_timer(timer):
49
    timer.start()
50
51
52
def _run_worker():
    """Run the rules engine worker, plus the timer when it is enabled.

    When ``cfg.CONF.timer.enable`` is set, an ``St2Timer`` is created and
    started in a green thread. The rules engine worker is then started and
    the function blocks until the timer thread (if any) and the worker have
    finished.

    :return: The result of ``rules_engine_worker.wait()`` on a normal finish,
             ``0`` after a shutdown triggered by ``KeyboardInterrupt`` or
             ``SystemExit``, or ``1`` when any other exception is raised.
    """
    LOG.info('(PID=%s) RulesEngine started.', os.getpid())

    timer = None
    rules_engine_worker = worker.get_worker()

    try:
        timer_thread = None
        if cfg.CONF.timer.enable:
            timer = St2Timer(local_timezone=cfg.CONF.timer.local_timezone)
            timer_thread = eventlet.spawn(_kickoff_timer, timer)
            LOG.info(TIMER_ENABLED_LOG_LINE)
        else:
            LOG.info(TIMER_DISABLED_LOG_LINE)

        rules_engine_worker.start()

        if timer_thread is not None:
            # Wait for the timer as a separate statement. _kickoff_timer has
            # no return value, so chaining the two waits with ``and`` would
            # short-circuit on the falsy result and never wait for the worker.
            timer_thread.wait()

        return rules_engine_worker.wait()
    except (KeyboardInterrupt, SystemExit):
        LOG.info('(PID=%s) RulesEngine stopped.', os.getpid())
        rules_engine_worker.shutdown()
    except:
        # Deliberately broad: any other failure is logged with its traceback
        # and reported to the caller as exit status 1.
        LOG.exception('(PID=%s) RulesEngine quit due to exception.', os.getpid())
        return 1
    finally:
        # Runs on every exit path, but only if a timer was actually created.
        if timer is not None:
            timer.cleanup()

    return 0
82
83
84
def main():
    """Service entry point: set up, run the worker and always tear down.

    :return: The value returned by ``_run_worker()``, or ``1`` when an
             unexpected exception is raised during setup or while running.
    :raises SystemExit: Re-raised with the original exit status when setup or
                        the worker requested interpreter exit.
    """
    try:
        _setup()
        return _run_worker()
    except SystemExit as exit_code:
        # Forward the original status rather than the exception object.
        # sys.exit() treats a non-integer argument as a message: it is printed
        # to stderr and the process exits with status 1, which would turn a
        # clean SystemExit(0) into a failure.
        sys.exit(exit_code.code)
    except:
        # Top-level boundary: log anything else and report failure.
        LOG.exception('(PID=%s) RulesEngine quit due to exception.', os.getpid())
        return 1
    finally:
        _teardown()
95