Completed
Push — master ( 156d2e...49e913 )
by Gaetano
06:34
created

Driver   A

Complexity

Total Complexity 12

Size/Duplication

Total Lines 124
Duplicated Lines 0 %

Coupling/Cohesion

Components 2
Dependencies 3

Test Coverage

Coverage 86.11%

Importance

Changes 0
Metric Value
wmc 12
lcom 2
cbo 3
dl 0
loc 124
ccs 31
cts 36
cp 0.8611
rs 10
c 0
b 0
f 0

10 Methods

Rating   Name   Duplication   Size   Complexity  
A getProducer() 0 4 1
A getConsumer() 0 4 1
A acceptMessage() 0 4 1
A decodeMessage() 0 4 1
A getQueueManager() 0 6 1
A setDebug() 0 6 1
A registerConnection() 0 4 1
A getConnectionConfig() 0 8 2
A createProducer() 0 8 1
A createConsumer() 0 11 2
1
<?php
2
3
namespace Kaliop\Queueing\Plugins\SQSBundle\Adapter\SQS;
4
5
use Kaliop\QueueingBundle\Adapter\DriverInterface;
6
use Kaliop\QueueingBundle\Queue\MessageConsumerInterface;
7
use Kaliop\QueueingBundle\Queue\QueueManagerAwareTrait;
8
use Symfony\Component\DependencyInjection\ContainerAwareTrait;
9
use Symfony\Component\DependencyInjection\ContainerAwareInterface;
10
11
/**
 * Driver giving access to the SQS producers, consumers and queue manager.
 *
 * Producers and consumers are looked up in the service container under the ids
 * "kaliop_queueing.sqs.<queueName>_producer" / "kaliop_queueing.sqs.<queueName>_consumer";
 * they can also be created on the fly from a registered connection and stored under those same ids.
 *
 * @todo inject Debug flag in both consumers and producers
 */
class Driver implements DriverInterface, ContainerAwareInterface
{
    use ContainerAwareTrait;
    use QueueManagerAwareTrait;

    /** @var mixed debug flag, handed over to producers and consumers when they are retrieved */
    protected $debug;

    /** @var array connection parameters, indexed by connection id */
    protected $connections = array();

    /**
     * Retrieves the producer service for a queue and passes it the current debug flag.
     *
     * @param string $queueName
     * @return \Kaliop\QueueingBundle\Queue\ProducerInterface
     */
    public function getProducer($queueName)
    {
        $producer = $this->container->get("kaliop_queueing.sqs.{$queueName}_producer");

        return $producer->setDebug($this->debug);
    }

    /**
     * This method is more flexible than what is declared in the interface, as it allows direct injection of a callback
     * by the caller instead of relying solely on service configuration.
     * It helps when queues are created dynamically.
     *
     * @param string $queueName
     * @param MessageConsumerInterface|null $callback when null, the appropriate MessageConsumer for the queue is looked
     *                                                up in service configuration
     * @return object
     */
    public function getConsumer($queueName, MessageConsumerInterface $callback = null)
    {
        $consumer = $this->container
            ->get("kaliop_queueing.sqs.{$queueName}_consumer")
            ->setDebug($this->debug)
            ->setQueueName($queueName);

        // Honour the callback handed in by the caller; without this the parameter would be silently ignored
        if ($callback !== null) {
            $consumer->setCallBack($callback);
        }

        return $consumer;
    }

    /**
     * Tells whether a message is of the type handled by this driver.
     *
     * @param mixed $message
     * @return bool true when $message is an SQS Message instance
     */
    public function acceptMessage($message)
    {
        return $message instanceof \Kaliop\Queueing\Plugins\SQSBundle\Adapter\SQS\Message;
    }

    /**
     * Unlike the RabbitMQ driver, we do not have to deal with a native message type from the underlying library.
     * So we just let the Producer create messages of the good type, and decoding them becomes a no-op
     *
     * @param \Kaliop\Queueing\Plugins\SQSBundle\Adapter\SQS\Message $message
     * @return \Kaliop\Queueing\Plugins\SQSBundle\Adapter\SQS\Message the same message, untouched
     */
    public function decodeMessage($message)
    {
        return $message;
    }

    /**
     * Returns the queue manager, set up to work on the given queue.
     *
     * @param string $queueName
     * @return \Kaliop\QueueingBundle\Queue\QueueManagerInterface
     */
    public function getQueueManager($queueName)
    {
        // getQueueManagerInternal() is not defined in this class; presumably provided by QueueManagerAwareTrait
        $mgr = $this->getQueueManagerInternal();
        $mgr->setQueueName($queueName);

        return $mgr;
    }

    /**
     * Stores the debug flag which is later passed to producers and consumers retrieved via this driver.
     *
     * @param mixed $debug
     * @return $this
     */
    public function setDebug($debug)
    {
        $this->debug = $debug;

        return $this;
    }

    /**
     * Stores the parameters of a connection, overwriting any previously registered under the same id.
     *
     * @param string $connectionId
     * @param array $params
     */
    public function registerConnection($connectionId, array $params)
    {
        $this->connections[$connectionId] = $params;
    }

    /**
     * Returns the parameters registered for a connection.
     *
     * @param string $connectionId
     * @return array
     * @throws \RuntimeException when no connection has been registered with the given id
     */
    protected function getConnectionConfig($connectionId)
    {
        if (!isset($this->connections[$connectionId])) {
            throw new \RuntimeException("Connection '$connectionId' is not registered with SQS driver");
        }

        return $this->connections[$connectionId];
    }

    /**
     * Dynamically creates a producer, with no need for configuration except for the connection configuration.
     * The new producer is also stored in the container, under the id used by getProducer() for the same queue name.
     *
     * @param string $queueName
     * @param string $queueUrl
     * @param string $connectionId Id of a connection previously registered via registerConnection()
     * @return mixed an instance of the class named by the 'kaliop_queueing.sqs.producer.class' parameter
     * @throws \RuntimeException when the connection is not registered
     */
    public function createProducer($queueName, $queueUrl, $connectionId)
    {
        $class = $this->container->getParameter('kaliop_queueing.sqs.producer.class');
        $producer = new $class($this->getConnectionConfig($connectionId));
        $producer->setQueueUrl($queueUrl);

        $this->container->set("kaliop_queueing.sqs.{$queueName}_producer", $producer);

        return $producer;
    }

    /**
     * Dynamically creates a consumer, with no need for configuration except for the connection configuration.
     * The new consumer is also stored in the container, under the id used by getConsumer() for the same queue name.
     *
     * @param string $queueName
     * @param string $queueUrl
     * @param string $connectionId Id of a connection as defined in configuration
     * @param MessageConsumerInterface $callback
     * @param string $routingKey
     * @return Consumer
     * @throws \RuntimeException when the connection is not registered
     */
    public function createConsumer($queueName, $queueUrl, $connectionId, $callback=null, $routingKey=null)
    {
        $class = $this->container->getParameter('kaliop_queueing.sqs.consumer.class');
        $consumer = new $class($this->getConnectionConfig($connectionId));
        $consumer->setQueueUrl($queueUrl)->setRoutingKey($routingKey)->setQueueName($queueName);

        // Loose comparison kept on purpose: $callback is untyped here, and any null-like value means "no callback"
        if ($callback != null) {
            $consumer->setCallBack($callback);
        }

        $this->container->set("kaliop_queueing.sqs.{$queueName}_consumer", $consumer);

        return $consumer;
    }
}
138