"""Tests for the route classes in ``loafer.ext.aws.routes``."""

from unittest import mock

from loafer.ext.aws.message_translators import SQSMessageTranslator, SNSMessageTranslator
from loafer.ext.aws.providers import SQSProvider
from loafer.ext.aws.routes import SQSRoute, SNSQueueRoute


def test_sqs_route():
    """Check the defaults of an ``SQSRoute`` built without optional arguments.

    With only the first positional argument and a handler given, the route
    is expected to expose an ``SQSMessageTranslator``, an ``SQSProvider``,
    and a name equal to that first positional argument.
    """
    route = SQSRoute('what', handler='ever')
    assert isinstance(route.message_translator, SQSMessageTranslator)
    assert isinstance(route.provider, SQSProvider)
    assert route.name == 'what'


def test_sqs_route_keep_message_translator():
    """Check that an explicit ``message_translator`` is not replaced.

    A ``mock.Mock`` is passed as the translator; the route must still expose
    a ``mock.Mock`` rather than its default translator.
    """
    route = SQSRoute('what', handler='ever', message_translator=mock.Mock())
    assert isinstance(route.message_translator, mock.Mock)


def test_sqs_route_keep_name():
    """Check that an explicit ``name`` wins over the first positional argument."""
    route = SQSRoute('what', handler='ever', name='foobar')
    assert route.name == 'foobar'


def test_sns_queue_route():
    """Check the defaults of an ``SNSQueueRoute`` built without optional arguments.

    With only the first positional argument and a handler given, the route
    is expected to expose an ``SNSMessageTranslator``, an ``SQSProvider``,
    and a name equal to that first positional argument.
    """
    route = SNSQueueRoute('what', handler='ever')
    assert isinstance(route.message_translator, SNSMessageTranslator)
    assert isinstance(route.provider, SQSProvider)
    assert route.name == 'what'


def test_sns_queue_route_keep_message_translator():
    """Check that an explicit ``message_translator`` is not replaced.

    A ``mock.Mock`` is passed as the translator; the route must still expose
    a ``mock.Mock`` rather than its default translator.
    """
    route = SNSQueueRoute('what', handler='ever', message_translator=mock.Mock())
    assert isinstance(route.message_translator, mock.Mock)


def test_sns_queue_route_keep_name():
    """Check that an explicit ``name`` wins over the first positional argument."""
    route = SNSQueueRoute('what', handler='ever', name='foobar')
    assert route.name == 'foobar'