Completed
Push — master ( 2ef464...a5d107 )
by Daniel
18s queued 11s
created

Connection::getChannel()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 9
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
eloc 4
c 1
b 0
f 1
dl 0
loc 9
rs 10
cc 2
nc 2
nop 0
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Jellyfish\QueueRabbitMq;
6
7
use Exception;
8
use Jellyfish\Queue\DestinationInterface;
9
use Jellyfish\QueueRabbitMq\Exception\CouldNotBindQueueException;
10
use PhpAmqpLib\Channel\AMQPChannel;
11
use PhpAmqpLib\Connection\AbstractConnection;
12
13
use function strtolower;
14
15
/**
 * Thin wrapper around a php-amqplib connection that lazily opens a single
 * channel and offers fluent helpers for declaring exchanges and queues.
 */
class Connection implements ConnectionInterface
{
    /**
     * Lazily opened channel; null until getChannel() is first called and
     * again after close().
     *
     * @var \PhpAmqpLib\Channel\AMQPChannel|null
     */
    protected $channel;

    /**
     * @var \PhpAmqpLib\Connection\AbstractConnection
     */
    protected $connection;

    /**
     * @param \PhpAmqpLib\Connection\AbstractConnection $connection Underlying AMQP connection to wrap.
     */
    public function __construct(AbstractConnection $connection)
    {
        $this->connection = $connection;
    }

    /**
     * Returns the cached channel, opening one on the connection on first use.
     *
     * @return \PhpAmqpLib\Channel\AMQPChannel
     */
    public function getChannel(): AMQPChannel
    {
        if ($this->channel === null) {
            $this->channel = $this->connection->channel();
        }

        return $this->channel;
    }

    /**
     * Declares an exchange named after the destination, using the
     * lower-cased destination type as the exchange type.
     *
     * @param \Jellyfish\Queue\DestinationInterface $destination
     *
     * @return \Jellyfish\QueueRabbitMq\ConnectionInterface
     */
    public function createExchange(DestinationInterface $destination): ConnectionInterface
    {
        $this->getChannel()->exchange_declare(
            $destination->getName(),
            strtolower($destination->getType()),
            false, // passive
            true,  // durable
            false  // auto_delete
        );

        return $this;
    }

    /**
     * Declares a queue named after the destination.
     *
     * @param \Jellyfish\Queue\DestinationInterface $destination
     *
     * @return \Jellyfish\QueueRabbitMq\ConnectionInterface
     */
    public function createQueue(DestinationInterface $destination): ConnectionInterface
    {
        $this->getChannel()->queue_declare(
            $destination->getName(),
            false, // passive
            true,  // durable
            false, // exclusive
            false  // auto_delete
        );

        return $this;
    }

    /**
     * Declares the destination's queue and binds it to the exchange named by
     * the destination's "bind" property.
     *
     * Note that the queue is declared before the "bind" property is checked,
     * so the queue exists even when the exception below is thrown.
     *
     * @param \Jellyfish\Queue\DestinationInterface $destination
     *
     * @throws \Jellyfish\QueueRabbitMq\Exception\CouldNotBindQueueException When the "bind" property is null.
     *
     * @return \Jellyfish\QueueRabbitMq\ConnectionInterface
     */
    public function createQueueAndBind(DestinationInterface $destination): ConnectionInterface
    {
        $this->createQueue($destination);

        $bind = $destination->getProperty('bind');

        if ($bind === null) {
            throw new CouldNotBindQueueException('Destination property "bind" is not set.');
        }

        $this->getChannel()->queue_bind($destination->getName(), $bind);

        return $this;
    }

    /**
     * Closes the channel (if one was opened) and then the connection (if it
     * is still connected).
     *
     * The cached channel is discarded so that a later getChannel() call never
     * hands out a channel that has already been closed, and so that repeated
     * close() calls (e.g. an explicit call followed by the destructor) do not
     * try to close the same channel twice. The connection is closed even if
     * closing the channel throws; the exception still propagates afterwards.
     *
     * @return \Jellyfish\QueueRabbitMq\ConnectionInterface
     */
    public function close(): ConnectionInterface
    {
        $channel = $this->channel;
        $this->channel = null;

        try {
            if ($channel !== null) {
                $channel->close();
            }
        } finally {
            if ($this->connection->isConnected()) {
                $this->connection->close();
            }
        }

        return $this;
    }

    /**
     * Best-effort cleanup: exceptions raised while closing are deliberately
     * swallowed because nothing can handle them during destruction.
     */
    public function __destruct()
    {
        try {
            $this->close();
        } catch (Exception $exception) {
            return;
        }
    }
}
130