Completed
Push — master ( 438392...68aed1 )
by John
04:20
created

BaseAmqp::setParameter()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 4
ccs 3
cts 3
cp 1
rs 10
cc 1
eloc 2
nc 1
nop 2
crap 1
1
<?php
2
/**
3
 * The MIT License
4
 *
5
 * Copyright (c) 2010 Alvaro Videla
6
 *
7
 * Permission is hereby granted, free of charge, to any person obtaining a copy
8
 * of this software and associated documentation files (the "Software"), to deal
9
 * in the Software without restriction, including without limitation the rights
10
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11
 * copies of the Software, and to permit persons to whom the Software is
12
 * furnished to do so, subject to the following conditions:
13
 *
14
 * The above copyright notice and this permission notice shall be included in
15
 * all copies or substantial portions of the Software.
16
 *
17
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23
 * SOFTWARE.
24
 *
25
 * PHP version 5.3
26
 *
27
 * @category   Thumper
28
 * @package    Thumper
29
 * @author     Alvaro Videla
30
 * @copyright  2010 Alvaro Videla. All rights reserved.
31
 * @license    MIT http://opensource.org/licenses/MIT
32
 * @link       https://github.com/videlalvaro/Thumper
33
 */
34
namespace Thumper;
35
36
use InvalidArgumentException;
37
use PhpAmqpLib\Channel\AMQPChannel;
38
use PhpAmqpLib\Connection\AbstractConnection;
39
40
/**
 * Shared base for Thumper AMQP participants.
 *
 * Holds the connection, the channel opened from it, and the option sets
 * (exchange, queue, consumer QoS, routing key, message parameters) that are
 * used when declaring the exchange/queue and registering a consumer.
 */
abstract class BaseAmqp
{
    /**
     * Flag value for non-persistent messages.
     * NOTE(review): presumably the AMQP delivery_mode value; not used inside
     * this class, confirm against the code that reads it.
     */
    const NON_PERSISTENT = 1;

    /**
     * Flag value for persistent messages.
     * NOTE(review): presumably the AMQP delivery_mode value; not used inside
     * this class, confirm against the code that reads it.
     */
    const PERSISTENT = 2;

    /**
     * Connection handed to the constructor.
     *
     * @var AbstractConnection
     */
    protected $connection;

    /**
     * Channel obtained from the connection in the constructor.
     *
     * @var AMQPChannel
     */
    protected $channel;

    /**
     * Defaults for exchange_declare(); 'name' and 'type' are added by
     * setExchangeOptions().
     *
     * @var array
     */
    protected $exchangeOptions = array(
        'passive' => false,
        'durable' => true,
        'auto_delete' => false,
        'internal' => false,
        'nowait' => false,
        'arguments' => null,
        'ticket' => null
    );

    /**
     * Defaults for queue_declare(); overridden through setQueueOptions().
     *
     * @var array
     */
    protected $queueOptions = array(
        'name' => '',
        'passive' => false,
        'durable' => true,
        'exclusive' => false,
        'auto_delete' => false,
        'nowait' => false,
        'arguments' => null,
        'ticket' => null
    );

    /**
     * Consumer settings; 'qos' stays empty until setQos() is called.
     *
     * @var array
     */
    protected $consumerOptions = array(
        'qos' => array()
    );

    /**
     * Routing key used when binding the queue to the exchange.
     *
     * @var string
     */
    protected $routingKey = '';

    /**
     * Free-form parameters exposed through getParameters().
     *
     * @var array
     */
    protected $parameters = array(
        'content_type' => 'text/plain'
    );

    /**
     * Stores the connection and opens a channel on it.
     *
     * @param AbstractConnection $connection
     */
    public function __construct(AbstractConnection $connection)
    {
        $this->connection = $connection;
        $this->channel = $this->connection->channel();
    }

    /**
     * Merges the given exchange options over the current ones.
     *
     * @param array $options Must contain a valid 'name' and a non-empty 'type'.
     *
     * @throws InvalidArgumentException When 'name' is missing or contains
     *                                  characters outside the allowed set, or
     *                                  when 'type' is missing or empty.
     */
    public function setExchangeOptions(array $options)
    {
        if (!isset($options['name']) || !$this->isValidExchangeName($options['name'])) {
            throw new InvalidArgumentException(
                'You must provide an exchange name'
            );
        }

        if (empty($options['type'])) {
            throw new InvalidArgumentException(
                'You must provide an exchange type'
            );
        }

        $this->exchangeOptions = array_merge(
            $this->exchangeOptions,
            $options
        );
    }

    /**
     * Merges the given queue options over the current ones.
     *
     * @param array $options
     */
    public function setQueueOptions(array $options)
    {
        $this->queueOptions = array_merge(
            $this->queueOptions,
            $options
        );
    }

    /**
     * Sets the routing key used when the queue is bound to the exchange.
     *
     * @param string $routingKey
     */
    public function setRoutingKey($routingKey)
    {
        $this->routingKey = $routingKey;
    }

    /**
     * Merges the given QoS settings over the current ones.
     *
     * @param array $options Recognised keys: 'prefetch_size', 'prefetch_count',
     *                       'global'. Keys left out fall back to 0 / 0 / false
     *                       when the consumer is set up.
     */
    public function setQos(array $options)
    {
        $this->consumerOptions['qos'] = array_merge($this->consumerOptions['qos'], $options);
    }

    /**
     * Declares the exchange (when one is configured), applies QoS, declares
     * the queue, binds it to the exchange and registers the consumer callback.
     *
     * The callback is array($this, 'processMessage'); this class does not
     * declare processMessage(), so the concrete subclass has to provide it.
     */
    protected function setUpConsumer()
    {
        if (isset($this->exchangeOptions['name'])) {
            $this->channel
                ->exchange_declare(
                    $this->exchangeOptions['name'],
                    $this->exchangeOptions['type'],
                    $this->exchangeOptions['passive'],
                    $this->exchangeOptions['durable'],
                    $this->exchangeOptions['auto_delete'],
                    $this->exchangeOptions['internal'],
                    $this->exchangeOptions['nowait'],
                    $this->exchangeOptions['arguments'],
                    $this->exchangeOptions['ticket']
                );

            // NOTE(review): QoS is only applied when an exchange name is set;
            // kept as-is, but confirm that skipping it otherwise is intended.
            if (!empty($this->consumerOptions['qos'])) {
                // setQos() may have been given only some of the keys; fill the
                // rest so no undefined index is read.
                $qos = array_merge(
                    array(
                        'prefetch_size' => 0,
                        'prefetch_count' => 0,
                        'global' => false
                    ),
                    $this->consumerOptions['qos']
                );

                $this->channel
                    ->basic_qos(
                        $qos['prefetch_size'],
                        $qos['prefetch_count'],
                        $qos['global']
                    );
            }
        }

        // Only the first element of the declare result (the queue name) is used.
        list($queueName, , ) = $this->channel
            ->queue_declare(
                $this->queueOptions['name'],
                $this->queueOptions['passive'],
                $this->queueOptions['durable'],
                $this->queueOptions['exclusive'],
                $this->queueOptions['auto_delete'],
                $this->queueOptions['nowait'],
                $this->queueOptions['arguments'],
                $this->queueOptions['ticket']
            );

        if (isset($this->exchangeOptions['name'])) {
            $this->channel
                ->queue_bind($queueName, $this->exchangeOptions['name'], $this->routingKey);
        }

        $this->channel
            ->basic_consume(
                $queueName,
                $this->getConsumerTag(),
                false,
                false,
                false,
                false,
                array($this, 'processMessage')
            );
    }

    /**
     * Builds the consumer tag from the current process id.
     *
     * @return string 'PHPPROCESS_' followed by getmypid().
     */
    protected function getConsumerTag()
    {
        return 'PHPPROCESS_' . getmypid();
    }

    /**
     * Verifies exchange name meets the 0.9.1 protocol standard.
     *
     * Allowed characters: letters, digits, hyphen, underscore, period, or
     * colon. An empty string passes the check.
     *
     * @param string $exchangeName
     * @return bool
     */
    private function isValidExchangeName($exchangeName)
    {
        // Arrays/objects can never be a valid name and must not reach preg_match().
        if (!is_scalar($exchangeName)) {
            return false;
        }

        // The D modifier stops "$" from also matching before a trailing newline.
        return preg_match('/^[A-Za-z0-9_\-.:]*$/D', (string) $exchangeName) === 1;
    }

    /**
     * Sets (or overwrites) a single parameter.
     *
     * @param string $key
     * @param string $value
     */
    public function setParameter($key, $value)
    {
        $this->parameters[$key] = $value;
    }

    /**
     * Returns all parameters, including the default 'content_type'.
     *
     * @return array
     */
    public function getParameters()
    {
        return $this->parameters;
    }
}
257