GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Push — master ( c0447f...f0076f )
by Alex
54:49
created

Producer::produce()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 15
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 4
Bugs 0 Features 2
Metric Value
c 4
b 0
f 2
dl 0
loc 15
rs 9.4285
cc 2
eloc 10
nc 2
nop 1
1
<?php
2
3
4
namespace AmqpWorkers;
5
6
use AmqpWorkers\Definition\Exchange;
7
use AmqpWorkers\Definition\Queue;
8
use AmqpWorkers\Exception\ProducerNotProperlyConfigured;
9
use PhpAmqpLib\Channel\AMQPChannel;
10
use PhpAmqpLib\Connection\AbstractConnection;
11
use PhpAmqpLib\Message\AMQPMessage;
12
13
/**
14
 * Producer is an object that sends something into AMQP exchange.
15
 * It can format given messages somehow,
16
 * has very stupid (for now) batch messages processing
17
 * and can work with both queues and exchanges as a target.
18
 *
19
 * @package AmqpWorkers
20
 * @author Alex Panshin <[email protected]>
21
 * @since 1.0
22
 */
23
class Producer
24
{
25
    /**
26
     * @var callable
27
     */
28
    private $formatter;
29
30
    /**
31
     * @var bool
32
     */
33
    private $isExchange;
34
35
    /**
36
     * @var Queue
37
     */
38
    protected $queue;
39
40
    /**
41
     * @var Exchange
42
     */
43
    protected $exchange;
44
45
    /**
46
     * @var AbstractConnection
47
     */
48
    private $connection;
49
50
    /**
51
     * Simple fluent constructor to avoid weird-looking constructions like
52
     *
53
     * ```php
54
     * $qos = (new Producer())->toExchange();
55
     * ```
56
     *
57
     * @param AbstractConnection $connection
58
     * @return Producer
59
     */
60
    public static function factory(AbstractConnection $connection)
61
    {
62
        return new static($connection);
63
    }
64
65
    /**
66
     * Producer constructor.
67
     * @param AbstractConnection $connection
68
     */
69
    public function __construct(AbstractConnection $connection)
70
    {
71
        $this->connection = $connection;
72
73
        $this->withFormatter(function ($message) {
74
            return (string)$message;
75
        });
76
    }
77
78
    /**
79
     * @param Exchange $exchange
80
     * @return Producer $this
81
     */
82
    public function withExchange(Exchange $exchange)
83
    {
84
        $this->exchange = $exchange;
85
        $this->isExchange = true;
86
        return $this;
87
    }
88
89
    /**
90
     * @param Queue $queue
91
     * @return Producer $this
92
     */
93
    public function withQueue(Queue $queue)
94
    {
95
        $this->queue = $queue;
96
        $this->isExchange = false;
97
        return $this;
98
    }
99
100
    /**
101
     * @return bool
102
     */
103
    protected function isExchange()
104
    {
105
        return $this->isExchange;
106
    }
107
108
    /**
109
     * @param \Closure|callable $formatter
110
     * @return Producer $this
111
     * @throws \AmqpWorkers\Exception\ProducerNotProperlyConfigured
112
     */
113
    public function withFormatter($formatter)
114
    {
115
        if (!is_callable($formatter)) {
116
            throw new ProducerNotProperlyConfigured('Formatter must be a callable.');
117
        }
118
        $this->formatter = $formatter;
119
120
        return $this;
121
    }
122
123
    /**
124
     * @param mixed $payload
125
     * @todo: maybe add properties array as second parameter
126
     * @throws \AmqpWorkers\Exception\ProducerNotProperlyConfigured if queue nor exchange not given.
127
     */
128
    public function produce($payload)
129
    {
130
        if ($this->isExchange()) {
131
            $this->getChannel()->basic_publish(
132
                $this->createMessage($payload),
133
                $this->exchange->getName()
134
            );
135
        } else {
136
            $this->getChannel()->basic_publish(
137
                $this->createMessage($payload),
138
                '',
139
                $this->queue->getName()
140
            );
141
        }
142
    }
143
144
    /**
145
     * @return AMQPChannel
146
     * @todo: declare queue only once?
147
     * @throws \AmqpWorkers\Exception\ProducerNotProperlyConfigured
148
     */
149
    protected function getChannel()
150
    {
151
        if ($this->exchange === null && $this->queue === null) {
152
            throw new ProducerNotProperlyConfigured('Nor queue nor exchange given.');
153
        }
154
155
        $channel = $this->connection->channel();
156
157
        if ($this->isExchange()) {
158
            list ($passive, $durable, $autoDelete, $internal, $nowait, $arguments, $ticket) =
159
                $this->exchange->listParams();
160
161
            $channel->exchange_declare(
162
                $this->exchange->getName(),
163
                $this->exchange->getType(),
164
                $passive,
165
                $durable,
166
                $autoDelete,
167
                $internal,
168
                $nowait,
169
                $arguments,
170
                $ticket
171
            );
172
        } else {
173
            list ($passive, $durable, $exclusive, $autoDelete, $nowait, $arguments, $ticket) =
174
                $this->queue->listParams();
175
176
            $channel->queue_declare(
177
                $this->queue->getName(),
178
                $passive,
179
                $durable,
180
                $exclusive,
181
                $autoDelete,
182
                $nowait,
183
                $arguments,
184
                $ticket
185
            );
186
        }
187
188
        return $channel;
189
    }
190
191
    /**
192
     * @param mixed $payload
193
     * @return AMQPMessage
194
     */
195
    protected function createMessage($payload)
196
    {
197
        return new AMQPMessage(call_user_func($this->formatter, $payload));
198
    }
199
}
200