QueueManager   A
last analyzed

Complexity

Total Complexity 11

Size/Duplication

Total Lines 66
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 0

Test Coverage

Coverage 11.11%

Importance

Changes 0
Metric Value
wmc 11
lcom 1
cbo 0
dl 0
loc 66
ccs 3
cts 27
cp 0.1111
rs 10
c 0
b 0
f 0

8 Methods

Rating   Name   Duplication   Size   Complexity  
A setContainer() 0 4 1
A setQueueName() 0 6 1
A listActions() 0 4 1
A deleteQueue() 0 5 1
A queueInfo() 0 5 1
A listQueues() 0 5 1
A getProducerService() 0 4 1
A executeAction() 0 16 4
1
<?php
2
3
namespace Kaliop\Queueing\Plugins\KinesisBundle\Adapter\Kinesis;
4
5
use Kaliop\QueueingBundle\Service\MessageProducer as BaseMessageProducer;
6
use Symfony\Component\DependencyInjection\ContainerAwareInterface;
7
use Symfony\Component\DependencyInjection\Exception\ServiceNotFoundException;
8
use Symfony\Component\DependencyInjection\ContainerInterface;
9
use Symfony\Component\DependencyInjection\ContainerBuilder;
10
use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
11
use Symfony\Component\Config\FileLocator;
12
use InvalidArgumentException;
13
use Kaliop\QueueingBundle\Queue\Queue;
14
use Kaliop\QueueingBundle\Queue\QueueManagerInterface;
15
16
/**
 * A class dedicated to sending control commands for Kinesis streams.
 *
 * All commands are forwarded to the 'kaliop_queueing.kinesis.producer' service,
 * which is fetched from the injected container every time a command is run.
 *
 * @todo add support for stream creation (needs a nr. of shards)
 */
class QueueManager implements ContainerAwareInterface, QueueManagerInterface
{
    /** @var string|null name of the stream targeted by the 'delete' and 'info' actions */
    protected $streamName;

    /** @var ContainerInterface|null container used to look up the producer service */
    protected $container;

    /**
     * Stores the service container (or null to unset it).
     *
     * @param ContainerInterface|null $container
     */
    public function setContainer(ContainerInterface $container = null)
    {
        $this->container = $container;
    }

    /**
     * Sets the name of the stream that the 'delete' and 'info' actions work on.
     *
     * @param string $queue
     * @return QueueManager
     */
    public function setQueueName($queue)
    {
        $this->streamName = $queue;

        return $this;
    }

    /**
     * Lists the action names accepted by executeAction().
     *
     * @return string[]
     */
    public function listActions()
    {
        return array('info', 'list', 'delete');
    }

    /**
     * Runs one of the supported control actions.
     *
     * @param string $action one of the values returned by listActions()
     * @param array $arguments accepted for interface compatibility; not used by any action here
     * @return mixed the value returned by the matching action method
     * @throws InvalidArgumentException when $action is not supported
     * @throws \RuntimeException when no container has been set
     */
    public function executeAction($action, array $arguments=array())
    {
        switch ($action) {
            case 'delete':
                return $this->deleteQueue();

            case 'info':
                return $this->queueInfo();

            case 'list':
                return $this->listQueues();

            default:
                throw new InvalidArgumentException("Action $action not supported");
        }
    }

    /**
     * Issues a 'DeleteStream' call for the current stream name.
     * The stream name is passed on as is; it is not validated here.
     *
     * @return mixed the '@metadata' element of the call result
     */
    protected function deleteQueue()
    {
        $result = $this->getProducerService()->call('DeleteStream', array('StreamName' => $this->streamName));
        return $result['@metadata'];
    }

    /**
     * Issues a 'DescribeStream' call for the current stream name.
     * The stream name is passed on as is; it is not validated here.
     *
     * @return mixed the 'StreamDescription' element of the call result
     */
    protected function queueInfo()
    {
        $result = $this->getProducerService()->call('DescribeStream', array('StreamName' => $this->streamName));
        return $result->get('StreamDescription');
    }

    /**
     * Issues a 'ListStreams' call; the current stream name is not used.
     *
     * @return mixed the 'StreamNames' element of the call result
     */
    protected function listQueues()
    {
        $result = $this->getProducerService()->call('ListStreams');
        return $result->get('StreamNames');
    }

    /**
     * Fetches the producer service used to send the commands.
     *
     * @return mixed the 'kaliop_queueing.kinesis.producer' service
     * @throws \RuntimeException when no container has been set
     */
    protected function getProducerService()
    {
        // setContainer() accepts null, so fail with a clear message instead of
        // a "member function get() on null" error
        if ($this->container === null) {
            throw new \RuntimeException(
                'Can not retrieve the Kinesis producer service: no container has been set'
            );
        }

        return $this->container->get('kaliop_queueing.kinesis.producer');
    }
}