Completed
Pull Request — master (#366)
by
unknown
04:31 queued 16s
created

AWSSQSSensor.cleanup()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 2
rs 10
cc 1
1
"""
2
This is a generic SQS sensor that uses the boto3 API to fetch messages from an SQS queue.
3
After receiving a message, its content is passed as payload to the trigger 'aws.sqs_new_message'.
4
5
This sensor can be configured either by using config.yaml within a pack or by creating the
6
following values in datastore:
7
    - aws.input_queue
8
    - aws.aws_access_key_id
9
    - aws.aws_secret_access_key
10
    - aws.region
11
"""
12
13
from boto3.session import Session
14
from botocore.exceptions import ClientError
15
16
from st2reactor.sensor.base import PollingSensor
17
18
19
class AWSSQSSensor(PollingSensor):
    """Polling sensor that turns SQS messages into 'aws.sqs_new_message' triggers.

    On every poll the sensor receives messages from the configured queue,
    dispatches each message body as a trigger payload and then deletes the
    message from the queue.
    """

    def __init__(self, sensor_service, config=None, poll_interval=5):
        super(AWSSQSSensor, self).__init__(sensor_service=sensor_service, config=config,
                                           poll_interval=poll_interval)

    def setup(self):
        """Read the configuration, build the boto3 objects and resolve the input queue."""
        # The queue name is looked up in the 'sqs_sensor' config section, the
        # credentials and region in the default 'setup' section.
        self.input_queue = self._GetConfigEntry(key='input_queue', prefix='sqs_sensor')
        self.aws_access_key = self._GetConfigEntry('aws_access_key_id')
        self.aws_secret_key = self._GetConfigEntry('aws_secret_access_key')
        self.aws_region = self._GetConfigEntry('region')

        self._logger = self._sensor_service.get_logger(name=self.__class__.__name__)

        self.session = None
        self.sqs_res = None

        self._SetupSqs()
        self.queue = self._GetQueueByName(self.input_queue)

    def poll(self):
        """Receive pending messages, dispatch each one as a trigger, then delete it."""
        messages = self._receive_messages(queue=self.queue)
        # `_receive_messages` asks for a single message by default; iterating
        # keeps this correct if more than one message is ever returned.
        for message in messages or ():
            payload = {"queue": self.input_queue, "body": message.body}
            self._sensor_service.dispatch(trigger="aws.sqs_new_message", payload=payload)
            # Delete only after the dispatch call has returned.
            message.delete()

    def cleanup(self):
        """No resources to release."""
        pass

    def add_trigger(self, trigger):
        # This method is called when trigger is created
        pass

    def update_trigger(self, trigger):
        # This method is called when trigger is updated
        pass

    def remove_trigger(self, trigger):
        # Nothing to do when a trigger is removed.
        pass

    def _GetConfigEntry(self, key, prefix='setup'):
        """Get a configuration value either from the datastore or from the config file.

        The datastore entry 'aws.<key>' takes precedence; when it is missing or
        empty the value is read from the `prefix` section of the pack config.

        :param key: Name of the configuration entry.
        :param prefix: Section of the pack config that holds the entry.
        :return: The configured value, or None when it is not set in either place.
        """
        value = self._sensor_service.get_value('aws.%s' % (key), local=False)
        if value:
            return value

        # The section may be absent from the pack config (or the config itself
        # may be empty); treat that as "no value" instead of failing on None.
        section = (self._config or {}).get(prefix, None) or {}
        return section.get(key, None)

    def _SetupSqs(self):
        """Create the boto3 session and the SQS resource from the loaded credentials."""
        self._logger.debug('Setting up SQS resources')
        self.session = Session(aws_access_key_id=self.aws_access_key,
                               aws_secret_access_key=self.aws_secret_key,
                               region_name=self.aws_region)

        self.sqs_res = self.session.resource('sqs')

    def _GetQueueByName(self, queueName):
        """Fetch a queue by its name, creating it when it doesn't exist.

        :param queueName: Name of the SQS queue.
        :return: The queue object returned by the SQS resource.
        :raises ClientError: For any error other than a non-existent queue.
        """
        try:
            return self.sqs_res.get_queue_by_name(QueueName=queueName)
        except ClientError as e:
            # Only a missing queue is recoverable; everything else is re-raised
            # untouched and must not be logged as "queue doesn't exist".
            if e.response['Error']['Code'] != 'AWS.SimpleQueueService.NonExistentQueue':
                raise

            self._logger.warning("SQS Queue: %s doesn't exist, creating it.", queueName)
            return self.sqs_res.create_queue(QueueName=queueName)

    def _receive_messages(self, queue, wait_time=2, num_messages=1):
        """Receive messages from `queue` and return them.

        :param queue: Queue object to read from.
        :param wait_time: Passed to the queue as WaitTimeSeconds.
        :param num_messages: Passed to the queue as MaxNumberOfMessages.
        :return: Whatever `queue.receive_messages` returns.
        """
        return queue.receive_messages(WaitTimeSeconds=wait_time,
                                      MaxNumberOfMessages=num_messages)
96