Completed
Push — master ( 4f2d5a...a59eef )
by Gaetano
08:35
created

QueueManager   A

Complexity

Total Complexity 20

Size/Duplication

Total Lines 137
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 0

Test Coverage

Coverage 78.18%

Importance

Changes 0
Metric Value
wmc 20
lcom 1
cbo 0
dl 0
loc 137
ccs 43
cts 55
cp 0.7818
rs 10
c 0
b 0
f 0

12 Methods

Rating   Name   Duplication   Size   Complexity  
A setContainer() 0 4 1
A setQueueName() 0 6 1
A listActions() 0 4 1
A listConfiguredQueues() 0 7 2
A queueInfo() 0 6 1
A purgeQueue() 0 6 1
A deleteQueue() 0 6 1
A getProducerService() 0 4 1
A registerQueue() 0 4 1
B executeAction() 0 25 7
A listAvailableQueues() 0 10 2
A createQueue() 0 11 1
1
<?php
2
3
namespace Kaliop\Queueing\Plugins\SQSBundle\Adapter\SQS;
4
5
use Kaliop\QueueingBundle\Service\MessageProducer as BaseMessageProducer;
6
use Symfony\Component\DependencyInjection\ContainerAwareInterface;
7
use Symfony\Component\DependencyInjection\ContainerInterface;
8
use InvalidArgumentException;
9
use Kaliop\QueueingBundle\Queue\Queue;
10
use Kaliop\QueueingBundle\Queue\QueueManagerInterface;
11
12
/**
 * A class dedicated to sending control commands
 *
 * Every command is sent through the producer service that the container exposes for the
 * currently selected queue (see getProducerService()), so both the container and the queue
 * name have to be set before an action which talks to the broker is executed.
 *
 * @todo add support for listing configured queues
 *
 * @see http://docs.aws.amazon.com/aws-sdk-php/v3/api/class-Aws.Sqs.SqsClient.html
 */
class QueueManager implements ContainerAwareInterface, QueueManagerInterface
{
    /** @var string name of the queue the next action applies to */
    protected $queueName;
    /** @var ContainerInterface|null */
    protected $container;
    /** @var string[] names of the queues added via registerQueue() */
    protected $registeredQueues = array();

    /**
     * @param ContainerInterface|null $container
     */
    public function setContainer(ContainerInterface $container = null)
    {
        $this->container = $container;
    }

    /**
     * @param string $queue
     * @return QueueManager
     */
    public function setQueueName($queue)
    {
        $this->queueName = $queue;

        return $this;
    }

    /**
     * @return string[] the action names accepted by executeAction()
     */
    public function listActions()
    {
        return array('list-available', 'list-configured', 'create', 'info', 'purge', 'delete');
    }

    /**
     * Dispatches a named action to the matching protected method.
     *
     * @param string $action one of the values returned by listActions()
     * @param array $arguments only used by the 'create' action, where it is passed on as queue attributes
     * @return mixed the return value of the executed action
     * @throws InvalidArgumentException when the action is not supported
     */
    public function executeAction($action, array $arguments = array())
    {
        switch ($action) {
            case 'list-available':
                return $this->listAvailableQueues();

            case 'list-configured':
                return $this->listConfiguredQueues();

            case 'create':
                return $this->createQueue($arguments);

            case 'info':
                return $this->queueInfo();

            case 'purge':
                return $this->purgeQueue();

            case 'delete':
                return $this->deleteQueue();

            default:
                throw new InvalidArgumentException("Action $action not supported");
        }
    }

    /**
     * @return array keys are the queue urls (the 'QueueUrls' element of the listQueues result), values the queue type
     */
    protected function listAvailableQueues()
    {
        $result = $this->getProducerService()->call('listQueues');
        $queueUrls = $result->get('QueueUrls');
        // make this slightly easier to understand by callers
        if ($queueUrls === null) {
            $queueUrls = array();
        }

        // array_fill_keys copes with an empty list, whereas array_combine + array_fill emit
        // warnings and return false for empty input on older php versions
        return array_fill_keys($queueUrls, Queue::TYPE_ANY);
    }

    /**
     * @return array keys are the names of the queues added via registerQueue(), values the queue type
     */
    protected function listConfiguredQueues()
    {
        // an empty list of registered queues yields an empty array
        return array_fill_keys($this->registeredQueues, Queue::TYPE_ANY);
    }

    /**
     * NB: works only after the relevant config has been set in yml, which makes it less than ideal...
     *     See how the SQSTest does to dynamically create both the queue on the broker and the producer + consumer
     * @param array $args allowed elements: see http://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html#createqueue
     * @return string the queue Url
     * @throws \Exception on failure
     */
    protected function createQueue($args)
    {
        $result = $this->getProducerService()->call(
            'CreateQueue',
            array(
                'QueueName' => $this->queueName,
                'Attributes' => $args
            )
        );

        return $result->get('QueueUrl');
    }

    /**
     * @return array the 'Attributes' element of the getQueueAttributes result (all attributes are requested)
     * @throws \Exception on failure
     */
    protected function queueInfo()
    {
        $producer = $this->getProducerService();
        $result = $producer->call(
            'getQueueAttributes',
            array('QueueUrl' => $producer->getQueueUrl(), 'AttributeNames' => array('All'))
        );

        return $result->get('Attributes');
    }

    /**
     * @return array the '@metadata' element of the PurgeQueue result
     * @throws \Exception on failure
     */
    protected function purgeQueue()
    {
        $producer = $this->getProducerService();
        $result = $producer->call('PurgeQueue', array('QueueUrl' => $producer->getQueueUrl()));

        return $result['@metadata'];
    }

    /**
     * @return array the '@metadata' element of the DeleteQueue result
     * @throws \Exception on failure
     */
    protected function deleteQueue()
    {
        $producer = $this->getProducerService();
        $result = $producer->call('DeleteQueue', array('QueueUrl' => $producer->getQueueUrl()));

        return $result['@metadata'];
    }

    /**
     * Fetches from the container the producer service registered for the current queue name.
     *
     * @return object the service with id 'kaliop_queueing.sqs.<queueName>_producer'
     */
    protected function getProducerService()
    {
        return $this->container->get('kaliop_queueing.sqs.'. $this->queueName . '_producer');
    }

    /**
     * Adds a queue name to the list returned by the 'list-configured' action.
     *
     * @param string $queueName
     */
    public function registerQueue($queueName)
    {
        $this->registeredQueues[] = $queueName;
    }
}
156