Completed
Pull Request — master (#2549)
by Manas
04:56
created

st2reactor.cmd._run_worker()   B

Complexity

Conditions 6

Size

Total Lines 28

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 6
dl 0
loc 28
rs 7.5384
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import os
17
import sys
18
19
import eventlet
20
from oslo_config import cfg
21
22
from st2common import log as logging
23
from st2common.logging.misc import get_logger_name_for_module
24
from st2common.service_setup import setup as common_setup
25
from st2common.service_setup import teardown as common_teardown
26
from st2common.util.monkey_patch import monkey_patch
27
from st2reactor.rules import config
28
from st2reactor.rules import worker
29
from st2reactor.timer.base import St2Timer
30
31
# Runs at import time, after the imports above and before the module logger
# below is created.
# NOTE(review): presumably applies eventlet monkey patching - confirm against
# st2common.util.monkey_patch.
monkey_patch()
# NOTE(review): the bare integers in this region are no-op expression
# statements; they look like line-number residue from the page this file was
# copied from.
32
33
34
# Logger name derived from this module object; LOG is the module-wide logger
# used by the functions below.
LOGGER_NAME = get_logger_name_for_module(sys.modules[__name__])
35
LOG = logging.getLogger(LOGGER_NAME)
36
37
38
def _setup():
    """Set up the rules engine service through the shared ``common_setup`` routine.

    Passes the ``st2reactor.rules`` config module and asks for database setup,
    message-queue exchange registration and signal handler registration.
    """
    common_setup(service='rulesengine', config=config, setup_db=True, register_mq_exchanges=True,
                 register_signal_handlers=True)
41
42
43
def _teardown():
    """Run the shared ``common_teardown`` routine for this service."""
    common_teardown()
45
46
47
def _kickoff_timer(timer):
    """Start ``timer``.

    Used as the target function handed to ``eventlet.spawn`` so the timer runs
    on its own green thread. Always returns ``None``; the result of
    ``timer.start()`` is discarded.

    :param timer: Object exposing a ``start()`` method.
    """
    timer.start()
49
50
51
def _run_worker():
    """Run the rules engine worker (and, if enabled, the timer) until it stops.

    Starts the worker returned by ``worker.get_worker()``. When
    ``cfg.CONF.timer.enable`` is set, an ``St2Timer`` is also started on its own
    eventlet green thread; the timer is cleaned up on every exit path.

    :return: The value returned by the worker's ``wait()`` on a normal finish,
             ``0`` after a ``KeyboardInterrupt`` / ``SystemExit`` triggered
             shutdown, or ``1`` if any other exception was raised.
    """
    LOG.info('(PID=%s) RulesEngine started.', os.getpid())

    timer = None
    rules_engine_worker = worker.get_worker()

    try:
        timer_thread = None
        if cfg.CONF.timer.enable:
            timer = St2Timer(local_timezone=cfg.CONF.timer.local_timezone)
            timer_thread = eventlet.spawn(_kickoff_timer, timer)
            LOG.info('Timer is enabled. Started on thread %s', timer_thread)
        rules_engine_worker.start()
        if timer:
            # Wait on the timer thread as its own statement. The earlier form
            # ``timer_thread.wait() and rules_engine_worker.wait()`` short-circuited:
            # the thread's result is _kickoff_timer()'s return value (None), so the
            # worker was never waited on and None was returned to the caller.
            timer_thread.wait()
        return rules_engine_worker.wait()
    except (KeyboardInterrupt, SystemExit):
        # Requested stop: shut the worker down and fall through to ``return 0``.
        LOG.info('(PID=%s) RulesEngine stopped.', os.getpid())
        rules_engine_worker.shutdown()
    except:
        # Deliberate catch-all: anything else is logged with its traceback and
        # reported to the caller as exit code 1 instead of propagating.
        LOG.exception('(PID:%s) RulesEngine quit due to exception.', os.getpid())
        return 1
    finally:
        # Runs on every path above, including the early returns.
        if timer:
            timer.cleanup()

    return 0
79
80
81
def main():
    """Service entry point: set up, run the worker, and always tear down.

    :return: Whatever ``_run_worker()`` returns, or ``1`` if an exception other
             than ``SystemExit`` escapes ``_setup()`` or ``_run_worker()``.
    :raises SystemExit: When ``_setup()`` or ``_run_worker()`` raises it; the
                        original exit code is preserved.
    """
    try:
        _setup()
        return _run_worker()
    except SystemExit as exit_exc:
        # Forward the original code, not the exception object. Passing the
        # SystemExit instance itself to sys.exit() wraps it in a new SystemExit
        # whose ``code`` is not an int, so the interpreter prints it to stderr
        # and exits with status 1 even when the original code was 0.
        sys.exit(exit_exc.code)
    except:
        # Deliberate catch-all at the process boundary: log and report failure.
        LOG.exception('(PID=%s) RulesEngine quit due to exception.', os.getpid())
        return 1
    finally:
        # Teardown runs on every path, including the SystemExit re-raise.
        _teardown()
92