Completed
Pull Request — master (#366)
by
unknown
07:07
created

AWSSQSSensor   A

Complexity

Total Complexity 15

Size/Duplication

Total Lines 77
Duplicated Lines 0 %
Metric Value
wmc 15
dl 0
loc 77
rs 10

11 Methods

Rating   Name   Duplication   Size   Complexity  
A _setup_sqs() 0 8 1
A remove_trigger() 0 2 1
A _get_config_entry() 0 9 2
A poll() 0 6 2
A update_trigger() 0 3 1
A cleanup() 0 2 1
A add_trigger() 0 3 1
A _receive_messages() 0 5 1
A _get_queue_by_name() 0 12 3
A __init__() 0 3 1
A setup() 0 13 1
1
"""
2
This is a generic SQS sensor using the boto3 API to fetch messages from an SQS queue.
3
After receiving a message, its content is passed as payload to the trigger 'aws.sqs_new_message'.
4
5
This sensor can be configured either by using config.yaml within a pack or by creating the
6
following values in datastore:
7
    - aws.input_queue
8
    - aws.aws_access_key_id
9
    - aws.aws_secret_access_key
10
    - aws.region
11
12
To configure the sensor in config.yaml, use a config like this:
13
    setup:
14
      aws_access_key_id:
15
      aws_secret_access_key:
16
      region:
17
    sqs_sensor:
18
      input_queue:
19
20
If any value exists in the datastore, it will be taken instead of the value in config.yaml.
21
"""
22
23
from boto3.session import Session
24
from botocore.exceptions import ClientError
25
26
from st2reactor.sensor.base import PollingSensor
27
28
29
class AWSSQSSensor(PollingSensor):
    '''
    Polling sensor that reads messages from a single SQS queue via boto3 and
    dispatches each one as an 'aws.sqs_new_message' trigger.

    Configuration values are resolved by _get_config_entry(): a value stored in
    the datastore under 'aws.<key>' wins over the same key in the pack config.
    '''

    def __init__(self, sensor_service, config=None, poll_interval=5):
        ''' Forward the sensor service, pack config and poll interval to PollingSensor. '''
        super(AWSSQSSensor, self).__init__(sensor_service=sensor_service, config=config,
                                           poll_interval=poll_interval)

    def setup(self):
        ''' Resolve configuration, create the boto3 session and look up the input queue. '''
        # The queue name lives in the 'sqs_sensor' config section; credentials and
        # region live in the default 'setup' section.
        self.input_queue = self._get_config_entry(key='input_queue', prefix='sqs_sensor')
        self.aws_access_key = self._get_config_entry('aws_access_key_id')
        self.aws_secret_key = self._get_config_entry('aws_secret_access_key')
        self.aws_region = self._get_config_entry('region')

        self._logger = self._sensor_service.get_logger(name=self.__class__.__name__)

        # Populated by _setup_sqs() below.
        self.session = None
        self.sqs_res = None

        self._setup_sqs()
        self.queue = self._get_queue_by_name(self.input_queue)

    def poll(self):
        ''' Fetch pending messages and dispatch each one as 'aws.sqs_new_message'. '''
        messages = self._receive_messages(queue=self.queue)
        # _receive_messages() asks for one message by default; iterating keeps
        # this correct if a larger batch is ever requested.
        for message in messages or []:
            payload = {"queue": self.input_queue, "body": message.body}
            self._sensor_service.dispatch(trigger="aws.sqs_new_message", payload=payload)
            # Delete only after dispatching so a failed dispatch leaves the
            # message on the queue.
            message.delete()

    def cleanup(self):
        ''' Nothing to release. '''
        pass

    def add_trigger(self, trigger):
        ''' Called when a trigger is created; this sensor does not react to it. '''
        pass

    def update_trigger(self, trigger):
        ''' Called when a trigger is updated; this sensor does not react to it. '''
        pass

    def remove_trigger(self, trigger):
        ''' Trigger-removal hook; this sensor does not react to it. '''
        pass

    def _get_config_entry(self, key, prefix='setup'):
        '''
        Get a configuration value either from the datastore or the config file.

        The datastore is queried first for 'aws.<key>' (with local=False —
        presumably a pack-wide rather than sensor-scoped lookup; confirm against
        the sensor service API). If that yields nothing truthy, the value is read
        from the `prefix` section of the pack config.

        Returns None when the key is found in neither place, including when the
        config or the requested section is missing altogether.
        '''
        value = self._sensor_service.get_value('aws.%s' % (key), local=False)
        if value:
            return value

        # Tolerate a missing config or a missing section instead of failing with
        # AttributeError on None.
        section = (self._config or {}).get(prefix, None) or {}
        return section.get(key, None)

    def _setup_sqs(self):
        ''' Create the boto3 session and the SQS service resource. '''
        self._logger.debug('Setting up SQS resources')
        self.session = Session(aws_access_key_id=self.aws_access_key,
                               aws_secret_access_key=self.aws_secret_key,
                               region_name=self.aws_region)

        self.sqs_res = self.session.resource('sqs')

    def _get_queue_by_name(self, queueName):
        '''
        Fetch a queue by its name, creating it if it does not exist.

        Any ClientError other than a non-existent queue is re-raised unchanged.
        '''
        try:
            return self.sqs_res.get_queue_by_name(QueueName=queueName)
        except ClientError as e:
            if e.response['Error']['Code'] != 'AWS.SimpleQueueService.NonExistentQueue':
                raise

            # Log only once the error code confirms the queue is really missing.
            self._logger.warning("SQS Queue: %s doesn't exist, creating it.", queueName)
            return self.sqs_res.create_queue(QueueName=queueName)

    def _receive_messages(self, queue, wait_time=2, num_messages=1):
        '''
        Receive messages from `queue` and return them.

        `wait_time` is passed as WaitTimeSeconds and `num_messages` as
        MaxNumberOfMessages to the queue's receive_messages() call.
        '''
        return queue.receive_messages(WaitTimeSeconds=wait_time,
                                      MaxNumberOfMessages=num_messages)
106