Passed
Branch master (dfca16)
by Emmanuel
01:28
created

TestQueueListener   A

Complexity

Total Complexity 9

Size/Duplication

Total Lines 63
Duplicated Lines 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
wmc 9
c 2
b 0
f 0
dl 0
loc 63
rs 10
1
import os
2
import signal
3
import subprocess
4
import unittest
5
from impulsare_config import Reader as ConfigReader
6
7
8
# https://docs.python.org/3/library/unittest.html#assert-methods
9
class TestQueueListener(unittest.TestCase):
    """Black-box tests for the ``queue-listener`` command-line program.

    Every test runs the command as a subprocess (see ``_exec_cmd``) and
    checks its exit status together with the captured stdout / stderr.
    """

    # Directory containing this test module; fixture paths are built from it.
    base_path = os.path.abspath(os.path.dirname(__file__))
    # Command under test; individual tests append their own options.
    base_cmd = ['queue-listener']
    # Seconds the command may run before it is asked to stop; the same delay
    # is granted again after SIGTERM before the process is killed.
    exec_timeout = 3

    def setUp(self):
        """Skip the test when the command under test is not installed.

        Without the executable on PATH every test would error with a
        ``FileNotFoundError`` raised by ``subprocess.Popen`` instead of
        exercising the command.
        """
        import shutil

        if shutil.which(self.base_cmd[0]) is None:
            self.skipTest("'{}' is not available on PATH".format(self.base_cmd[0]))

    def test_requires_config(self):
        """Running without options exits with status 2 and names the missing option."""
        res = self._exec_cmd(self.base_cmd)
        # assertEqual, not assertIs: exit statuses are compared by value.
        self.assertEqual(res['status'], 2)
        self.assertEqual(res['stdout'], '')
        self.assertRegex(res['stderr'], r'.*Missing option "--host"')

    def test_bad_host(self):
        """An unreachable server yields status 1 and a short error message.

        Without ``--debug`` the exception class name must not be part of the
        message.
        """
        cmd = self.base_cmd + ['-h', '127.0.0.1', '-p', '80', '-q', 'wrong']
        res = self._exec_cmd(cmd)
        self.assertEqual(res['status'], 1, "Can't get status 1, message: {} ('{}')".format(res['stderr'], cmd))
        self.assertEqual(res['stdout'], '')
        self.assertRegex(
            res['stderr'],
            r'.*Error 111 connecting to 127\.0\.0\.1:80\. Connection refused.*')
        self.assertNotRegex(
            res['stderr'],
            r'.*redis\.exceptions\.ConnectionError: Error 111 connecting to 127\.0\.0\.1:80\. Connection refused.*')

    def test_bad_host_debug(self):
        """With ``--debug`` the error message includes the exception class name."""
        cmd = self.base_cmd + ['--debug', '-h', '127.0.0.1', '-p', '80', '-q', 'wrong']
        res = self._exec_cmd(cmd)
        self.assertEqual(res['status'], 1, "Can't get status 1, message: {} ('{}')".format(res['stderr'], cmd))
        self.assertEqual(res['stdout'], '')
        self.assertRegex(
            res['stderr'],
            r'.*redis\.exceptions\.ConnectionError: Error 111 connecting to 127\.0\.0\.1:80\. Connection refused.*')

    def test_right_config(self):
        """A valid host and queue start the worker and end with status 0.

        NOTE(review): presumably the listener keeps running until
        ``_exec_cmd`` terminates it after the timeout, and exits with 0 on
        SIGTERM -- confirm against the command's signal handling.
        """
        config = self._get_config()
        cmd = self.base_cmd + ['-h', config['distributer']['host'], '-q', config['testqueue']['queue']]
        res = self._exec_cmd(cmd)
        self.assertEqual(res['status'], 0, "Can't get status 0, message: {} ('{}')".format(res['stderr'], cmd))
        self.assertEqual(res['stdout'], '')
        self.assertRegex(res['stderr'], r'.*RQ worker.*started')

    def _get_config(self):
        """Parse and return the test configuration.

        ``static/config_valid.yml`` is used by default. When the ``REDIS``
        environment variable is set to anything other than ``127.0.0.1``,
        ``static/config_valid_<REDIS>.yml`` is used instead, so a matching
        file must exist for that server.
        """
        package_static = os.path.join(self.base_path, '..', 'impulsare_distributer', 'static')
        config_specs = os.path.join(package_static, 'specs.yml')
        config_default = os.path.join(package_static, 'default.yml')

        redis_host = os.environ.get('REDIS', '127.0.0.1')
        if redis_host == '127.0.0.1':
            config_name = 'config_valid.yml'
        else:
            # Use another server, make sure to have the right configuration file
            config_name = 'config_valid_{}.yml'.format(redis_host)
        config_file = os.path.join(self.base_path, 'static', config_name)

        return ConfigReader().parse(config_file, config_specs, config_default)

    def _exec_cmd(self, cmd: list):
        """Run ``cmd`` and return its captured output and exit status.

        The process gets ``exec_timeout`` seconds to finish on its own. After
        that it is sent SIGTERM, and killed if it has still not exited after
        the same delay again, so a child ignoring SIGTERM cannot hang the
        test run.

        Returns a dict with keys ``stdout`` and ``stderr`` (decoded, stripped,
        newlines removed) and ``status`` (the process return code).
        """
        # The context manager closes both pipes and reaps the child even if
        # an unexpected exception escapes the block.
        with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as proc:
            try:
                stdout, stderr = proc.communicate(timeout=self.exec_timeout)
            except subprocess.TimeoutExpired:
                proc.terminate()
                try:
                    stdout, stderr = proc.communicate(timeout=self.exec_timeout)
                except subprocess.TimeoutExpired:
                    proc.kill()
                    stdout, stderr = proc.communicate()

        # errors='replace' keeps undecodable output from masking the real
        # assertion failure with a UnicodeDecodeError.
        stdout = stdout.decode(errors='replace').strip().replace('\n', '')
        stderr = stderr.decode(errors='replace').strip().replace('\n', '')

        return {'stdout': stdout, 'stderr': stderr, 'status': proc.returncode}
72