GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.

Consumer::withQos()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 5
rs 9.4285
cc 1
eloc 3
nc 1
nop 1
1
<?php
2
3
4
namespace AmqpWorkers;
5
6
use AmqpWorkers\Definition\Qos;
7
use AmqpWorkers\Definition\Queue;
8
use AmqpWorkers\Exception\ConsumerNotProperlyConfigured;
9
use AmqpWorkers\Exception\ProducerNotProperlyConfigured;
10
use AmqpWorkers\Worker\WorkerInterface;
11
use PhpAmqpLib\Connection\AbstractConnection;
12
use PhpAmqpLib\Message\AMQPMessage;
13
14
/**
15
 * Consumer is an object which connects to an AMQP queue, retrieves messages and sends them to a Worker.
16
 *
17
 * @package AmqpWorkers
18
 * @author Alex Panshin <[email protected]>
19
 * @since 1.0
20
 */
21
class Consumer
22
{
23
    /**
24
     * @var AbstractConnection
25
     */
26
    private $connection;
27
28
    /**
29
     * @var null|Qos
30
     */
31
    private $qos;
32
33
    /**
34
     * @var Queue
35
     */
36
    private $queue;
37
38
    /**
39
     * @var WorkerInterface
40
     */
41
    private $worker;
42
43
    /**
44
     * @var null|Producer
45
     */
46
    private $producer;
47
48
49
    /**
50
     * Simple fluent constructor to avoid weird-looking constructions like
51
     *
52
     * ```php
53
     * $qos = (new Consumer())->run();
54
     * ```
55
     *
56
     * @param AbstractConnection $connection
57
     * @return Consumer
58
     */
59
    public static function factory(AbstractConnection $connection)
60
    {
61
        return new static($connection);
62
    }
63
64
    /**
65
     * Consumer constructor.
66
     * @param AbstractConnection $connection
67
     */
68
    public function __construct(AbstractConnection $connection)
69
    {
70
        $this->connection = $connection;
71
    }
72
73
    /**
74
     * @param Qos $qos
75
     * @return Consumer $this
76
     */
77
    public function withQos(Qos $qos)
78
    {
79
        $this->qos = $qos;
80
        return $this;
81
    }
82
83
    /**
84
     * @param Queue $queue
85
     * @return Consumer $this
86
     */
87
    public function withQueue(Queue $queue)
88
    {
89
        $this->queue = $queue;
90
        return $this;
91
    }
92
93
    /**
94
     * @param WorkerInterface $worker
95
     * @return Consumer $this
96
     * @throws \AmqpWorkers\Exception\ConsumerNotProperlyConfigured
97
     */
98
    public function withWorker(WorkerInterface $worker)
99
    {
100
        $this->worker = $worker;
101
102
        return $this;
103
    }
104
105
    /**
106
     * If producer is set, Consumer will call `Producer::produce` with whatever Worker will return
107
     *
108
     * @param Producer $producer
109
     * @return Consumer $this
110
     */
111
    public function produceResult(Producer $producer)
112
    {
113
        $this->producer = $producer;
114
        return $this;
115
    }
116
117
    /**
118
     * Starts consumer. By default, this function can be terminated only by Worker's exception
119
     *
120
     * @throws ConsumerNotProperlyConfigured
121
     * @throws ProducerNotProperlyConfigured if given producer is not properly configured
122
     */
123
    public function run()
124
    {
125
        if ($this->queue === null) {
126
            throw new ConsumerNotProperlyConfigured('Queue is not defined.');
127
        }
128
        if ($this->worker === null) {
129
            throw new ConsumerNotProperlyConfigured('Worker is not defined.');
130
        }
131
132
        $this->producer && $this->producer->selfCheck();
133
134
        $wrapper = function (AMQPMessage $message) {
135
            $result = call_user_func($this->worker, $message->getBody());
136
137
            if ($result && $this->producer) {
138
                $this->producer->produce($result);
139
            }
140
141
            $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
142
        };
143
144
        $channel = $this->connection->channel();
145
146
        // declare queue here
147
        list ($passive, $durable, $exclusive, $autoDelete, $nowait, $arguments, $ticket) = $this->queue->listParams();
148
        $channel->queue_declare(
149
            $this->queue->getName(),
150
            $passive,
151
            $durable,
152
            $exclusive,
153
            $autoDelete,
154
            $nowait,
155
            $arguments,
156
            $ticket
157
        );
158
159
        if ($this->qos) {
160
            list ($size, $count, $global) = $this->qos->listParams();
161
            $channel->basic_qos($size, $count, $global);
162
        }
163
164
        $channel->basic_consume($this->queue->getName(), '', false, false, false, false, $wrapper, null, []);
165
166
        while (count($channel->callbacks)) {
167
            $channel->wait();
168
        }
169
170
        $channel->close();
171
    }
172
}
173