Completed
Pull Request — master (#9)
by George
02:20
created

test_sqs_publish_with_queue_url()   B

Complexity

Conditions 6

Size

Total Lines 13

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 6
dl 0
loc 13
rs 8
1
# -*- coding: utf-8 -*-
2
# vi:si:et:sw=4:sts=4:ts=4
3
4
import json
5
6
from loafer.aws.publisher import sqs_publish, sns_publish
7
8
9
def test_sqs_publish(mock_boto_client_sqs):
    """Publish to a queue given by name and verify the ``send_message`` call.

    Checks that ``sqs_publish`` returns a truthy response, that the patched
    client factory was invoked, and that ``send_message`` was called exactly
    once with the message body and the queue URL obtained from the mocked
    ``get_queue_url``.
    """
    with mock_boto_client_sqs as mock_sqs:
        message = '{"test": "hey"}'
        response = sqs_publish('queue-name', message)
        assert response
        assert mock_sqs.called

        client = mock_sqs()
        queue_url = client.get_queue_url()['QueueUrl']
        # ``called_once_with`` is not a Mock assertion: it is an auto-created
        # attribute whose call result is always truthy, so wrapping it in
        # ``assert`` verified nothing. ``assert_called_once_with`` actually
        # checks both the call count and the arguments.
        client.send_message.assert_called_once_with(
            MessageBody=message,
            QueueUrl=queue_url)
22
23
24
def test_sqs_publish_with_queue_url(mock_boto_client_sqs):
    """Publish to a queue given by URL and verify the ``send_message`` call.

    Checks that ``sqs_publish`` returns a truthy response, that the patched
    client factory was invoked, and that ``send_message`` was called exactly
    once with the message body and the expected queue URL.
    """
    with mock_boto_client_sqs as mock_sqs:
        message = '{"test": "hey"}'
        response = sqs_publish('https://blabla/queue-name', message)
        assert response
        assert mock_sqs.called

        client = mock_sqs()
        # NOTE(review): the expected URL is taken from the mocked
        # ``get_queue_url`` exactly as the original test did. If
        # ``sqs_publish`` forwards an already-resolved URL unchanged, the
        # expected value should be the literal URL passed above -- confirm
        # against the publisher implementation.
        queue_url = client.get_queue_url()['QueueUrl']
        # ``called_once_with`` is not a Mock assertion: it is an auto-created
        # attribute whose call result is always truthy, so wrapping it in
        # ``assert`` verified nothing. ``assert_called_once_with`` actually
        # checks both the call count and the arguments.
        client.send_message.assert_called_once_with(
            MessageBody=message,
            QueueUrl=queue_url)
37
38
39
def test_sns_publisher(mock_boto_client_sns):
    """Publish to a topic given by ARN and verify the ``publish`` call.

    Checks that ``sns_publish`` returns a truthy response, that the patched
    client factory was invoked, and that ``publish`` was called exactly once
    with the topic, the ``'json'`` message structure and the message wrapped
    under the ``'default'`` key as a JSON document.
    """
    with mock_boto_client_sns as mock_sns:
        message = '{"test": "hey"}'
        topic = 'arn:blabla:topic-name'
        response = sns_publish(topic, message)
        assert response
        assert mock_sns.called

        client = mock_sns()
        message_sent = json.dumps({'default': message})
        # ``called_once_with`` is not a Mock assertion: it is an auto-created
        # attribute whose call result is always truthy, so wrapping it in
        # ``assert`` verified nothing. ``assert_called_once_with`` actually
        # checks both the call count and the arguments.
        client.publish.assert_called_once_with(
            TopicArn=topic,
            MessageStructure='json',
            Message=message_sent)
54
55
56
def test_sns_publisher_with_topic_name(mock_boto_client_sns):
0 ignored issues
show
Coding Style Naming introduced by
The name test_sns_publisher_with_topic_name does not conform to the function naming conventions ([a-z_][a-z0-9_]{2,30}$).

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
57
    with mock_boto_client_sns as mock_sns:
58
        message = '{"test": "hey"}'
59
        topic = 'topic-name'
60
        response = sns_publish(topic, message)
61
        assert response
62
        assert mock_sns.called
63
64
        client = mock_sns()
65
        assert client.publish.called
66