Completed
Push — master ( cef50b...fa3cc9 )
by Thomas Mauro
02:58
created

Channel::setConnection()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 5
ccs 0
cts 3
cp 0
rs 9.4285
cc 1
eloc 3
nc 1
nop 1
crap 2
1
<?php
2
3
namespace AMQPAL\Adapter\AMQP;
4
5
use AMQPAL\Adapter\ChannelInterface;
6
use AMQPAL\Options;
7
8
/**
 * Channel adapter wrapping a native \AMQPChannel.
 *
 * Forwards channel-level operations (QoS, transactions, recover) to the
 * wrapped resource and builds Queue / Exchange adapters by cloning the
 * prototypes registered on this channel.
 *
 * @package AMQPAL\Adapter\AMQP
 */
class Channel implements ChannelInterface
{
    /**
     * Native channel; stays null until setResource() is called.
     *
     * @var \AMQPChannel|null
     */
    protected $resource;

    /**
     * Connection this channel belongs to; stays null until setConnection() is called.
     *
     * @var Connection|null
     */
    protected $connection;

    /**
     * Prototype cloned by createExchange().
     *
     * @var Exchange
     */
    protected $exchangePrototype;

    /**
     * Prototype cloned by createQueue().
     *
     * @var Queue
     */
    protected $queuePrototype;

    /**
     * Channel constructor.
     *
     * Registers the given prototypes, falling back to a fresh Exchange / Queue
     * instance for each one that is omitted.
     *
     * @param Exchange|null $exchangePrototype
     * @param Queue|null    $queuePrototype
     */
    public function __construct(Exchange $exchangePrototype = null, Queue $queuePrototype = null)
    {
        if (null === $exchangePrototype) {
            $exchangePrototype = new Exchange();
        }
        if (null === $queuePrototype) {
            $queuePrototype = new Queue();
        }

        $this->registerExchange($exchangePrototype);
        $this->registerQueue($queuePrototype);
    }

    /**
     * Replace the prototype that createExchange() clones.
     *
     * @param Exchange $exchange
     */
    public function registerExchange(Exchange $exchange)
    {
        $this->exchangePrototype = $exchange;
    }

    /**
     * Replace the prototype that createQueue() clones.
     *
     * @param Queue $queue
     */
    public function registerQueue(Queue $queue)
    {
        $this->queuePrototype = $queue;
    }

    /**
     * Get the wrapped native channel.
     *
     * @return \AMQPChannel|null Null when no resource has been set yet.
     */
    public function getResource()
    {
        return $this->resource;
    }

    /**
     * Set the native channel every other method delegates to.
     *
     * @param \AMQPChannel $resource
     * @return $this
     */
    public function setResource(\AMQPChannel $resource)
    {
        $this->resource = $resource;

        return $this;
    }

    /**
     * Check the channel connection.
     *
     * A channel that has not been given a resource yet is reported as not
     * connected instead of failing with a call on null.
     *
     * @return bool Indicates whether the channel is connected.
     */
    public function isConnected()
    {
        $resource = $this->getResource();

        return null !== $resource && $resource->isConnected();
    }

    /**
     * Return internal channel ID
     *
     * @return int
     */
    public function getChannelId()
    {
        return $this->getResource()->getChannelId();
    }

    /**
     * Set the window size and the number of messages to prefetch from the broker.
     *
     * @param int $prefetchSize  The window size, in octets, to prefetch.
     * @param int $prefetchCount The number of messages to prefetch.
     * @return $this
     * @throws \AMQPConnectionException
     */
    public function setQos($prefetchSize, $prefetchCount)
    {
        $this->getResource()->qos($prefetchSize, $prefetchCount);

        return $this;
    }

    /**
     * Start a transaction.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function startTransaction()
    {
        $this->getResource()->startTransaction();

        return $this;
    }

    /**
     * Commit a pending transaction.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function commitTransaction()
    {
        $this->getResource()->commitTransaction();

        return $this;
    }

    /**
     * Rollback a transaction.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function rollbackTransaction()
    {
        $this->getResource()->rollbackTransaction();

        return $this;
    }

    /**
     * Set the connection object this channel belongs to.
     *
     * @param Connection $connection
     * @return $this
     */
    public function setConnection(Connection $connection)
    {
        $this->connection = $connection;

        return $this;
    }

    /**
     * Get the connection object in use
     *
     * @return Connection|null Null when no connection has been set yet.
     */
    public function getConnection()
    {
        return $this->connection;
    }

    /**
     * Redeliver unacknowledged messages.
     *
     * @param bool $requeue Forwarded unchanged to the native channel's basicRecover().
     * @return $this
     */
    public function basicRecover($requeue = true)
    {
        $this->getResource()->basicRecover($requeue);

        return $this;
    }

    /**
     * Build a Queue adapter bound to this channel.
     *
     * Clones the registered queue prototype, so the prototype itself is never
     * modified here.
     *
     * @param Options\QueueOptions $options
     * @param \AMQPQueue|null      $resource Native queue to wrap; anything that is not
     *                                       an \AMQPQueue makes a new one get created
     *                                       on this channel's resource.
     * @return Queue
     * @throws \AMQPConnectionException
     * @throws \AMQPQueueException
     */
    public function createQueue(Options\QueueOptions $options, $resource = null)
    {
        $queue = clone $this->queuePrototype;

        if (!$resource instanceof \AMQPQueue) {
            $resource = $this->createQueueResource();
        }

        // Same order as before: resource first, then channel, then options.
        $queue->setResource($resource);
        $queue->setChannel($this);
        $queue->setOptions($options);

        return $queue;
    }

    /**
     * Build an Exchange adapter bound to this channel.
     *
     * Clones the registered exchange prototype, so the prototype itself is
     * never modified here.
     *
     * @param Options\ExchangeOptions $options
     * @param \AMQPExchange|null      $resource Native exchange to wrap; anything that is
     *                                          not an \AMQPExchange makes a new one get
     *                                          created on this channel's resource.
     * @return Exchange
     * @throws \AMQPConnectionException
     * @throws \AMQPExchangeException
     */
    public function createExchange(Options\ExchangeOptions $options, $resource = null)
    {
        $exchange = clone $this->exchangePrototype;

        if (!$resource instanceof \AMQPExchange) {
            $resource = $this->createExchangeResource();
        }

        // Same order as before: resource first, then channel, then options.
        $exchange->setResource($resource);
        $exchange->setChannel($this);
        $exchange->setOptions($options);

        return $exchange;
    }

    /**
     * Create a native queue on this channel's resource.
     *
     * @return \AMQPQueue
     * @throws \AMQPConnectionException
     * @throws \AMQPQueueException
     * @codeCoverageIgnore
     */
    protected function createQueueResource()
    {
        return new \AMQPQueue($this->getResource());
    }

    /**
     * Create a native exchange on this channel's resource.
     *
     * @return \AMQPExchange
     * @throws \AMQPConnectionException
     * @throws \AMQPExchangeException
     * @codeCoverageIgnore
     */
    protected function createExchangeResource()
    {
        return new \AMQPExchange($this->getResource());
    }
}
257