Driver::getConnectionConfig()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 8
rs 10
c 0
b 0
f 0
cc 2
nc 2
nop 1
1
<?php
2
3
namespace Kaliop\Queueing\Plugins\StompBundle\Adapter\Stomp;
4
5
use Kaliop\QueueingBundle\Adapter\DriverInterface;
6
use Kaliop\QueueingBundle\Queue\MessageConsumerInterface;
7
use Kaliop\QueueingBundle\Queue\QueueManagerAwareTrait;
8
use Symfony\Component\DependencyInjection\ContainerAwareTrait;
9
use Symfony\Component\DependencyInjection\ContainerAwareInterface;
10
use Symfony\Component\DependencyInjection\ContainerInterface;
11
12
/**
 * Driver giving access to Stomp producers, consumers and queue managers.
 *
 * Producers and consumers are fetched from the service container under the ids
 * "kaliop_queueing.stomp.{queueName}_producer" and "kaliop_queueing.stomp.{queueName}_consumer".
 * They can also be built at runtime with createProducer() / createConsumer(), which store the new
 * instance in the container under those same ids.
 *
 * @todo inject Debug flag in both consumers and producers
 */
class Driver implements DriverInterface, ContainerAwareInterface
{
    use ContainerAwareTrait;
    use QueueManagerAwareTrait;

    /** @var mixed debug flag, passed on to the services returned by getProducer() and getConsumer() */
    protected $debug;

    /** @var array[] connection parameters, indexed by connection id (filled by registerConnection()) */
    protected $connections = array();

    /**
     * Returns the producer service of the given queue, after applying the driver's debug flag to it.
     *
     * @param string $queueName
     * @return \Kaliop\QueueingBundle\Queue\ProducerInterface
     */
    public function getProducer($queueName)
    {
        $producer = $this->container->get("kaliop_queueing.stomp.{$queueName}_producer");
        // The producer class is configurable, so do not rely on setDebug() returning the producer itself
        $producer->setDebug($this->debug);

        return $producer;
    }

    /**
     * Returns the consumer service of the given queue, after applying the driver's debug flag
     * and the queue name to it.
     *
     * @param string $queueName
     * @return object
     */
    public function getConsumer($queueName)
    {
        $consumer = $this->container->get("kaliop_queueing.stomp.{$queueName}_consumer");
        // The consumer class is configurable, so do not rely on its setters being chainable
        $consumer->setDebug($this->debug);
        $consumer->setQueueName($queueName);

        return $consumer;
    }

    /**
     * Tells whether the given message is a Stomp Message instance.
     *
     * @param mixed $message
     * @return bool
     */
    public function acceptMessage($message)
    {
        return $message instanceof \Kaliop\Queueing\Plugins\StompBundle\Adapter\Stomp\Message;
    }

    /**
     * Unlike the RabbitMQ driver, we do not have to deal with a native message type from the underlying library.
     * So we just let the Producer create messages of the good type, and decoding them becomes a no-op
     *
     * @param \Kaliop\Queueing\Plugins\StompBundle\Adapter\Stomp\Message $message
     * @return \Kaliop\Queueing\Plugins\StompBundle\Adapter\Stomp\Message the same message, untouched
     */
    public function decodeMessage($message)
    {
        return $message;
    }

    /**
     * Returns the queue manager, set up to work on the given queue.
     *
     * NOTE(review): getQueueManagerInternal() is not defined in this class; presumably it is provided
     * by QueueManagerAwareTrait - confirm.
     *
     * @param string $queueName
     * @return \Kaliop\QueueingBundle\Queue\QueueManagerInterface
     */
    public function getQueueManager($queueName)
    {
        $mgr = $this->getQueueManagerInternal();
        $mgr->setQueueName($queueName);

        return $mgr;
    }

    /**
     * Stores the debug flag which getProducer() and getConsumer() pass on to the services they return.
     *
     * @param mixed $debug
     * @return $this
     */
    public function setDebug($debug)
    {
        $this->debug = $debug;

        return $this;
    }

    /**
     * Registers (or overwrites) the parameters of a connection.
     *
     * @param string $connectionId
     * @param array $params
     */
    public function registerConnection($connectionId, array $params)
    {
        $this->connections[$connectionId] = $params;
    }

    /**
     * Returns the parameters previously registered for a connection.
     *
     * @param string $connectionId
     * @return array the parameters given to registerConnection() for this id
     * @throws \RuntimeException when no connection has been registered with this id
     */
    protected function getConnectionConfig($connectionId)
    {
        if (!isset($this->connections[$connectionId])) {
            throw new \RuntimeException("Connection '$connectionId' is not registered with Stomp driver");
        }

        return $this->connections[$connectionId];
    }

    /**
     * Dynamically creates a producer, with no need for configuration except for the connection configuration
     *
     * The new producer is stored in the container, so that getProducer($queueName) returns it later on.
     *
     * @param string $queueName        the name to be used later when asking the driver for the producer
     * @param string $queueDestination the stomp destination
     * @param string $connectionId     the name of a stomp connection as set in configuration
     * @return Producer
     * @throws \RuntimeException when the connection is not registered
     */
    public function createProducer($queueName, $queueDestination, $connectionId)
    {
        $class = $this->container->getParameter('kaliop_queueing.stomp.producer.class');
        $producer = new $class($this->getConnectionConfig($connectionId));
        $producer->setStompQueueName($queueDestination);
        $this->container->set("kaliop_queueing.stomp.{$queueName}_producer", $producer);

        return $producer;
    }

    /**
     * Dynamically creates a consumer, with no need for configuration except for the connection configuration
     *
     * The new consumer is stored in the container, so that getConsumer($queueName) returns it later on.
     *
     * @param string $queueName        the name to be used later when asking the driver for the consumer
     * @param string $queueDestination the stomp destination
     * @param string $connectionId     the name of a stomp connection as set in configuration
     * @param string $subscriptionName the name of the subscription to be used to connect to the broker
     * @param MessageConsumerInterface|null $callback set on the consumer only when given
     * @param string|null $routingKey
     * @return Consumer
     * @throws \RuntimeException when the connection is not registered
     */
    public function createConsumer($queueName, $queueDestination, $connectionId, $subscriptionName, $callback=null, $routingKey=null)
    {
        $class = $this->container->getParameter('kaliop_queueing.stomp.consumer.class');
        $consumer = new $class($this->getConnectionConfig($connectionId));
        // The consumer class is configurable, so do not rely on its setters being chainable
        $consumer->setSubscriptionName($subscriptionName);
        $consumer->setStompQueueName($queueDestination);
        $consumer->setRoutingKey($routingKey);
        $consumer->setQueueName($queueName);
        // Loose comparison kept on purpose: any value equal to null is treated as "no callback", as before
        if ($callback != null) {
            $consumer->setCallBack($callback);
        }
        $this->container->set("kaliop_queueing.stomp.{$queueName}_consumer", $consumer);

        return $consumer;
    }
}
134