GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Push — develop ( a71c17...924269 )
by Baptiste
02:08
created

Consumer::shouldConsume()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 12
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 3.0416

Importance

Changes 0
Metric Value
dl 0
loc 12
c 0
b 0
f 0
ccs 5
cts 6
cp 0.8333
rs 9.4285
cc 3
eloc 6
nc 3
nop 0
crap 3.0416
1
<?php
2
declare(strict_types = 1);
3
4
namespace Innmind\AMQP\Client\Channel\Basic\Consumer;
5
6
use Innmind\AMQP\{
7
    Client\Channel\Basic\Consumer as ConsumerInterface,
8
    Model\Basic\Consume,
9
    Model\Basic\Ack,
10
    Model\Basic\Reject as RejectCommand,
11
    Model\Basic\Cancel as CancelCommand,
12
    Model\Basic\Message\Locked,
13
    Transport\Connection,
14
    Transport\Connection\MessageReader,
15
    Transport\Frame\Channel,
16
    Transport\Frame\Type,
17
    Exception\MessageFromAnotherConsumerReceived,
18
    Exception\Requeue,
19
    Exception\Reject,
20
    Exception\Cancel
21
};
22
23
final class Consumer implements ConsumerInterface
24
{
25
    private $connection;
26
    private $command;
27
    private $channel;
28
    private $consumerTag;
29
    private $read;
30
    private $take;
31
    private $predicate;
32
    private $canceled = false;
33
34 7
    public function __construct(
35
        Connection $connection,
36
        Consume $command,
37
        Channel $channel,
38
        string $consumerTag
39
    ) {
40 7
        $this->connection = $connection;
41 7
        $this->command = $command;
42 7
        $this->channel = $channel;
43 7
        $this->consumerTag = $consumerTag;
44 7
        $this->read = new MessageReader;
45 6
        $this->predicate = static function(): bool {
46 6
            return true; //by default consume all messages
47
        };
48 7
    }
49
50
    /**
51
     * {@inheritdoc}
52
     */
53 7
    public function foreach(callable $consume): void
54
    {
55 7
        if ($this->canceled) {
56 2
            return;
57
        }
58
59 7
        while ($this->shouldConsume()) {
60
            try {
61 7
                $frame = $this->connection->wait('basic.deliver');
62 7
                $message = ($this->read)($this->connection);
63 7
                $message = new Locked($message);
64 7
                $consumerTag = (string) $frame->values()->first()->original();
65 7
                $deliveryTag = $frame->values()->get(1)->original()->value();
66 7
                $redelivered = $frame->values()->get(2)->original()->first();
67 7
                $exchange = (string) $frame->values()->get(3)->original();
68 7
                $routingKey = (string) $frame->values()->get(4)->original();
69
70 7
                if ($this->consumerTag !== $consumerTag) {
71
                    throw new MessageFromAnotherConsumerReceived(
72
                        $message,
73
                        $consumerTag,
74
                        $deliveryTag,
75
                        $redelivered,
76
                        $exchange,
77
                        $routingKey
78
                    );
79
                }
80
81 7
                $toProcess = ($this->predicate)(
82 7
                    $message,
83 7
                    $redelivered,
84 7
                    $exchange,
85 7
                    $routingKey
86
                );
87
88 7
                if (!$toProcess) {
89 1
                    throw new Requeue;
90
                }
91
92
                //flag before the consume call as the "take" applies to the number
93
                //of messages that match the predicate and the number of
94
                //successfully consumed messages
95 7
                $this->flagConsumed();
96
97 7
                $consume(
98 7
                    $message,
99 7
                    $redelivered,
100 7
                    $exchange,
101 7
                    $routingKey
102
                );
103
104 5 View Code Duplication
                if (!$this->command->shouldAutoAcknowledge()) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
105 5
                    $this->connection->send(
106 5
                        $this->connection->protocol()->basic()->ack(
107 5
                            $this->channel,
108 5
                            new Ack($deliveryTag)
109
                        )
110
                    );
111
                }
112 5
            } catch (Reject $e) {
113 1
                $this->reject($deliveryTag);
1 ignored issue
show
Bug introduced by
The variable $deliveryTag does not seem to be defined for all execution paths leading up to this point.

If you define a variable conditionally, it can happen that it is not defined for all execution paths.

Let’s take a look at an example:

function myFunction($a) {
    switch ($a) {
        case 'foo':
            $x = 1;
            break;

        case 'bar':
            $x = 2;
            break;
    }

    // $x is potentially undefined here.
    echo $x;
}

In the above example, the variable $x is defined if you pass “foo” or “bar” as argument for $a. However, since the switch statement has no default case statement, if you pass any other value, the variable $x would be undefined.

Available Fixes

  1. Check for existence of the variable explicitly:

    function myFunction($a) {
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
        }
    
        if (isset($x)) { // Make sure it's always set.
            echo $x;
        }
    }
    
  2. Define a default value for the variable:

    function myFunction($a) {
        $x = ''; // Set a default which gets overridden for certain paths.
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
        }
    
        echo $x;
    }
    
  3. Add a value for the missing path:

    function myFunction($a) {
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
    
            // We add support for the missing case.
            default:
                $x = '';
                break;
        }
    
        echo $x;
    }
    
Loading history...
114 4
            } catch (Requeue $e) {
115 2
                $this->requeue($deliveryTag);
116 2
            } catch (Cancel $e) {
117 1
                break;
118 1
            } catch (\Throwable $e) {
119 1
                $this->cancel();
120
121 1
                throw $e;
122
            }
123
        }
124
125 6
        $this->cancel();
126 6
    }
127
128
    /**
129
     * {@inheritdoc}
130
     */
131 5
    public function take(int $count): ConsumerInterface
132
    {
133 5
        $this->take = $count;
134
135 5
        return $this;
136
    }
137
138
    /**
139
     * {@inheritdoc}
140
     */
141 1
    public function filter(callable $predicate): ConsumerInterface
142
    {
143 1
        $this->predicate = $predicate;
144
145 1
        return $this;
146
    }
147
148 7
    private function shouldConsume(): bool
149
    {
150 7
        if ($this->canceled()) {
151
            return false;
152
        }
153
154 7
        if (is_null($this->take)) {
155 2
            return true;
156
        }
157
158 5
        return $this->take > 0;
159
    }
160
161 7
    private function flagConsumed(): void
162
    {
163 7
        if (is_null($this->take)) {
164 2
            return;
165
        }
166
167 5
        --$this->take;
168 5
    }
169
170 1
    private function reject(int $deliveryTag): void
171
    {
172 1
        $this->connection->send(
173 1
            $this->connection->protocol()->basic()->reject(
174 1
                $this->channel,
175 1
                new RejectCommand($deliveryTag)
176
            )
177
        );
178 1
    }
179
180 5
    private function requeue(int $deliveryTag): void
181
    {
182 5
        $this->connection->send(
183 5
            $this->connection->protocol()->basic()->reject(
184 5
                $this->channel,
185 5
                RejectCommand::requeue($deliveryTag)
186
            )
187
        );
188 5
    }
189
190 7
    private function canceled(): bool
191
    {
192 7
        if ($this->canceled) {
193
            return true;
194
        }
195
196 7
        return !$this->connection->opened();
197
    }
198
199 7
    private function cancel(): void
200
    {
201 7
        if ($this->canceled()) {
202
            return;
203
        }
204
205 7
        $this->connection->send($this->connection->protocol()->basic()->cancel(
206 7
            $this->channel,
207 7
            new CancelCommand($this->consumerTag)
208
        ));
209
210 7
        $deliver = $this->connection->protocol()->method('basic.deliver');
211 7
        $expected = $this->connection->protocol()->method('basic.cancel-ok');
212
213
        do {
214 7
            $frame = $this->connection->wait();
215
216
            if (
217 7
                $frame->type() === Type::method() &&
218 7
                $frame->method()->equals($deliver)
219
            ) {
220 5
                $message = ($this->read)($this->connection);
0 ignored issues
show
Unused Code introduced by
$message is not used, you could remove the assignment.

This check looks for variable assignements that are either overwritten by other assignments or where the variable is not used subsequently.

$myVar = 'Value';
$higher = false;

if (rand(1, 6) > 3) {
    $higher = true;
} else {
    $higher = false;
}

Both the $myVar assignment in line 1 and the $higher assignment in line 2 are dead. The first because $myVar is never used and the second because $higher is always overwritten for every possible time line.

Loading history...
221 5
                $this->requeue($frame->values()->get(1)->original()->value());
222
            }
223 7
        } while (!$frame->method()->equals($expected));
224
225 7
        $this->canceled = true;
226 7
    }
227
}
228