Completed
Push — master ( 4c6e24...29c278 )
by Gaetano
06:28
created

Driver::getProducer()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 1
crap 1
1
<?php
2
3
namespace Kaliop\Queueing\Plugins\SQSBundle\Adapter\SQS;
4
5
use Kaliop\QueueingBundle\Adapter\DriverInterface;
6
use Kaliop\QueueingBundle\Queue\MessageConsumerInterface;
7
use Symfony\Component\DependencyInjection\ContainerAwareTrait;
8
use Symfony\Component\DependencyInjection\ContainerAwareInterface;
9
use Symfony\Component\DependencyInjection\ContainerInterface;
10
11
/**
 * SQS driver: hands out the producers, consumers and queue manager used to work with SQS queues.
 *
 * Producers and consumers are fetched from the service container under the ids
 * "kaliop_queueing.sqs.{$queueName}_producer" and "kaliop_queueing.sqs.{$queueName}_consumer".
 * Those services can also be registered at runtime through createProducer() and createConsumer().
 *
 * @todo inject Debug flag in both consumers and producers
 */
class Driver implements DriverInterface, ContainerAwareInterface
{
    use ContainerAwareTrait;

    /** @var mixed Debug flag, forwarded unchanged to every producer and consumer returned by this driver */
    protected $debug;

    /** @var array Connection parameters, indexed by connection id (filled by registerConnection()) */
    protected $connections = array();

    /**
     * Fetches the producer service of the given queue and passes it the current debug flag.
     *
     * @param string $queueName
     * @return \Kaliop\QueueingBundle\Queue\ProducerInterface
     */
    public function getProducer($queueName)
    {
        return $this->container->get("kaliop_queueing.sqs.{$queueName}_producer")->setDebug($this->debug);
    }

    /**
     * This method is more flexible than what is declared in the interface, as it allows direct injection of a callback
     * by the caller instead of relying solely on service configuration.
     * It helps when queues are created dynamically.
     *
     * @param string $queueName
     * @param MessageConsumerInterface|null $callback when given, it is set on the consumer before it is returned;
     *                                                when null, the consumer service is used as it is
     * @return object
     */
    public function getConsumer($queueName, MessageConsumerInterface $callback = null)
    {
        $consumer = $this->container->get("kaliop_queueing.sqs.{$queueName}_consumer");

        // Honour the callback handed in by the caller: it used to be accepted but silently ignored
        if ($callback !== null) {
            $consumer->setCallBack($callback);
        }

        return $consumer->setDebug($this->debug)->setQueueName($queueName);
    }

    /**
     * Tells whether the given message is of the type handled by this driver.
     *
     * @param mixed $message
     * @return bool true only for instances of the SQS Message class
     */
    public function acceptMessage($message)
    {
        // Message lives in the same namespace as this class
        return $message instanceof Message;
    }

    /**
     * Unlike the RabbitMQ driver, we do not have to deal with a native message type from the underlying library.
     * So we just let the Producer create messages of the good type, and decoding them becomes a no-op
     *
     * @param \Kaliop\Queueing\Plugins\SQSBundle\Adapter\SQS\Message $message
     * @return \Kaliop\Queueing\Plugins\SQSBundle\Adapter\SQS\Message the very same message that was passed in
     */
    public function decodeMessage($message)
    {
        return $message;
    }

    /**
     * Fetches the queue manager service and points it at the given queue.
     *
     * NB: the same container service id is used for every queue name.
     *
     * @param string $queueName
     * @return \Kaliop\QueueingBundle\Queue\QueueManagerInterface
     */
    public function getQueueManager($queueName)
    {
        $mgr = $this->container->get('kaliop_queueing.sqs.queue_manager');
        $mgr->setQueueName($queueName);

        return $mgr;
    }

    /**
     * Stores the debug flag which is later forwarded by getProducer() and getConsumer().
     *
     * @param mixed $debug
     * @return $this
     */
    public function setDebug($debug)
    {
        $this->debug = $debug;

        return $this;
    }

    /**
     * Registers (or overwrites) the parameters of a connection.
     *
     * @param string $connectionId
     * @param array $params
     */
    public function registerConnection($connectionId, array $params)
    {
        $this->connections[$connectionId] = $params;
    }

    /**
     * Returns the parameters previously registered for a connection.
     *
     * @param string $connectionId
     * @return array
     * @throws \RuntimeException when no connection with that id has been registered
     */
    protected function getConnectionConfig($connectionId)
    {
        if (!isset($this->connections[$connectionId])) {
            throw new \RuntimeException("Connection '$connectionId' is not registered with SQS driver");
        }

        return $this->connections[$connectionId];
    }

    /**
     * Dynamically creates a producer, with no need for configuration except for the connection configuration
     *
     * The producer class is read from the 'kaliop_queueing.sqs.producer.class' container parameter, and the new
     * instance is stored in the container so that getProducer($queueName) can find it afterwards.
     *
     * @param string $queueName
     * @param string $queueUrl
     * @param string $connectionId Id of a connection registered via registerConnection()
     * @return mixed the newly created producer
     * @throws \RuntimeException when the connection is not registered
     */
    public function createProducer($queueName, $queueUrl, $connectionId)
    {
        $class = $this->container->getParameter('kaliop_queueing.sqs.producer.class');
        $producer = new $class($this->getConnectionConfig($connectionId));
        $producer->setQueueUrl($queueUrl);
        $this->container->set("kaliop_queueing.sqs.{$queueName}_producer", $producer);

        return $producer;
    }

    /**
     * Dynamically creates a consumer, with no need for configuration except for the connection configuration
     *
     * The consumer class is read from the 'kaliop_queueing.sqs.consumer.class' container parameter, and the new
     * instance is stored in the container so that getConsumer($queueName) can find it afterwards.
     *
     * @param string $queueName
     * @param string $queueUrl
     * @param string $connectionId Id of a connection as defined in configuration
     * @param MessageConsumerInterface $callback
     * @param string $routingKey
     * @return Consumer
     * @throws \RuntimeException when the connection is not registered
     */
    public function createConsumer($queueName, $queueUrl, $connectionId, $callback=null, $routingKey=null)
    {
        $class = $this->container->getParameter('kaliop_queueing.sqs.consumer.class');
        $consumer = new $class($this->getConnectionConfig($connectionId));
        $consumer->setQueueUrl($queueUrl)->setRoutingKey($routingKey)->setQueueName($queueName);

        // Loose comparison retained: the parameter is untyped, and any empty value keeps meaning "no callback"
        if ($callback != null) {
            $consumer->setCallBack($callback);
        }

        $this->container->set("kaliop_queueing.sqs.{$queueName}_consumer", $consumer);

        return $consumer;
    }
}
137