GitHub Access Token became invalid

It seems that the GitHub access token used for retrieving details about this repository from GitHub has become invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.

Producer::factory()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 4
rs 10
cc 1
eloc 2
nc 1
nop 1
1
<?php
2
3
4
namespace AmqpWorkers;
5
6
use AmqpWorkers\Definition\Exchange;
7
use AmqpWorkers\Definition\Queue;
8
use AmqpWorkers\Exception\ProducerNotProperlyConfigured;
9
use PhpAmqpLib\Channel\AMQPChannel;
10
use PhpAmqpLib\Connection\AbstractConnection;
11
use PhpAmqpLib\Message\AMQPMessage;
12
13
/**
 * Producer sends messages to an AMQP broker.
 *
 * The target is either an exchange (see withExchange()) or a queue (see withQueue());
 * whichever of the two was configured last is the active one. Every payload is passed
 * through a formatter callable before it is wrapped into an AMQPMessage; the default
 * formatter simply casts the payload to string.
 *
 * The channel is opened lazily on first use, and the configured exchange or queue
 * is declared on it at that moment.
 *
 * @package AmqpWorkers
 * @author Alex Panshin <[email protected]>
 * @since 1.0
 */
class Producer implements ProducerInterface
{
    /**
     * Turns a payload into the message body; invoked through call_user_func().
     *
     * @var callable
     */
    private $formatter;

    /**
     * True when the active target is an exchange, false when it is a queue,
     * null until withExchange() or withQueue() has been called.
     *
     * @var bool|null
     */
    private $isExchange;

    /**
     * Queue definition set by withQueue(); null until then.
     *
     * @var Queue|null
     */
    protected $queue;

    /**
     * Exchange definition set by withExchange(); null until then.
     *
     * @var Exchange|null
     */
    protected $exchange;

    /**
     * Connection the channel is opened on.
     *
     * @var AbstractConnection
     */
    private $connection;

    /**
     * Lazily created channel; reset to null whenever the target changes.
     *
     * @var AMQPChannel|null
     */
    private $channel;

    /**
     * Simple fluent constructor to avoid weird-looking constructions like
     *
     * ```php
     * $producer = (new Producer($connection))->withExchange($exchange);
     * ```
     *
     * Uses late static binding, so calling it on a subclass returns an instance
     * of that subclass.
     *
     * @param AbstractConnection $connection
     * @return static
     */
    public static function factory(AbstractConnection $connection)
    {
        return new static($connection);
    }

    /**
     * Stores the connection and installs the default formatter,
     * which casts the payload to string.
     *
     * @param AbstractConnection $connection
     */
    public function __construct(AbstractConnection $connection)
    {
        $this->connection = $connection;

        $this->withFormatter(function ($message) {
            return (string)$message;
        });
    }

    /**
     * Makes the given exchange the active target.
     *
     * The cached channel reference is dropped so that the next getChannel() call
     * opens a new channel and declares the exchange on it. Note that a previously
     * opened channel is only dereferenced here, it is not explicitly closed.
     *
     * @param Exchange $exchange
     * @return $this
     */
    public function withExchange(Exchange $exchange)
    {
        $this->exchange = $exchange;
        $this->isExchange = true;
        $this->channel = null;

        return $this;
    }

    /**
     * Makes the given queue the active target.
     *
     * The cached channel reference is dropped so that the next getChannel() call
     * opens a new channel and declares the queue on it. Note that a previously
     * opened channel is only dereferenced here, it is not explicitly closed.
     *
     * @param Queue $queue
     * @return $this
     */
    public function withQueue(Queue $queue)
    {
        $this->queue = $queue;
        $this->isExchange = false;
        $this->channel = null;

        return $this;
    }

    /**
     * Replaces the formatter applied to every payload before it is published.
     *
     * @param callable $formatter receives the payload, returns the message body
     * @return $this
     * @throws ProducerNotProperlyConfigured if the given value is not callable
     */
    public function withFormatter($formatter)
    {
        if (!is_callable($formatter)) {
            throw new ProducerNotProperlyConfigured('Formatter must be a callable.');
        }

        $this->formatter = $formatter;

        return $this;
    }

    /**
     * Formats the payload and publishes it to the active target.
     *
     * With an exchange target the message is published to that exchange using the
     * default routing key. With a queue target it is published with an empty
     * exchange name and the queue name as routing key.
     *
     * @param mixed $payload
     * @todo: maybe add properties array as second parameter
     * @throws ProducerNotProperlyConfigured if neither a queue nor an exchange was given.
     */
    public function produce($payload)
    {
        // Resolve the channel first: it runs selfCheck(), so a misconfigured producer
        // fails before the formatter is invoked.
        $channel = $this->getChannel();
        $message = $this->createMessage($payload);

        if ($this->isExchange()) {
            $channel->basic_publish($message, $this->exchange->getName());
        } else {
            $channel->basic_publish($message, '', $this->queue->getName());
        }
    }

    /**
     * Returns `true` if producer is properly configured. Throws exception otherwise.
     * Function is public just because Consumer needs to check if given producer configured before consuming the queue.
     *
     * @return bool
     * @throws ProducerNotProperlyConfigured if neither a queue nor an exchange was given
     */
    public function selfCheck()
    {
        if ($this->exchange === null && $this->queue === null) {
            throw new ProducerNotProperlyConfigured('Nor queue nor exchange given.');
        }

        return true;
    }

    /**
     * Returns the channel, opening it and declaring the active target on first use.
     *
     * @return AMQPChannel
     * @throws ProducerNotProperlyConfigured if neither a queue nor an exchange was given
     */
    protected function getChannel()
    {
        $this->selfCheck();

        if ($this->channel !== null) {
            return $this->channel;
        }

        // Work on a local variable and cache the channel only after the declaration
        // succeeded. Caching it up front would leave an undeclared channel behind
        // when the declaration throws, and every later call would skip the declaration.
        $channel = $this->connection->channel();

        if ($this->isExchange()) {
            list ($passive, $durable, $autoDelete, $internal, $nowait, $arguments, $ticket) =
                $this->exchange->listParams();

            $channel->exchange_declare(
                $this->exchange->getName(),
                $this->exchange->getType(),
                $passive,
                $durable,
                $autoDelete,
                $internal,
                $nowait,
                $arguments,
                $ticket
            );
        } else {
            list ($passive, $durable, $exclusive, $autoDelete, $nowait, $arguments, $ticket) =
                $this->queue->listParams();

            $channel->queue_declare(
                $this->queue->getName(),
                $passive,
                $durable,
                $exclusive,
                $autoDelete,
                $nowait,
                $arguments,
                $ticket
            );
        }

        $this->channel = $channel;

        return $channel;
    }

    /**
     * Runs the payload through the formatter and wraps the result into a message.
     *
     * @param mixed $payload
     * @return AMQPMessage
     */
    protected function createMessage($payload)
    {
        return new AMQPMessage(call_user_func($this->formatter, $payload));
    }

    /**
     * Tells whether the active target is an exchange.
     *
     * Returns false both for a queue target and for a producer that has no target
     * configured yet.
     *
     * @return bool
     */
    protected function isExchange()
    {
        // The flag is null until a target is configured; compare strictly so that
        // the documented bool return type always holds.
        return $this->isExchange === true;
    }
}
219