Carrot::producer()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 4
rs 10
cc 1
eloc 2
nc 1
nop 2
1
<?php
2
/**
3
 * The MIT License (MIT)
4
 *
5
 * Copyright (c) 2016 Krishnaprasad MG <[email protected]>
6
 *
7
 * Permission is hereby granted, free of charge, to any person obtaining a copy
8
 * of this software and associated documentation files (the "Software"), to deal
9
 * in the Software without restriction, including without limitation the rights
10
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11
 * copies of the Software, and to permit persons to whom the Software is
12
 * furnished to do so, subject to the following conditions:
13
 *
14
 * The above copyright notice and this permission notice shall be included in all
15
 * copies or substantial portions of the Software.
16
 *
17
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23
 * SOFTWARE.
24
 */
25
26
namespace Sunspikes\Carrot;
27
28
use Sunspikes\Carrot\Exception\CarrotException;
29
use Sunspikes\Carrot\Exception\ConnectionException;
30
use Sunspikes\Carrot\Producer\Producer;
31
use Sunspikes\Carrot\Consumer\Consumer;
32
use Sunspikes\Carrot\Consumer\ConsumerInterface;
33
use Sunspikes\Carrot\Producer\ProducerInterface;
34
use PhpAmqpLib\Connection\AMQPStreamConnection;
35
36
/**
37
 * Queue builder
38
 */
39
class Carrot implements QueueInterface
40
{
41
    /** @var mixed Channel returned by the connection's channel() call in the constructor */
    protected $channel;
42
    /** @var string Name of the exchange this queue was built for */
    protected $exchange;
43
    /** @var string Exchange type handed to exchange_declare() */
    protected $type;
44
    /** @var array Configuration produced by buildConfig() (defaults merged with overrides) */
    protected $config;
45
46
    /**
     * Open an AMQP connection and declare the exchange on a new channel.
     *
     * The supplied configuration is merged over the package defaults by
     * buildConfig(); every connection and exchange option below is read from
     * that merged array.
     *
     * @param string $exchange Name of the exchange to declare
     * @param string $type     Exchange type passed to exchange_declare()
     * @param array  $config   Overrides for the default configuration
     * @throws ConnectionException When connecting or declaring the exchange fails;
     *                             the causing exception is available via getPrevious()
     */
    public function __construct($exchange, $type = 'direct', $config = [])
    {
        $this->config = $this->buildConfig($config);
        $this->exchange = $exchange;
        $this->type = $type;

        try {
            $connection = new AMQPStreamConnection(
                $this->config['host'],
                $this->config['port'],
                $this->config['username'],
                $this->config['password'],
                $this->config['vhost'],
                $this->config['connection']['insist'],
                $this->config['connection']['login_method'],
                $this->config['connection']['login_response'],
                $this->config['connection']['locale'],
                $this->config['connection']['timeout'],
                $this->config['connection']['read_write_timeout'],
                $this->config['connection']['context'],
                $this->config['connection']['keepalive'],
                $this->config['connection']['heartbeat']
            );

            $this->channel = $connection->channel();
            $this->channel->exchange_declare(
                $exchange,
                $type,
                $this->config['exchange']['passive'],
                $this->config['exchange']['durable'],
                $this->config['exchange']['auto_delete'],
                $this->config['exchange']['internal'],
                $this->config['exchange']['no_wait'],
                $this->config['exchange']['arguments'],
                $this->config['exchange']['ticket']
            );
        } catch (\Exception $e) {
            // Keep the original exception attached so its class and stack trace
            // are not lost; message and code (0) stay as before.
            throw new ConnectionException(
                'Carrot failed to build connection: '. $e->getMessage(),
                0,
                $e
            );
        }
    }
92
93
    /**
     * Build the connection configuration
     *
     * Loads the default configuration from config/config.php (relative to
     * this file's parent directory) and recursively overlays the given
     * overrides on top of it.
     *
     * @param array|null $config Overrides; any non-array value is treated as "no overrides"
     * @return array The defaults with the overrides applied
     */
    protected function buildConfig($config)
    {
        $defaultConfig = require __DIR__ . '/../config/config.php';

        // array_replace_recursive() only accepts arrays: a null (e.g. when no
        // configuration was ever supplied) would otherwise discard the defaults
        // or raise a TypeError, depending on the PHP version.
        if (! is_array($config)) {
            $config = [];
        }

        return array_replace_recursive($defaultConfig, $config);
    }
105
106
    /**
     * Create a producer on this queue's channel and exchange and hand it the
     * merged configuration.
     *
     * @inheritdoc
     */
    public function getProducer()
    {
        $instance = new Producer($this->channel, $this->exchange);

        // The producer receives the same merged configuration this queue uses.
        $instance->setConfig($this->config);

        return $instance;
    }
116
117
    /**
     * Create a consumer on this queue's channel and exchange and hand it the
     * merged configuration.
     *
     * @inheritdoc
     */
    public function getConsumer()
    {
        $instance = new Consumer($this->channel, $this->exchange);

        // The consumer receives the same merged configuration this queue uses.
        $instance->setConfig($this->config);

        return $instance;
    }
127
128
    /**
     * Configure the connection
     *
     * Acts as both setter and getter for the configuration used by the static
     * factories: a non-empty argument replaces the stored configuration, an
     * empty one leaves it untouched. The stored value is returned either way.
     *
     * @param array $config New configuration, or empty to only read the stored one
     * @return array The stored configuration; an empty array if none was ever set
     */
    public static function config($config = [])
    {
        // Start from an empty array rather than null so callers always receive
        // the documented array, even before any configuration has been stored.
        static $connectionConfig = [];

        if (! empty($config)) {
            $connectionConfig = $config;
        }

        return $connectionConfig;
    }
144
145
    /**
     * Static shortcut: obtain a producer for the given exchange by delegating
     * to buildChannel() with the 'producer' type.
     *
     * @inheritdoc
     */
    public static function producer($exchange, $exchangeType = 'direct')
    {
        /** @var ProducerInterface $producer buildChannel() is asked for the 'producer' type */
        $producer = static::buildChannel('producer', $exchange, $exchangeType);

        return $producer;
    }
152
153
    /**
     * Static shortcut: obtain a consumer for the given exchange by delegating
     * to buildChannel() with the 'consumer' type.
     *
     * @inheritdoc
     */
    public static function consumer($exchange, $exchangeType = 'direct')
    {
        /** @var ConsumerInterface $consumer buildChannel() is asked for the 'consumer' type */
        $consumer = static::buildChannel('consumer', $exchange, $exchangeType);

        return $consumer;
    }
160
161
    /**
     * Build a producer/consumer channel
     *
     * The requested type is turned into a getter name ('producer' becomes
     * getProducer()) which is called on a newly constructed queue. Results are
     * cached per type and exchange name for the rest of the request; note that
     * the exchange type and later configuration changes are not part of the
     * cache key, so a cached instance is returned regardless of them.
     *
     * @param string $type         Channel kind; must map to an existing get<Type>() method
     * @param string $exchange     Exchange name
     * @param string $exchangeType Exchange type used when the queue is first built
     * @return ConsumerInterface|ProducerInterface
     * @throws CarrotException When the type is unsupported or the queue cannot be built
     */
    public static function buildChannel($type, $exchange, $exchangeType)
    {
        static $channels = [];

        $getMethod = 'get' . ucfirst($type);

        // Reject unknown types up front: calling a missing method would be a
        // fatal error rather than the documented CarrotException, and it would
        // only surface after a connection had already been opened.
        if (!method_exists(get_called_class(), $getMethod)) {
            throw new CarrotException('Unsupported rabbitmq channel type: ' . $type);
        }

        $config = static::config();

        try {
            if (!isset($channels[$type][$exchange])) {
                $queue = new static($exchange, $exchangeType, $config);
                $channels[$type][$exchange] = $queue->$getMethod();
            }
        } catch (\Exception $e) {
            // Attach the cause so the underlying failure stays inspectable.
            throw new CarrotException(
                "Error in creating rabbitmq channel:" . $e->getMessage(),
                0,
                $e
            );
        }

        return $channels[$type][$exchange];
    }
189
}
190