Completed
Push — master ( 4a733d...64ab4d )
by George
02:09
created

test_consumer()   A

Complexity

Conditions 3

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 3
c 1
b 0
f 0
dl 0
loc 4
rs 10
1
import json
2
from unittest import mock
3
4
from click.testing import CliRunner
0 ignored issues
show
Configuration introduced by
The import click.testing could not be resolved.

This can be caused by one of the following:

1. Missing Dependencies

This error could indicate a configuration issue of Pylint. Make sure that your libraries are available by adding the necessary commands.

# .scrutinizer.yml
before_commands:
    - sudo pip install abc # Python2
    - sudo pip3 install abc # Python3
Tip: We are currently not using virtualenv to run pylint, when installing your modules make sure to use the command for the correct version.

2. Missing __init__.py files

This error could also result from missing __init__.py files in your module folders. Make sure that you place one file in each sub-folder.

Loading history...
5
6
import pytest
7
8
from loafer import conf
9
from loafer.cli import cli
10
11
12
@pytest.fixture
13
def runner():
14
    return CliRunner()
15
16
17
@pytest.fixture
18
def local_settings():
19
    return conf.Settings()
20
21
22
def test_verbose(runner, local_settings):
23
    with mock.patch('loafer.cli.settings', local_settings) as settings:
24
        runner.invoke(cli, ['-v'])
25
        assert settings.LOAFER_LOGLEVEL == 'INFO'
26
27
28
def test_very_verbose(runner, local_settings):
29
    with mock.patch('loafer.cli.settings', local_settings) as settings:
30
        runner.invoke(cli, ['-vv'])
31
        assert settings.LOAFER_LOGLEVEL == 'DEBUG'
32
33
34
def test_max_threads(runner, local_settings):
35
    with mock.patch('loafer.cli.settings', local_settings) as settings:
36
        runner.invoke(cli, ['--max-threads', '20'])
37
        assert settings.LOAFER_MAX_THREAD_POOL == 20
38
39
40
def test_max_threads_error(runner, local_settings):
41
    result = runner.invoke(cli, ['--max-threads', 'a'])
42
    assert isinstance(result.exception, SystemExit)
43
44
    with mock.patch('loafer.cli.settings', local_settings) as settings:
45
        result = runner.invoke(cli, ['--max-threads', '0'])
46
        # preserves default value
47
        assert settings.LOAFER_MAX_THREAD_POOL is None
48
49
50
def test_source(runner, local_settings):
51
    with mock.patch('loafer.cli.settings', local_settings) as settings:
52
        runner.invoke(cli, ['--source', 'foobar'])
53
        routes = settings.LOAFER_ROUTES
54
        assert routes[0]['source'] == 'foobar'
55
56
57
def test_handler(runner, local_settings):
58
    with mock.patch('loafer.cli.settings', local_settings) as settings:
59
        runner.invoke(cli, ['--handler', 'foobar'])
60
        routes = settings.LOAFER_ROUTES
61
        assert routes[0]['handler'] == 'foobar'
62
63
64
def test_translator(runner, local_settings):
65
    with mock.patch('loafer.cli.settings', local_settings) as settings:
66
        runner.invoke(cli, ['--translator', 'foobar'])
67
        routes = settings.LOAFER_ROUTES
68
        assert routes[0]['message_translator'] == 'foobar'
69
70
71
def test_consumer_opts(runner, local_settings):
72
    with mock.patch('loafer.cli.settings', local_settings) as settings:
73
        for opt in [{}, '1', [], '{"key": "value"}', '""']:
74
            runner.invoke(cli, ['--consumer-opts', str(opt)])
75
            assert settings.LOAFER_DEFAULT_CONSUMER_OPTIONS == json.loads(str(opt))
76
77
78
def test_consumer_opts_error(runner):
79
    for opt in ['test', '123:123']:
80
        result = runner.invoke(cli, ['--consumer-opts', opt])
81
        assert isinstance(result.exception, json.decoder.JSONDecodeError)
82