Completed
Push — master ( 29c278...727efb )
by Gaetano
04:01
created

Driver::getConsumer()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 0
cts 4
cp 0
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 2
crap 2
1
<?php
2
3
namespace Kaliop\Queueing\Plugins\SQSBundle\Adapter\SQS;
4
5
use Kaliop\QueueingBundle\Adapter\DriverInterface;
6
use Kaliop\QueueingBundle\Queue\MessageConsumerInterface;
7
use Symfony\Component\DependencyInjection\ContainerAwareTrait;
8
use Symfony\Component\DependencyInjection\ContainerAwareInterface;
9
use Symfony\Component\DependencyInjection\ContainerInterface;
10
11
/**
 * Driver giving access to SQS-backed producers, consumers and queue managers.
 *
 * Producers and consumers are looked up in the service container under the ids
 * "kaliop_queueing.sqs.{queueName}_producer" / "kaliop_queueing.sqs.{queueName}_consumer".
 * They can either be defined in service configuration, or be created at runtime via createProducer() and
 * createConsumer(), which register the new instance in the container under those same ids.
 *
 * @todo inject Debug flag in both consumers and producers
 */
class Driver implements DriverInterface, ContainerAwareInterface
{
    use ContainerAwareTrait;

    /** @var mixed debug flag, handed over as-is to producers and consumers when they are retrieved */
    protected $debug;

    /** @var array connection parameters, indexed by connection id */
    protected $connections = [];

    /**
     * Retrieves the producer service for the given queue, after pushing the current debug flag into it.
     *
     * @param string $queueName
     * @return \Kaliop\QueueingBundle\Queue\ProducerInterface
     */
    public function getProducer($queueName)
    {
        return $this->container->get("kaliop_queueing.sqs.{$queueName}_producer")->setDebug($this->debug);
    }

    /**
     * This method is more flexible than what is declared in the interface, as it allows direct injection of a callback
     * by the caller instead of relying solely on service configuration.
     * It helps when queues are created dynamically.
     *
     * @param string $queueName
     * @param MessageConsumerInterface|null $callback when null, the appropriate MessageConsumer for the queue is looked
     *                                                up in service configuration
     * @return object
     */
    public function getConsumer($queueName, MessageConsumerInterface $callback = null)
    {
        $consumer = $this->container
            ->get("kaliop_queueing.sqs.{$queueName}_consumer")
            ->setDebug($this->debug)
            ->setQueueName($queueName);

        // Honour the callback passed in by the caller; without this the argument would be silently ignored.
        // When it is null, whatever callback the consumer service already has is left untouched.
        if ($callback !== null) {
            $consumer->setCallBack($callback);
        }

        return $consumer;
    }

    /**
     * Tells whether the given message is of the type handled by this driver.
     *
     * @param mixed $message
     * @return bool true only for instances of the SQS Message class
     */
    public function acceptMessage($message)
    {
        return $message instanceof \Kaliop\Queueing\Plugins\SQSBundle\Adapter\SQS\Message;
    }

    /**
     * Unlike the RabbitMQ driver, we do not have to deal with a native message type from the underlying library.
     * So we just let the Producer create messages of the good type, and decoding them becomes a no-op
     *
     * @param \Kaliop\Queueing\Plugins\SQSBundle\Adapter\SQS\Message $message
     * @return \Kaliop\Queueing\Plugins\SQSBundle\Adapter\SQS\Message
     */
    public function decodeMessage($message)
    {
        return $message;
    }

    /**
     * Retrieves the queue manager service and points it at the given queue.
     *
     * NB: the 'kaliop_queueing.sqs.queue_manager' service is fetched from the container and modified in place.
     *
     * @param string $queueName
     * @return \Kaliop\QueueingBundle\Queue\QueueManagerInterface
     */
    public function getQueueManager($queueName)
    {
        $mgr = $this->container->get('kaliop_queueing.sqs.queue_manager');
        $mgr->setQueueName($queueName);

        return $mgr;
    }

    /**
     * Stores the debug flag which is later passed on to producers and consumers by getProducer() / getConsumer().
     *
     * @param mixed $debug
     * @return $this
     */
    public function setDebug($debug)
    {
        $this->debug = $debug;

        return $this;
    }

    /**
     * Registers (or overwrites) the parameters of a connection, to be used later by createProducer() and
     * createConsumer().
     *
     * @param string $connectionId
     * @param array $params
     */
    public function registerConnection($connectionId, array $params)
    {
        $this->connections[$connectionId] = $params;
    }

    /**
     * Returns the parameters previously registered for a connection.
     *
     * @param string $connectionId
     * @return array
     * @throws \RuntimeException if no connection has been registered with the given id
     */
    protected function getConnectionConfig($connectionId)
    {
        if (!isset($this->connections[$connectionId])) {
            throw new \RuntimeException("Connection '$connectionId' is not registered with SQS driver");
        }

        return $this->connections[$connectionId];
    }

    /**
     * Dynamically creates a producer, with no need for configuration except for the connection configuration
     *
     * The new producer is registered in the container, so that later calls to getProducer($queueName) return it.
     *
     * @param string $queueName
     * @param string $queueUrl
     * @param string $connectionId Id of a connection previously registered via registerConnection()
     * @return mixed an instance of the class named by the 'kaliop_queueing.sqs.producer.class' parameter
     * @throws \RuntimeException if the connection is not registered
     */
    public function createProducer($queueName, $queueUrl, $connectionId)
    {
        $class = $this->container->getParameter('kaliop_queueing.sqs.producer.class');
        $producer = new $class($this->getConnectionConfig($connectionId));
        $producer->setQueueUrl($queueUrl);
        $this->container->set("kaliop_queueing.sqs.{$queueName}_producer", $producer);

        return $producer;
    }

    /**
     * Dynamically creates a consumer, with no need for configuration except for the connection configuration
     *
     * The new consumer is registered in the container, so that later calls to getConsumer($queueName) return it.
     *
     * @param string $queueName
     * @param string $queueUrl
     * @param string $connectionId Id of a connection as defined in configuration
     * @param MessageConsumerInterface|null $callback when null, no callback is set on the new consumer
     * @param string|null $routingKey
     * @return Consumer
     * @throws \RuntimeException if the connection is not registered
     */
    public function createConsumer($queueName, $queueUrl, $connectionId, $callback=null, $routingKey=null)
    {
        $class = $this->container->getParameter('kaliop_queueing.sqs.consumer.class');
        $consumer = new $class($this->getConnectionConfig($connectionId));
        $consumer->setQueueUrl($queueUrl)->setRoutingKey($routingKey)->setQueueName($queueName);
        // strict check: only "no callback given" skips the injection
        if ($callback !== null) {
            $consumer->setCallBack($callback);
        }
        $this->container->set("kaliop_queueing.sqs.{$queueName}_consumer", $consumer);

        return $consumer;
    }
}
137