Completed
Push — master ( 761ae3...12898b )
by
unknown
02:31
created

AWSSQSSensor   A

Complexity

Total Complexity 23

Size/Duplication

Total Lines 98
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 98
rs 10
wmc 23

11 Methods

Rating   Name   Duplication   Size   Complexity  
A __init__() 0 3 1
A remove_trigger() 0 2 1
A update_trigger() 0 3 1
A cleanup() 0 2 1
A add_trigger() 0 3 1
A poll() 0 9 4
A setup() 0 23 3
A _setup_sqs() 0 8 1
B _get_config_entry() 0 17 6
A _receive_messages() 0 5 1
A _get_queue_by_name() 0 12 3
1
"""
2
This is generic SQS Sensor using boto3 api to fetch messages from sqs queue.
3
After receiving a message its content is passed as payload to the trigger 'aws.sqs_new_message'.
4
This sensor can be configured either by using config.yaml within a pack or by creating
5
following values in datastore:
6
    - aws.input_queues (list queues as comma separated string: first_queue,second_queue)
7
    - aws.aws_access_key_id
8
    - aws.aws_secret_access_key
9
    - aws.region
10
    - aws.max_number_of_messages (must be between 1 - 10)
11
For configuration in config.yaml, use a config like this:
12
    setup:
13
      aws_access_key_id:
14
      aws_secret_access_key:
15
      region:
16
    sqs_sensor:
17
      input_queues:
18
        - first_queue
19
        - second_queue
20
    sqs_other:
21
        max_number_of_messages: 1
22
If a value exists in the datastore, it is used instead of the corresponding value in config.yaml.
23
"""
24
25
import six
26
from boto3.session import Session
27
from botocore.exceptions import ClientError
28
29
from st2reactor.sensor.base import PollingSensor
30
31
32
class AWSSQSSensor(PollingSensor):
    """Polling sensor that forwards AWS SQS messages as StackStorm triggers.

    On every poll each configured queue is read through the boto3 SQS
    resource; every received message is dispatched as an
    'aws.sqs_new_message' trigger and then deleted from the queue.
    """

    def __init__(self, sensor_service, config=None, poll_interval=5):
        super(AWSSQSSensor, self).__init__(sensor_service=sensor_service, config=config,
                                           poll_interval=poll_interval)

    def setup(self):
        """Load the sensor configuration and create the boto3 SQS resource.

        Raises:
            ValueError: if a required configuration value is missing or
                max_number_of_messages is not a valid integer.
        """
        queues = self._get_config_entry(key='input_queues', prefix='sqs_sensor')

        # XXX: This is a hack as from datastore we can only receive a string while
        # from config.yaml we can receive a list
        if isinstance(queues, six.string_types):
            # Drop empty entries so "a,b," or "a,,b" do not yield '' queue names.
            self.input_queues = [name.strip() for name in queues.split(',') if name.strip()]
        else:
            self.input_queues = queues

        self.aws_access_key = self._get_config_entry('aws_access_key_id')
        self.aws_secret_key = self._get_config_entry('aws_secret_access_key')
        self.aws_region = self._get_config_entry('region')

        # The datastore hands values back as strings, but the value is passed
        # to receive_messages() as MaxNumberOfMessages, so normalise it to int.
        self.max_number_of_messages = int(
            self._get_config_entry('max_number_of_messages', prefix='sqs_other'))

        self._logger = self._sensor_service.get_logger(name=self.__class__.__name__)

        self.session = None
        self.sqs_res = None

        self._setup_sqs()

    def poll(self):
        """Fetch messages from every configured queue and dispatch them.

        Each non-empty message is dispatched as 'aws.sqs_new_message' with a
        payload of ``{"queue": <queue name>, "body": <message body>}`` and is
        deleted from the queue right after the dispatch call.
        """
        for queue in self.input_queues:
            msgs = self._receive_messages(queue=self._get_queue_by_name(queue),
                                          num_messages=self.max_number_of_messages)
            for msg in msgs:
                if msg:
                    payload = {"queue": queue, "body": msg.body}
                    self._sensor_service.dispatch(trigger="aws.sqs_new_message", payload=payload)
                    msg.delete()

    def cleanup(self):
        """No-op; this sensor performs no cleanup."""
        pass

    def add_trigger(self, trigger):
        # This method is called when trigger is created
        pass

    def update_trigger(self, trigger):
        # This method is called when trigger is updated
        pass

    def remove_trigger(self, trigger):
        # No per-trigger state is kept, so there is nothing to remove.
        pass

    def _get_config_entry(self, key, prefix=None):
        """Get a configuration value either from the datastore or the config file.

        Lookup order (the first truthy value wins):
          1. datastore entry 'aws.<key>' (fetched with local=False),
          2. ``config[key]``, where ``config`` is the section named by
             ``prefix`` when a prefix is given, otherwise the whole config,
          3. ``config['setup'][key]``, if that same ``config`` has a 'setup'
             section.

        Args:
            key: name of the configuration entry.
            prefix: optional name of the config section to look in.

        Returns:
            The first truthy value found.

        Raises:
            ValueError: if no truthy value is found in any of the sources.
        """
        # NOTE(review): both self.config and self._config are used below;
        # presumably they refer to the same dict - confirm against PollingSensor.
        config = self.config
        if prefix:
            config = self._config.get(prefix, {})

        value = self._sensor_service.get_value('aws.%s' % (key), local=False)
        if not value:
            value = config.get(key, None)

        if not value and config.get('setup', None):
            value = config['setup'].get(key, None)

        if not value:
            raise ValueError('[AWSSQSSensor]: Configuration for %s key is missing.' % (key))

        return value

    def _setup_sqs(self):
        """Create the boto3 session and SQS resource from the loaded credentials."""
        self._logger.debug('Setting up SQS resources')
        self.session = Session(aws_access_key_id=self.aws_access_key,
                               aws_secret_access_key=self.aws_secret_key,
                               region_name=self.aws_region)

        self.sqs_res = self.session.resource('sqs')

    def _get_queue_by_name(self, queueName):
        """Fetch a queue by its name, creating it if it doesn't exist.

        Args:
            queueName: name of the SQS queue.

        Returns:
            The queue object returned by the SQS resource.

        Raises:
            ClientError: for any error other than the queue not existing.
        """
        try:
            queue = self.sqs_res.get_queue_by_name(QueueName=queueName)
        except ClientError as e:
            # Only a missing queue is recoverable; re-raise everything else
            # without claiming in the log that a queue is being created.
            if e.response['Error']['Code'] != 'AWS.SimpleQueueService.NonExistentQueue':
                raise
            self._logger.warning("SQS Queue: %s doesn't exist, creating it.", queueName)
            queue = self.sqs_res.create_queue(QueueName=queueName)

        return queue

    def _receive_messages(self, queue, num_messages, wait_time=2):
        """Receive up to ``num_messages`` messages from ``queue`` and return them.

        Args:
            queue: queue object to read from.
            num_messages: passed to the queue as MaxNumberOfMessages.
            wait_time: passed to the queue as WaitTimeSeconds.
        """
        msgs = queue.receive_messages(WaitTimeSeconds=wait_time, MaxNumberOfMessages=num_messages)

        return msgs
130