RpcServerFactory   A
last analyzed

Complexity

Total Complexity 8

Size/Duplication

Total Lines 74
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
wmc 8
dl 0
loc 74
ccs 31
cts 31
cp 1
rs 10
c 0
b 0
f 0

3 Methods

Rating   Name   Duplication   Size   Complexity  
A getOptionsClass() 0 4 1
B createServer() 0 33 6
A __invoke() 0 7 1
1
<?php
2
3
namespace RabbitMqModule\Service;
4
5
use Interop\Container\ContainerInterface;
6
use Interop\Container\Exception\ContainerException;
7
use RabbitMqModule\ConsumerInterface;
8
use RabbitMqModule\RpcServer;
9
use Zend\ServiceManager\Exception\ServiceNotCreatedException;
10
use Zend\ServiceManager\Exception\ServiceNotFoundException;
11
use RabbitMqModule\Options\RpcServer as Options;
12
use InvalidArgumentException;
13
14
/**
 * Service factory that assembles a configured RpcServer instance.
 *
 * The options are loaded through the parent factory under the 'rpc_server'
 * key, and the server is wired from them using services pulled from the
 * container.
 *
 * @package RabbitMqModule\Service
 */
class RpcServerFactory extends AbstractFactory
{
    /**
     * Name the options class this factory works with.
     *
     * @return string Fully-qualified class name of the RPC server options.
     */
    public function getOptionsClass()
    {
        return Options::class;
    }

    /**
     * Build an RpcServer and configure it from the given options.
     *
     * Callback resolution happens first, in this order:
     *  - a string callback is used as a service name and fetched from the container;
     *  - a ConsumerInterface instance is wrapped as [instance, 'execute'];
     *  - whatever remains must satisfy is_callable().
     *
     * The connection is fetched from the container under the service name
     * "<configKey>.connection.<connection name from options>".
     *
     * @param ContainerInterface $serviceLocator Container used to look up the callback and connection services.
     * @param Options            $options        RPC server options to apply.
     *
     * @throws InvalidArgumentException When the resolved callback is not callable.
     *
     * @return RpcServer
     */
    protected function createServer(ContainerInterface $serviceLocator, Options $options)
    {
        $handler = $options->getCallback();

        // A string is a container service name, not a function name.
        if (is_string($handler)) {
            $handler = $serviceLocator->get($handler);
        }

        // Consumers are invoked through their execute() method.
        if ($handler instanceof ConsumerInterface) {
            $handler = [$handler, 'execute'];
        }

        if (!is_callable($handler)) {
            throw new InvalidArgumentException('Invalid callback provided');
        }

        $connectionService = sprintf('%s.connection.%s', $this->configKey, $options->getConnection());
        /** @var \PhpAmqpLib\Connection\AbstractConnection $amqpConnection */
        $amqpConnection = $serviceLocator->get($connectionService);

        $rpcServer = new RpcServer($amqpConnection);
        $rpcServer->setQueueOptions($options->getQueue());
        $rpcServer->setExchangeOptions($options->getExchange());

        // Fall back to a host/pid based tag when no (truthy) tag is configured.
        $consumerTag = $options->getConsumerTag();
        if (!$consumerTag) {
            $consumerTag = sprintf('PHPPROCESS_%s_%s', gethostname(), getmypid());
        }
        $rpcServer->setConsumerTag($consumerTag);

        $rpcServer->setAutoSetupFabricEnabled($options->isAutoSetupFabricEnabled());
        $rpcServer->setCallback($handler);
        $rpcServer->setIdleTimeout($options->getIdleTimeout());
        $rpcServer->setSerializer($options->getSerializer());

        // QoS is optional; only forward it when the options carry a QoS object.
        if ($options->getQos()) {
            $prefetchSize = $options->getQos()->getPrefetchSize();
            $prefetchCount = $options->getQos()->getPrefetchCount();
            $rpcServer->setQosOptions($prefetchSize, $prefetchCount);
        }

        return $rpcServer;
    }

    /**
     * Create the RPC server service.
     *
     * The $requestedName and $options arguments are not read here; the
     * effective options come from getOptions() with the 'rpc_server' key
     * (getOptions() is inherited and not visible in this file).
     *
     * @param  ContainerInterface $container
     * @param  string             $requestedName
     * @param  null|array         $options
     * @return RpcServer
     * @throws ServiceNotFoundException if unable to resolve the service.
     * @throws ServiceNotCreatedException if an exception is raised when
     *     creating a service.
     * @throws ContainerException if any other error occurs
     */
    public function __invoke(ContainerInterface $container, $requestedName, array $options = null)
    {
        /** @var Options $rpcOptions */
        $rpcOptions = $this->getOptions($container, 'rpc_server');

        return $this->createServer($container, $rpcOptions);
    }
}
92