Completed
Pull Request — master (#38)
by
unknown
03:48
created

BaseAmqp::getChannel()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 3
ccs 0
cts 2
cp 0
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 0
crap 2
1
<?php
2
/**
3
 * The MIT License
4
 *
5
 * Copyright (c) 2010 Alvaro Videla
6
 *
7
 * Permission is hereby granted, free of charge, to any person obtaining a copy
8
 * of this software and associated documentation files (the "Software"), to deal
9
 * in the Software without restriction, including without limitation the rights
10
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11
 * copies of the Software, and to permit persons to whom the Software is
12
 * furnished to do so, subject to the following conditions:
13
 *
14
 * The above copyright notice and this permission notice shall be included in
15
 * all copies or substantial portions of the Software.
16
 *
17
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23
 * SOFTWARE.
24
 *
25
 * PHP version 5.3
26
 *
27
 * @category   Thumper
28
 * @package    Thumper
29
 * @author     Alvaro Videla
30
 * @copyright  2010 Alvaro Videla. All rights reserved.
31
 * @license    MIT http://opensource.org/licenses/MIT
32
 * @link       https://github.com/videlalvaro/Thumper
33
 */
34
namespace Thumper;
35
36
use InvalidArgumentException;
37
use PhpAmqpLib\Channel\AMQPChannel;
38
use PhpAmqpLib\Connection\AbstractConnection;
39
40
/**
 * Shared base for AMQP clients: holds the connection, the channel opened on
 * it, and the exchange/queue/consumer options used to wire a consumer up.
 */
abstract class BaseAmqp
{
    // Not referenced inside this class; presumably AMQP delivery_mode
    // values for use by subclasses/callers -- TODO confirm.
    const NON_PERSISTENT = 1;
    const PERSISTENT = 2;

    /**
     * Connection handed to the constructor.
     *
     * @var AbstractConnection
     */
    protected $connection;

    /**
     * Channel obtained from the connection in the constructor.
     *
     * @var AMQPChannel
     */
    protected $channel;

    /**
     * Defaults passed to exchange_declare(); 'name' and 'type' are merged in
     * by setExchangeOptions().
     *
     * @var array
     */
    protected $exchangeOptions = array(
        'passive' => false,
        'durable' => true,
        'auto_delete' => false,
        'internal' => false,
        'nowait' => false,
        'arguments' => null,
        'ticket' => null
    );

    /**
     * Defaults passed to queue_declare(); overridden via setQueueOptions().
     *
     * @var array
     */
    protected $queueOptions = array(
        'name' => '',
        'passive' => false,
        'durable' => true,
        'exclusive' => false,
        'auto_delete' => false,
        'nowait' => false,
        'arguments' => null,
        'ticket' => null
    );

    /**
     * Consumer settings; 'qos' is filled by setQos() and may hold any subset
     * of 'prefetch_size', 'prefetch_count' and 'global'.
     *
     * @var array
     */
    protected $consumerOptions = array(
        'qos' => array()
    );

    /**
     * Routing key used when binding the queue to the exchange.
     *
     * @var string
     */
    protected $routingKey = '';

    /**
     * Free-form parameters exposed through setParameter()/getParameters().
     *
     * @var array
     */
    protected $parameters = array(
        'content_type' => 'text/plain'
    );

    /**
     * Stores the connection and opens a channel on it.
     *
     * @param AbstractConnection $connection
     */
    public function __construct(AbstractConnection $connection)
    {
        $this->connection = $connection;
        $this->channel = $this->connection->channel();
    }

    /**
     * Returns the channel opened in the constructor.
     *
     * @return AMQPChannel
     */
    public function getChannel()
    {
        return $this->channel;
    }

    /**
     * Merges the given options over the current exchange options.
     *
     * @param array $options Must contain a valid 'name' and a non-empty 'type'.
     *
     * @throws InvalidArgumentException When 'name' is missing or fails
     *                                  isValidExchangeName(), or when 'type'
     *                                  is missing/empty.
     */
    public function setExchangeOptions(array $options)
    {
        if (!isset($options['name']) || !$this->isValidExchangeName($options['name'])) {
            throw new InvalidArgumentException(
                'You must provide an exchange name'
            );
        }

        if (empty($options['type'])) {
            throw new InvalidArgumentException(
                'You must provide an exchange type'
            );
        }

        $this->exchangeOptions = array_merge(
            $this->exchangeOptions,
            $options
        );
    }

    /**
     * Merges the given options over the current queue options.
     *
     * @param array $options
     */
    public function setQueueOptions(array $options)
    {
        $this->queueOptions = array_merge(
            $this->queueOptions,
            $options
        );
    }

    /**
     * Sets the routing key used for the queue binding.
     *
     * @param string $routingKey
     */
    public function setRoutingKey($routingKey)
    {
        $this->routingKey = $routingKey;
    }

    /**
     * Merges the given QoS options over the ones already set.
     *
     * @param array $options Any subset of 'prefetch_size', 'prefetch_count'
     *                       and 'global'.
     */
    public function setQos(array $options)
    {
        $this->consumerOptions['qos'] = array_merge($this->consumerOptions['qos'], $options);
    }

    /**
     * Setup consumer.
     *
     * In order: declares the exchange and applies QoS (both only when an
     * exchange name is configured), declares the queue, binds the queue to
     * the exchange (again only when an exchange name is configured) and
     * registers $this->processMessage as the consume callback.
     *
     * processMessage() is not declared in this class; the concrete subclass
     * is expected to provide it.
     */
    protected function setUpConsumer()
    {
        $hasExchange = isset($this->exchangeOptions['name']);

        if ($hasExchange) {
            $this->channel->exchange_declare(
                $this->exchangeOptions['name'],
                $this->exchangeOptions['type'],
                $this->exchangeOptions['passive'],
                $this->exchangeOptions['durable'],
                $this->exchangeOptions['auto_delete'],
                $this->exchangeOptions['internal'],
                $this->exchangeOptions['nowait'],
                $this->exchangeOptions['arguments'],
                $this->exchangeOptions['ticket']
            );

            // NOTE(review): QoS is only applied when an exchange name is set.
            // Kept as-is to preserve behaviour -- confirm this is intended.
            if (!empty($this->consumerOptions['qos'])) {
                // setQos() accepts partial option sets, so fill missing keys
                // with null rather than reading undefined array keys.
                $qos = array_merge(
                    array(
                        'prefetch_size' => null,
                        'prefetch_count' => null,
                        'global' => null
                    ),
                    $this->consumerOptions['qos']
                );

                $this->channel->basic_qos(
                    $qos['prefetch_size'],
                    $qos['prefetch_count'],
                    $qos['global']
                );
            }
        }

        // Use the name returned by queue_declare() rather than the configured
        // one, since the configured name defaults to ''.
        list($queueName, , ) = $this->channel->queue_declare(
            $this->queueOptions['name'],
            $this->queueOptions['passive'],
            $this->queueOptions['durable'],
            $this->queueOptions['exclusive'],
            $this->queueOptions['auto_delete'],
            $this->queueOptions['nowait'],
            $this->queueOptions['arguments'],
            $this->queueOptions['ticket']
        );

        if ($hasExchange) {
            $this->channel->queue_bind($queueName, $this->exchangeOptions['name'], $this->routingKey);
        }

        // The four flags are presumably no_local, no_ack, exclusive and
        // nowait (php-amqplib basic_consume order) -- TODO confirm.
        $this->channel->basic_consume(
            $queueName,
            $this->getConsumerTag(),
            false,
            false,
            false,
            false,
            array($this, 'processMessage')
        );
    }

    /**
     * Builds the consumer tag from the current process id.
     *
     * @return string
     */
    protected function getConsumerTag()
    {
        return 'PHPPROCESS_' . getmypid();
    }

    /**
     * Verifies exchange name meets the 0.9.1 protocol standard.
     *
     * letters, digits, hyphen, underscore, period, or colon
     *
     * The semicolon is additionally accepted because earlier versions of this
     * check allowed it (in place of the colon); it is kept so names that used
     * to validate keep validating. An empty string is still accepted.
     *
     * @param string $exchangeName
     * @return bool
     */
    private function isValidExchangeName($exchangeName)
    {
        // The D modifier stops "$" from matching before a trailing newline.
        return preg_match('/^[A-Za-z0-9_\-.:;]*$/D', $exchangeName) === 1;
    }

    /**
     * Sets (or overwrites) a single parameter.
     *
     * @param string $key
     * @param string $value
     */
    public function setParameter($key, $value)
    {
        $this->parameters[$key] = $value;
    }

    /**
     * Returns all parameters, including the default 'content_type'.
     *
     * @return array
     */
    public function getParameters()
    {
        return $this->parameters;
    }
}
264