Driver   A
last analyzed

Complexity

Total Complexity 9

Size/Duplication

Total Lines 72
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 3

Test Coverage

Coverage 0%

Importance

Changes 0
Metric Value
wmc 9
lcom 1
cbo 3
dl 0
loc 72
ccs 0
cts 27
cp 0
rs 10
c 0
b 0
f 0

7 Methods

Rating   Name   Duplication   Size   Complexity  
A getConsumer() 0 8 1
A getQueueCallback() 0 8 3
A acceptMessage() 0 4 1
A decodeMessage() 0 4 1
A getProducer() 0 7 1
A getQueueManager() 0 6 1
A setDebug() 0 6 1
1
<?php
2
3
namespace Kaliop\Queueing\Plugins\KinesisBundle\Adapter\Kinesis;
4
5
use Kaliop\QueueingBundle\Adapter\DriverInterface;
6
use Kaliop\QueueingBundle\Queue\QueueManagerAwareTrait;
7
use Symfony\Component\DependencyInjection\ContainerAwareTrait;
8
use Symfony\Component\DependencyInjection\ContainerAwareInterface;
9
10
/**
 * Kinesis flavour of the queueing driver.
 *
 * Consumers, producers and consumer callbacks are all fetched as services from the injected container;
 * this class only wires them together for a given queue name.
 */
class Driver implements DriverInterface, ContainerAwareInterface
{
    use ContainerAwareTrait, QueueManagerAwareTrait;

    /**
     * Debug setting stored by setDebug() and handed to every producer returned by getProducer().
     *
     * @var mixed
     */
    protected $debug;

    /**
     * Fetches the Kinesis consumer service and configures it with the queue name and that queue's callback.
     *
     * NOTE(review): static analysis reports that DriverInterface::getConsumer is annotated as returning a
     * string, whereas this method returns the consumer service object - confirm and fix the interface docblock.
     *
     * @param string $queueName
     * @return object the 'kaliop_queueing.kinesis.consumer' service
     * @throws \UnexpectedValueException when no callback is configured for $queueName
     */
    public function getConsumer($queueName)
    {
        $kinesisConsumer = $this->container->get('kaliop_queueing.kinesis.consumer');
        // The callback is resolved before the consumer is touched, so a missing callback leaves it unmodified
        $queueCallback = $this->getQueueCallback($queueName);

        $kinesisConsumer->setQueueName($queueName);
        $kinesisConsumer->setCallback($queueCallback);

        return $kinesisConsumer;
    }

    /**
     * Resolves the callback service configured for a queue.
     *
     * The service id is read from the 'callback' key of the queue's entry in the
     * 'kaliop_queueing_kinesis.default.consumers' container parameter.
     *
     * @param string $queueName
     * @return object the service registered under the configured callback id
     * @throws \UnexpectedValueException when the queue has no entry, or its entry has no 'callback' key
     */
    protected function getQueueCallback($queueName)
    {
        $consumersConfig = $this->container->getParameter('kaliop_queueing_kinesis.default.consumers');

        // Multi-argument isset() checks left to right and stops at the first unset element
        if (isset($consumersConfig[$queueName], $consumersConfig[$queueName]['callback'])) {
            return $this->container->get($consumersConfig[$queueName]['callback']);
        }

        throw new \UnexpectedValueException("No callback has been defined for queue '$queueName', please check config parameter 'kaliop_queueing_kinesis.default.consumers'");
    }

    /**
     * Tells whether the given message is a Kinesis Message instance.
     *
     * @param mixed $message
     * @return bool
     */
    public function acceptMessage($message)
    {
        $isKinesisMessage = $message instanceof \Kaliop\Queueing\Plugins\KinesisBundle\Adapter\Kinesis\Message;

        return $isKinesisMessage;
    }

    /**
     * No-op decoding: the message is handed back untouched.
     *
     * Contrary to the RabbitMQ driver, there is no native message type coming from the underlying library,
     * so the Producer creates messages of the right type directly and nothing needs converting here.
     *
     * @param \Kaliop\Queueing\Plugins\KinesisBundle\Adapter\Kinesis\Message $message
     * @return \Kaliop\Queueing\Plugins\KinesisBundle\Adapter\Kinesis\Message
     */
    public function decodeMessage($message)
    {
        return $message;
    }

    /**
     * Fetches the Kinesis producer service and configures it with the queue name and the current debug setting.
     *
     * @param string $queueName
     * @return \Kaliop\QueueingBundle\Queue\ProducerInterface
     */
    public function getProducer($queueName)
    {
        $kinesisProducer = $this->container->get('kaliop_queueing.kinesis.producer');

        $kinesisProducer->setQueueName($queueName);
        $kinesisProducer->setDebug($this->debug);

        return $kinesisProducer;
    }

    /**
     * Returns the queue manager obtained from getQueueManagerInternal(), pointed at the given queue.
     *
     * NOTE(review): getQueueManagerInternal() is not defined in this class - presumably it is provided by
     * QueueManagerAwareTrait; confirm.
     *
     * @param string $queueName
     * @return \Kaliop\QueueingBundle\Queue\QueueManagerInterface
     */
    public function getQueueManager($queueName)
    {
        $queueManager = $this->getQueueManagerInternal();
        $queueManager->setQueueName($queueName);

        return $queueManager;
    }

    /**
     * Stores the debug setting that getProducer() forwards to producers.
     *
     * @param mixed $debug
     * @return $this
     */
    public function setDebug($debug)
    {
        $this->debug = $debug;

        return $this;
    }
}
82