GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Push — master ( 6ae2b1...a5de13 )
by Alex
03:49
created

Producer::factory()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 3
Bugs 0 Features 2
Metric Value
c 3
b 0
f 2
dl 0
loc 4
rs 10
cc 1
eloc 2
nc 1
nop 1
1
<?php
2
3
4
namespace AmqpWorkers;
5
6
use AmqpWorkers\Definition\Exchange;
7
use AmqpWorkers\Definition\Queue;
8
use AmqpWorkers\Exception\ProducerNotProperlyConfigured;
9
use PhpAmqpLib\Channel\AMQPChannel;
10
use PhpAmqpLib\Connection\AbstractConnection;
11
use PhpAmqpLib\Message\AMQPMessage;
12
13
/**
14
 * Producer is an object that sends something into AMQP exchange.
15
 * It can format given messages somehow and can work with both queues and exchanges as a target.
16
 *
17
 * @package AmqpWorkers
18
 * @author Alex Panshin <[email protected]>
19
 * @since 1.0
20
 */
21
class Producer
22
{
23
    /**
24
     * @var callable
25
     */
26
    private $formatter;
27
28
    /**
29
     * @var bool
30
     */
31
    private $isExchange;
32
33
    /**
34
     * @var Queue
35
     */
36
    protected $queue;
37
38
    /**
39
     * @var Exchange
40
     */
41
    protected $exchange;
42
43
    /**
44
     * @var AbstractConnection
45
     */
46
    private $connection;
47
48
    /**
49
     * Simple fluent constructor to avoid weird-looking constructions like
50
     *
51
     * ```php
52
     * $qos = (new Producer())->toExchange();
53
     * ```
54
     *
55
     * @param AbstractConnection $connection
56
     * @return Producer $this
57
     */
58
    public static function factory(AbstractConnection $connection)
59
    {
60
        return new static($connection);
61
    }
62
63
    /**
64
     * Producer constructor.
65
     * @param AbstractConnection $connection
66
     */
67
    public function __construct(AbstractConnection $connection)
68
    {
69
        $this->connection = $connection;
70
71
        $this->withFormatter(function ($message) {
72
            return (string)$message;
73
        });
74
    }
75
76
    /**
77
     * @param Exchange $exchange
78
     * @return Producer $this
79
     */
80
    public function withExchange(Exchange $exchange)
81
    {
82
        $this->exchange = $exchange;
83
        $this->isExchange = true;
84
        return $this;
85
    }
86
87
    /**
88
     * @param Queue $queue
89
     * @return Producer $this
90
     */
91
    public function withQueue(Queue $queue)
92
    {
93
        $this->queue = $queue;
94
        $this->isExchange = false;
95
        return $this;
96
    }
97
98
    /**
99
     * @param \Closure|callable $formatter
100
     * @return Producer $this
101
     * @throws ProducerNotProperlyConfigured
102
     */
103
    public function withFormatter($formatter)
104
    {
105
        if (!is_callable($formatter)) {
106
            throw new ProducerNotProperlyConfigured('Formatter must be a callable.');
107
        }
108
        $this->formatter = $formatter;
109
110
        return $this;
111
    }
112
113
    /**
114
     * @param mixed $payload
115
     * @todo: maybe add properties array as second parameter
116
     * @throws ProducerNotProperlyConfigured if queue nor exchange not given.
117
     */
118
    public function produce($payload)
119
    {
120
        if ($this->isExchange()) {
121
            $this->getChannel()->basic_publish(
122
                $this->createMessage($payload),
123
                $this->exchange->getName()
124
            );
125
        } else {
126
            $this->getChannel()->basic_publish(
127
                $this->createMessage($payload),
128
                '',
129
                $this->queue->getName()
130
            );
131
        }
132
    }
133
134
    /**
135
     * Returns `true` if producer is properly configured. Throws exception otherwise.
136
     * Function is public just because Consumer needs to check if given producer configured before consuming the queue.
137
     *
138
     * @return bool
139
     * @throws ProducerNotProperlyConfigured
140
     */
141
    public function selfCheck()
142
    {
143
        if ($this->exchange === null && $this->queue === null) {
144
            throw new ProducerNotProperlyConfigured('Nor queue nor exchange given.');
145
        }
146
147
        return true;
148
    }
149
150
    /**
151
     * @return AMQPChannel
152
     * @throws ProducerNotProperlyConfigured
153
     */
154
    protected function getChannel()
155
    {
156
        $this->selfCheck();
157
158
        $channel = $this->connection->channel();
159
160
        if ($this->isExchange()) {
161
            list ($passive, $durable, $autoDelete, $internal, $nowait, $arguments, $ticket) =
162
                $this->exchange->listParams();
163
164
            $channel->exchange_declare(
165
                $this->exchange->getName(),
166
                $this->exchange->getType(),
167
                $passive,
168
                $durable,
169
                $autoDelete,
170
                $internal,
171
                $nowait,
172
                $arguments,
173
                $ticket
174
            );
175
        } else {
176
            list ($passive, $durable, $exclusive, $autoDelete, $nowait, $arguments, $ticket) =
177
                $this->queue->listParams();
178
179
            $channel->queue_declare(
180
                $this->queue->getName(),
181
                $passive,
182
                $durable,
183
                $exclusive,
184
                $autoDelete,
185
                $nowait,
186
                $arguments,
187
                $ticket
188
            );
189
        }
190
191
        return $channel;
192
    }
193
194
    /**
195
     * @param mixed $payload
196
     * @return AMQPMessage
197
     */
198
    protected function createMessage($payload)
199
    {
200
        return new AMQPMessage(call_user_func($this->formatter, $payload));
201
    }
202
203
    /**
204
     * @return bool
205
     */
206
    protected function isExchange()
207
    {
208
        return $this->isExchange;
209
    }
210
}
211