Completed
Push — master ( d91936...07c06b )
by John
03:13
created

BaseAmqp::isValidExchangeName()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 4
ccs 2
cts 2
cp 1
rs 10
cc 1
eloc 2
nc 1
nop 1
crap 1
1
<?php
2
/**
3
 * The MIT License
4
 *
5
 * Copyright (c) 2010 Alvaro Videla
6
 *
7
 * Permission is hereby granted, free of charge, to any person obtaining a copy
8
 * of this software and associated documentation files (the "Software"), to deal
9
 * in the Software without restriction, including without limitation the rights
10
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11
 * copies of the Software, and to permit persons to whom the Software is
12
 * furnished to do so, subject to the following conditions:
13
 *
14
 * The above copyright notice and this permission notice shall be included in
15
 * all copies or substantial portions of the Software.
16
 *
17
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23
 * SOFTWARE.
24
 *
25
 * PHP version 5.3
26
 *
27
 * @category   Thumper
28
 * @package    Thumper
29
 * @author     Alvaro Videla
30
 * @copyright  2010 Alvaro Videla. All rights reserved.
31
 * @license    MIT http://opensource.org/licenses/MIT
32
 * @link       https://github.com/videlalvaro/Thumper
33
 */
34
namespace Thumper;
35
36
use InvalidArgumentException;
37
use PhpAmqpLib\Channel\AMQPChannel;
38
use PhpAmqpLib\Connection\AbstractConnection;
39
40
/**
 * Common base for Thumper AMQP producers and consumers.
 *
 * Holds the connection, the channel opened on it, and the exchange / queue /
 * consumer settings that are later passed to the channel's declare, bind and
 * consume calls.
 */
abstract class BaseAmqp
{
    /**
     * Connection handed to the constructor.
     *
     * @var AbstractConnection
     */
    protected $connection;

    /**
     * Channel opened on $connection in the constructor.
     *
     * @var AMQPChannel
     */
    protected $channel;

    /**
     * Arguments for exchange_declare(). 'name' and 'type' have no default and
     * are only present after a successful setExchangeOptions() call.
     *
     * @var array
     */
    protected $exchangeOptions = array(
        'passive' => false,
        'durable' => true,
        'auto_delete' => false,
        'internal' => false,
        'nowait' => false,
        'arguments' => null,
        'ticket' => null
    );

    /**
     * Arguments for queue_declare().
     *
     * @var array
     */
    protected $queueOptions = array(
        'name' => '',
        'passive' => false,
        'durable' => true,
        'exclusive' => false,
        'auto_delete' => false,
        'nowait' => false,
        'arguments' => null,
        'ticket' => null
    );

    /**
     * Consumer settings; 'qos' holds the values forwarded to basic_qos().
     *
     * @var array
     */
    protected $consumerOptions = array(
        'qos' => array()
    );

    /**
     * Routing key used when binding the queue to the exchange.
     *
     * @var string
     */
    protected $routingKey = '';

    /**
     * Stores the connection and opens a channel on it.
     *
     * @param AbstractConnection $connection
     */
    public function __construct(AbstractConnection $connection)
    {
        $this->connection = $connection;
        $this->channel = $this->connection->channel();
    }

    /**
     * Merges the given exchange options over the current ones.
     *
     * @param array $options Must contain a valid 'name' and a non-empty 'type'.
     *
     * @throws InvalidArgumentException When 'name' is missing or invalid, or
     *                                  when 'type' is missing or empty.
     */
    public function setExchangeOptions(array $options)
    {
        if (!isset($options['name']) || !$this->isValidExchangeName($options['name'])) {
            throw new InvalidArgumentException(
                'You must provide an exchange name'
            );
        }

        if (empty($options['type'])) {
            throw new InvalidArgumentException(
                'You must provide an exchange type'
            );
        }

        $this->exchangeOptions = array_merge(
            $this->exchangeOptions,
            $options
        );
    }

    /**
     * Merges the given queue options over the current ones.
     *
     * @param array $options
     */
    public function setQueueOptions(array $options)
    {
        $this->queueOptions = array_merge(
            $this->queueOptions,
            $options
        );
    }

    /**
     * Sets the routing key used when the queue is bound to the exchange.
     *
     * @param string $routingKey
     */
    public function setRoutingKey($routingKey)
    {
        $this->routingKey = $routingKey;
    }

    /**
     * Merges the given QoS options over the current ones.
     *
     * @param array $options Recognised keys: 'prefetch_size',
     *                       'prefetch_count', 'global'.
     */
    public function setQos(array $options)
    {
        $this->consumerOptions['qos'] = array_merge($this->consumerOptions['qos'], $options);
    }

    /**
     * Declares the exchange (when one is configured) and the queue, binds
     * them, and registers processMessage() as the consumer callback.
     */
    protected function setUpConsumer()
    {
        // 'name' has no default, so it is only set after setExchangeOptions().
        $hasExchange = isset($this->exchangeOptions['name']);

        if ($hasExchange) {
            $this->channel
                ->exchange_declare(
                    $this->exchangeOptions['name'],
                    $this->exchangeOptions['type'],
                    $this->exchangeOptions['passive'],
                    $this->exchangeOptions['durable'],
                    $this->exchangeOptions['auto_delete'],
                    $this->exchangeOptions['internal'],
                    $this->exchangeOptions['nowait'],
                    $this->exchangeOptions['arguments'],
                    $this->exchangeOptions['ticket']
                );

            // NOTE(review): QoS is only applied when an exchange is configured;
            // kept as in the original - confirm whether that is intentional.
            if (!empty($this->consumerOptions['qos'])) {
                // setQos() may have been given only some of the keys; fill the
                // rest so a partial array does not trigger undefined-index
                // notices.
                $qos = array_merge(
                    array(
                        'prefetch_size' => 0,
                        'prefetch_count' => 0,
                        'global' => false
                    ),
                    $this->consumerOptions['qos']
                );

                $this->channel
                    ->basic_qos(
                        $qos['prefetch_size'],
                        $qos['prefetch_count'],
                        $qos['global']
                    );
            }
        }

        // Only the first element of the declare result (the queue name) is used.
        list($queueName, , ) = $this->channel
            ->queue_declare(
                $this->queueOptions['name'],
                $this->queueOptions['passive'],
                $this->queueOptions['durable'],
                $this->queueOptions['exclusive'],
                $this->queueOptions['auto_delete'],
                $this->queueOptions['nowait'],
                $this->queueOptions['arguments'],
                $this->queueOptions['ticket']
            );

        if ($hasExchange) {
            $this->channel
                ->queue_bind($queueName, $this->exchangeOptions['name'], $this->routingKey);
        }

        $this->channel
            ->basic_consume(
                $queueName,
                $this->getConsumerTag(),
                false,
                false,
                false,
                false,
                array($this, 'processMessage')
            );
    }

    /**
     * Builds the consumer tag from the current process id.
     *
     * @return string
     */
    protected function getConsumerTag()
    {
        return 'PHPPROCESS_' . getmypid();
    }

    /**
     * Verifies exchange name meets the 0.9.1 protocol standard.
     *
     * Allowed characters: letters, digits, hyphen, underscore, period, or
     * colon. The empty string is still accepted, as before.
     *
     * @param string $exchangeName
     * @return bool True when the name contains only allowed characters.
     */
    private function isValidExchangeName($exchangeName)
    {
        // ':' replaces the ';' the pattern previously allowed by mistake, and
        // \z (rather than $) stops a trailing newline from slipping through.
        return preg_match('/^[A-Za-z0-9_.:\-]*\z/', $exchangeName) === 1;
    }
}
230