Completed
Pull Request — master (#366)
by
unknown
03:42
created

AWSSQSSensor   A

Complexity

Total Complexity 15

Size/Duplication

Total Lines 77
Duplicated Lines 0 %
Metric Value
wmc 15
dl 0
loc 77
rs 10

11 Methods

Rating   Name   Duplication   Size   Complexity  
A _SetupSqs() 0 8 1
A _GetQueueByName() 0 12 3
A remove_trigger() 0 2 1
A poll() 0 6 2
A update_trigger() 0 3 1
A cleanup() 0 2 1
A _GetConfigEntry() 0 9 2
A add_trigger() 0 3 1
A _receive_messages() 0 5 1
A __init__() 0 3 1
A setup() 0 13 1
1
"""
2
This is generic SQS Sensor using boto3 api to fetch messages from sqs queue.
3
After receiving a messge it's content is passed as payload to a trigger 'aws.sqs_new_message'
4
5
This sensor can be configured either by using config.yaml withing a pack or by creating
6
following values in datastore:
7
    - aws.input_queue
8
    - aws.aws_access_key_id
9
    - aws.aws_secret_access_key
10
    - aws.region
11
"""
12
13
import time
14
15
import boto3
16
from boto3.session import Session
17
from botocore.exceptions import ClientError
18
19
import json
20
21
from st2reactor.sensor.base import PollingSensor
22
23
class AWSSQSSensor(PollingSensor):
24
    def __init__(self, sensor_service, config=None, poll_interval=5):
25
        super(AWSSQSSensor, self).__init__(sensor_service=sensor_service, config=config,
26
                                           poll_interval=poll_interval)
27
28
    def setup(self):
29
        self.input_queue = self._GetConfigEntry(key='input_queue', prefix='sqs_sensor')
30
        self.aws_access_key = self._GetConfigEntry('aws_access_key_id')
31
        self.aws_secret_key = self._GetConfigEntry('aws_secret_access_key')
32
        self.aws_region = self._GetConfigEntry('region')
33
34
        self._logger = self._sensor_service.get_logger(name=self.__class__.__name__)
35
36
        self.session = None
37
        self.sqs_res = None
38
39
        self._SetupSqs()
40
        self.queue = self._GetQueueByName(self.input_queue)
41
42
    def poll(self):
43
        msg = self._receive_messages(queue=self.queue)
44
        if msg:
45
            payload = {"queue": self.input_queue, "body": msg[0].body}
46
            self._sensor_service.dispatch(trigger="aws.sqs_new_message", payload=payload)
47
            msg[0].delete()
48
49
    def cleanup(self):
50
        pass
51
52
    def add_trigger(self, trigger):
53
        # This method is called when trigger is created
54
        pass
55
56
    def update_trigger(self, trigger):
57
        # This method is called when trigger is updated
58
        pass
59
60
    def remove_trigger(self, trigger):
61
        pass
62
63
    def _GetConfigEntry(self, key, prefix='setup'):
64
        ''' Get configuration values either from Datastore or config file. '''
65
        config = self._config.get(prefix, None)
66
67
        value = self._sensor_service.get_value('aws.%s' % (key), local=False)
68
        if not value:
69
            value = config.get(key, None)
70
71
        return value
72
73
    def _SetupSqs(self):
74
        ''' Setup Boto3 structures '''
75
        self._logger.debug('Setting up SQS resources')
76
        self.session = Session(aws_access_key_id=self.aws_access_key,
77
                               aws_secret_access_key=self.aws_secret_key,
78
                               region_name=self.aws_region)
79
80
        self.sqs_res = self.session.resource('sqs')
81
82
    def _GetQueueByName(self, queueName):
83
        ''' Fetch QUEUE by it's name create new one if queue doesn't exist '''
84
        try:
85
            queue = self.sqs_res.get_queue_by_name(QueueName=queueName)
86
        except ClientError as e:
87
            self._logger.warning("SQS Queue: %s doesn't exist, creating it.", queueName)
88
            if e.response['Error']['Code'] == 'AWS.SimpleQueueService.NonExistentQueue':
89
                queue = self.sqs_res.create_queue(QueueName=queueName)
90
            else:
91
                raise
92
93
        return queue
94
95
    def _receive_messages(self, queue, wait_time=2, num_messages=1):
96
        ''' Receive a message from queue and return it. '''
97
        msg = queue.receive_messages(WaitTimeSeconds=wait_time, MaxNumberOfMessages=num_messages)
98
99
        return msg
100