ConsumerFactory   A
last analyzed

Complexity

Total Complexity 8

Size/Duplication

Total Lines 73
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
wmc 8
dl 0
loc 73
ccs 30
cts 30
cp 1
rs 10
c 1
b 0
f 0

3 Methods

Rating   Name   Duplication   Size   Complexity  
A getOptionsClass() 0 4 1
B createConsumer() 0 32 6
A __invoke() 0 7 1
1
<?php
2
3
namespace RabbitMqModule\Service;
4
5
use Interop\Container\ContainerInterface;
6
use Interop\Container\Exception\ContainerException;
7
use RabbitMqModule\Consumer;
8
use RabbitMqModule\ConsumerInterface;
9
use Zend\ServiceManager\Exception\ServiceNotCreatedException;
10
use Zend\ServiceManager\Exception\ServiceNotFoundException;
11
use RabbitMqModule\Options\Consumer as Options;
12
use InvalidArgumentException;
13
14
/**
15
 * Class ConsumerFactory
16
 * @package RabbitMqModule\Service
17
 */
18
class ConsumerFactory extends AbstractFactory
19
{
20
    /**
21
     * Get the class name of the options associated with this factory.
22
     *
23
     * @return string
24
     */
25 2
    public function getOptionsClass()
26
    {
27 2
        return Options::class;
28
    }
29
30
    /**
31
     * @param ContainerInterface $serviceLocator
32
     * @param Options                 $options
33
     *
34
     * @throws InvalidArgumentException
35
     *
36
     * @return Consumer
37
     */
38 2
    protected function createConsumer(ContainerInterface $serviceLocator, Options $options)
39
    {
40 2
        $callback = $options->getCallback();
41 2
        if (is_string($callback)) {
42 1
            $callback = $serviceLocator->get($callback);
43 1
        }
44 2
        if ($callback instanceof ConsumerInterface) {
45 1
            $callback = [$callback, 'execute'];
46 1
        }
47 2
        if (!is_callable($callback)) {
48 1
            throw new InvalidArgumentException('Invalid callback provided');
49
        }
50
51
        /** @var \PhpAmqpLib\Connection\AbstractConnection $connection */
52 1
        $connection = $serviceLocator->get(sprintf('%s.connection.%s', $this->configKey, $options->getConnection()));
53 1
        $consumer = new Consumer($connection);
54 1
        $consumer->setQueueOptions($options->getQueue());
55 1
        $consumer->setExchangeOptions($options->getExchange());
56 1
        $consumer->setConsumerTag($options->getConsumerTag() ?: sprintf('PHPPROCESS_%s_%s', gethostname(), getmypid()));
57 1
        $consumer->setAutoSetupFabricEnabled($options->isAutoSetupFabricEnabled());
58 1
        $consumer->setCallback($callback);
59 1
        $consumer->setIdleTimeout($options->getIdleTimeout());
60
61 1
        if ($options->getQos()) {
62 1
            $consumer->setQosOptions(
63 1
                $options->getQos()->getPrefetchSize(),
64 1
                $options->getQos()->getPrefetchCount()
65 1
            );
66 1
        }
67
68 1
        return $consumer;
69
    }
70
71
    /**
72
     * Create an object
73
     *
74
     * @param  ContainerInterface $container
75
     * @param  string $requestedName
76
     * @param  null|array $options
77
     * @return Consumer
78
     * @throws ServiceNotFoundException if unable to resolve the service.
79
     * @throws ServiceNotCreatedException if an exception is raised when
80
     *     creating a service.
81
     * @throws ContainerException if any other error occurs
82
     */
83 2
    public function __invoke(ContainerInterface $container, $requestedName, array $options = null)
84
    {
85
        /* @var $options Options */
86 2
        $options = $this->getOptions($container, 'consumer');
87
88 2
        return $this->createConsumer($container, $options);
89
    }
90
}
91