Completed
Push — master ( c8f703...9ddf32 )
by Olivier
25s queued 10s
created

ConnectionProcessor::__construct()   A

Complexity

Conditions 6
Paths 12

Size

Total Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 19
rs 9.0111
c 0
b 0
f 0
cc 6
nc 12
nop 2
1
<?php
2
3
namespace Swarrot\Processor\Doctrine;
4
5
use Doctrine\Common\Persistence\ConnectionRegistry;
6
use Doctrine\DBAL\Connection;
7
use Doctrine\DBAL\Connections\MasterSlaveConnection;
8
use Doctrine\DBAL\DBALException;
9
use Swarrot\Broker\Message;
10
use Swarrot\Processor\ConfigurableInterface;
11
use Swarrot\Processor\ProcessorInterface;
12
use Symfony\Component\OptionsResolver\OptionsResolver;
13
14
/**
 * Processor decorator that looks after Doctrine DBAL connections around message handling.
 *
 * Before delegating to the wrapped processor it can ping every connection that
 * is already open (option "doctrine_ping") and close those whose ping fails.
 * Once the wrapped processor has finished - whether it returned or threw - it
 * can close every master/slave connection that is currently connected to the
 * master (option "doctrine_close_master").
 *
 * @author Adrien Brault <[email protected]>
 */
class ConnectionProcessor implements ConfigurableInterface
{
    /**
     * The decorated processor that does the actual message handling.
     *
     * @var ProcessorInterface
     */
    private $processor;

    /**
     * Connections checked before, and cleaned up after, each message.
     *
     * @var Connection[]
     */
    private $connections;

    /**
     * @param ProcessorInterface                         $processor   processor to delegate to
     * @param ConnectionRegistry|Connection|Connection[] $connections a registry (its connections are used),
     *                                                                a single connection, or an array of connections
     *
     * @throws \InvalidArgumentException when any resolved element is not a Connection
     */
    public function __construct(ProcessorInterface $processor, $connections)
    {
        // A registry is unwrapped first; anything that is still not an array is treated as a single item.
        $resolved = $connections instanceof ConnectionRegistry
            ? $connections->getConnections()
            : $connections;

        if (!is_array($resolved)) {
            $resolved = [$resolved];
        }

        foreach ($resolved as $candidate) {
            if ($candidate instanceof Connection) {
                continue;
            }

            // Report the class name for objects and the scalar/compound type otherwise.
            $actualType = is_object($candidate) ? get_class($candidate) : gettype($candidate);

            throw new \InvalidArgumentException(sprintf(
                '$connections must be an array of Connection, but one of the elements was %s',
                $actualType
            ));
        }

        $this->processor = $processor;
        $this->connections = $resolved;
    }

    /**
     * Runs the wrapped processor, optionally pinging the connections beforehand
     * and closing master connections afterwards.
     *
     * Reads the "doctrine_ping" and "doctrine_close_master" entries of $options.
     *
     * {@inheritdoc}
     */
    public function process(Message $message, array $options)
    {
        if ($options['doctrine_ping']) {
            $this->pingOpenConnections();
        }

        try {
            return $this->processor->process($message, $options);
        } finally {
            // Runs on both return and exception, so the cleanup is never skipped.
            if ($options['doctrine_close_master']) {
                $this->closeMasterConnections();
            }
        }
    }

    /**
     * Registers the defaults for the two options this processor reads; both default to true.
     *
     * {@inheritdoc}
     */
    public function setDefaultOptions(OptionsResolver $resolver)
    {
        $defaults = [
            'doctrine_ping' => true,
            'doctrine_close_master' => true,
        ];

        $resolver->setDefaults($defaults);
    }

    /**
     * Issues the platform's dummy SELECT on every connection that reports being
     * connected, and closes the connection when that raises a DBALException.
     */
    private function pingOpenConnections()
    {
        foreach ($this->connections as $connection) {
            if (!$connection->isConnected()) {
                // Nothing to verify: a connection that is not open is left alone.
                continue;
            }

            try {
                $connection->query($connection->getDatabasePlatform()->getDummySelectSQL());
            } catch (DBALException $pingFailure) {
                // Close timed out connections so that using them connects again.
                $connection->close();
            }
        }
    }

    /**
     * Closes every master/slave connection that is currently connected to the master.
     * Connections of any other type are left untouched.
     */
    private function closeMasterConnections()
    {
        foreach ($this->connections as $connection) {
            if (!$connection instanceof MasterSlaveConnection) {
                continue;
            }

            if ($connection->isConnectedToMaster()) {
                $connection->close();
            }
        }
    }
}
92