Driver::getConsumer()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 2
crap 1
1
<?php
2
3
namespace Kaliop\Queueing\Plugins\SQSBundle\Adapter\SQS;
4
5
use Kaliop\QueueingBundle\Adapter\DriverInterface;
6
use Kaliop\QueueingBundle\Queue\MessageConsumerInterface;
7
use Kaliop\QueueingBundle\Queue\QueueManagerAwareTrait;
8
use Symfony\Component\DependencyInjection\ContainerAwareTrait;
9
use Symfony\Component\DependencyInjection\ContainerAwareInterface;
10
11
/**
 * SQS driver: hands out producers, consumers and queue managers, all of which are looked up in
 * (or dynamically registered into) the service container under ids of the form
 * "kaliop_queueing.sqs.{$queueName}_producer" / "kaliop_queueing.sqs.{$queueName}_consumer".
 *
 * @todo inject Debug flag in both consumers and producers
 */
class Driver implements DriverInterface, ContainerAwareInterface
{
    use ContainerAwareTrait;
    use QueueManagerAwareTrait;

    /** @var mixed debug flag, forwarded to producers and consumers when they are retrieved */
    protected $debug;

    /** @var array connection parameters, indexed by connection id (see registerConnection) */
    protected $connections = [];

    /**
     * Retrieves the producer service registered for the given queue and applies the current debug flag to it.
     *
     * @param string $queueName
     * @return \Kaliop\QueueingBundle\Queue\ProducerInterface
     */
    public function getProducer($queueName)
    {
        return $this->container->get("kaliop_queueing.sqs.{$queueName}_producer")->setDebug($this->debug);
    }

    /**
     * This method is more flexible than what is declared in the interface, as it allows direct injection of a callback
     * by the caller instead of relying solely on service configuration.
     * It helps when queues are created dynamically.
     *
     * @param string $queueName
     * @param MessageConsumerInterface|null $callback when null, the appropriate MessageConsumer for the queue is looked
     *                                                up in service configuration
     * @return object
     */
    public function getConsumer($queueName, MessageConsumerInterface $callback = null)
    {
        $consumer = $this->container->get("kaliop_queueing.sqs.{$queueName}_consumer");

        // Honour the documented contract: a callback passed by the caller is injected into the consumer.
        // Same setter as used by createConsumer(); when no callback is given the consumer is left untouched.
        if ($callback !== null) {
            $consumer->setCallBack($callback);
        }

        return $consumer->setDebug($this->debug)->setQueueName($queueName);
    }

    /**
     * Tells whether the given message is of the type handled by this driver.
     *
     * @param mixed $message
     * @return bool true when $message is an instance of this bundle's SQS Message class
     */
    public function acceptMessage($message)
    {
        return $message instanceof \Kaliop\Queueing\Plugins\SQSBundle\Adapter\SQS\Message;
    }

    /**
     * Unlike the RabbitMQ driver, we do not have to deal with a native message type from the underlying library.
     * So we just let the Producer create messages of the good type, and decoding them becomes a no-op
     *
     * @param \Kaliop\Queueing\Plugins\SQSBundle\Adapter\SQS\Message $message
     * @return \Kaliop\Queueing\Plugins\SQSBundle\Adapter\SQS\Message
     */
    public function decodeMessage($message)
    {
        return $message;
    }

    /**
     * Returns the queue manager, after pointing it at the given queue.
     *
     * @param string $queueName
     * @return \Kaliop\QueueingBundle\Queue\QueueManagerInterface
     */
    public function getQueueManager($queueName)
    {
        // getQueueManagerInternal() is not defined in this class; presumably provided by QueueManagerAwareTrait
        $mgr = $this->getQueueManagerInternal();
        $mgr->setQueueName($queueName);

        return $mgr;
    }

    /**
     * Stores the debug flag; it is applied to producers and consumers by getProducer() and getConsumer().
     *
     * @param mixed $debug
     * @return $this
     */
    public function setDebug($debug)
    {
        $this->debug = $debug;

        return $this;
    }

    /**
     * Registers (or overwrites) the parameters of a connection under the given id.
     *
     * @param string $connectionId
     * @param array $params
     */
    public function registerConnection($connectionId, array $params)
    {
        $this->connections[$connectionId] = $params;
    }

    /**
     * Returns the parameters previously registered for a connection.
     *
     * @param string $connectionId
     * @return array
     * @throws \RuntimeException when no connection has been registered under $connectionId
     */
    protected function getConnectionConfig($connectionId)
    {
        if (!isset($this->connections[$connectionId])) {
            throw new \RuntimeException("Connection '$connectionId' is not registered with SQS driver");
        }

        return $this->connections[$connectionId];
    }

    /**
     * Dynamically creates a producer, with no need for configuration except for the connection configuration
     *
     * The new producer is also registered in the container, so that getProducer($queueName) finds it afterwards.
     *
     * @param string $queueName
     * @param string $queueUrl
     * @param string $connectionId
     * @return mixed
     * @throws \RuntimeException when the connection is not registered
     */
    public function createProducer($queueName, $queueUrl, $connectionId)
    {
        $class = $this->container->getParameter('kaliop_queueing.sqs.producer.class');
        $producer = new $class($this->getConnectionConfig($connectionId));
        $producer->setQueueUrl($queueUrl);
        $this->container->set("kaliop_queueing.sqs.{$queueName}_producer", $producer);

        return $producer;
    }

    /**
     * Dynamically creates a consumer, with no need for configuration except for the connection configuration
     *
     * The new consumer is also registered in the container, so that getConsumer($queueName) finds it afterwards.
     *
     * @param string $queueName
     * @param string $queueUrl
     * @param string $connectionId Id of a connection as defined in configuration
     * @param MessageConsumerInterface $callback
     * @param string $routingKey
     * @return Consumer
     * @throws \RuntimeException when the connection is not registered
     */
    public function createConsumer($queueName, $queueUrl, $connectionId, $callback=null, $routingKey=null)
    {
        $class = $this->container->getParameter('kaliop_queueing.sqs.consumer.class');
        $consumer = new $class($this->getConnectionConfig($connectionId));
        $consumer->setQueueUrl($queueUrl)->setRoutingKey($routingKey)->setQueueName($queueName);
        // Loose comparison kept on purpose: $callback is untyped here, and any value equal to null
        // (null, false, '', 0, empty array) is treated as "no callback"
        if ($callback != null) {
            $consumer->setCallBack($callback);
        }
        $this->container->set("kaliop_queueing.sqs.{$queueName}_consumer", $consumer);

        return $consumer;
    }
}
138