GitHub Access Token became invalid

It seems that the GitHub access token used to retrieve details about this repository from GitHub has become invalid. This might prevent certain types of inspections from running (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.

Basic::ack()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 7
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 5
nc 1
nop 2
dl 0
loc 7
ccs 6
cts 6
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
<?php
2
declare(strict_types = 1);
3
4
namespace Innmind\AMQP\Transport\Protocol\v091;
5
6
use Innmind\AMQP\{
7
    Model\Basic\Ack,
8
    Model\Basic\Cancel,
9
    Model\Basic\Consume,
10
    Model\Basic\Get,
11
    Model\Basic\Publish,
12
    Model\Basic\Qos,
13
    Model\Basic\Recover,
14
    Model\Basic\Reject,
15
    Model\Basic\Message,
16
    Model\Connection\MaxFrameSize,
17
    Transport\Frame,
18
    Transport\Frame\Type,
19
    Transport\Frame\Channel as FrameChannel,
20
    Transport\Frame\Method,
21
    Transport\Frame\Value,
22
    Transport\Frame\Value\UnsignedLongLongInteger,
23
    Transport\Frame\Value\UnsignedLongInteger,
24
    Transport\Frame\Value\Bits,
25
    Transport\Frame\Value\ShortString,
26
    Transport\Frame\Value\UnsignedShortInteger,
27
    Transport\Frame\Value\Table,
28
    Transport\Frame\Value\UnsignedOctet,
29
    Transport\Frame\Value\Timestamp,
30
    Transport\Protocol\Basic as BasicInterface,
31
    Transport\Protocol\ArgumentTranslator,
32
};
33
use Innmind\Math\Algebra\Integer;
34
use Innmind\Immutable\{
35
    Str,
36
    MapInterface,
37
    Map,
38
    StreamInterface,
39
    Stream,
40
};
41
42
final class Basic implements BasicInterface
{
    /**
     * Invokable used to convert each raw argument/header value into a frame value.
     *
     * @var ArgumentTranslator
     */
    private $translate;

    /**
     * @param ArgumentTranslator $translator Stored and invoked for every value of an arguments map
     */
    public function __construct(ArgumentTranslator $translator)
    {
        $this->translate = $translator;
    }

    /**
     * Build the "basic.ack" method frame.
     *
     * The frame carries the delivery tag (unsigned long long) followed by
     * a single bit holding the "multiple" flag of the command.
     */
    public function ack(FrameChannel $channel, Ack $command): Frame
    {
        $method = Methods::get('basic.ack');
        $deliveryTag = UnsignedLongLongInteger::of(
            new Integer($command->deliveryTag())
        );
        $multiple = new Bits($command->isMultiple());

        return Frame::method($channel, $method, $deliveryTag, $multiple);
    }

    /**
     * Build the "basic.cancel" method frame.
     *
     * The frame carries the consumer tag (short string) followed by a
     * single bit set when the command should NOT wait.
     */
    public function cancel(FrameChannel $channel, Cancel $command): Frame
    {
        $method = Methods::get('basic.cancel');
        $consumerTag = ShortString::of(new Str($command->consumerTag()));
        $noWait = new Bits(!$command->shouldWait());

        return Frame::method($channel, $method, $consumerTag, $noWait);
    }

    /**
     * Build the "basic.consume" method frame.
     *
     * An empty consumer tag is sent when the command asks for the tag to be
     * auto generated, otherwise the tag held by the command is used.
     */
    public function consume(FrameChannel $channel, Consume $command): Frame
    {
        $consumerTag = $command->shouldAutoGenerateConsumerTag()
            ? ''
            : $command->consumerTag();

        $method = Methods::get('basic.consume');
        $ticket = new UnsignedShortInteger(new Integer(0)); //ticket (reserved)
        $queue = ShortString::of(new Str($command->queue()));
        $tag = ShortString::of(new Str($consumerTag));
        // bit order: negated local flag, auto acknowledge, exclusive, negated wait flag
        $flags = new Bits(
            !$command->isLocal(),
            $command->shouldAutoAcknowledge(),
            $command->isExclusive(),
            !$command->shouldWait()
        );
        $arguments = $this->arguments($command->arguments());

        return Frame::method(
            $channel,
            $method,
            $ticket,
            $queue,
            $tag,
            $flags,
            $arguments
        );
    }

    /**
     * Build the "basic.get" method frame.
     *
     * The frame carries the reserved ticket, the queue name (short string)
     * and a single bit holding the auto acknowledge flag.
     */
    public function get(FrameChannel $channel, Get $command): Frame
    {
        $method = Methods::get('basic.get');
        $ticket = new UnsignedShortInteger(new Integer(0)); //ticket (reserved)
        $queue = ShortString::of(new Str($command->queue()));
        $autoAcknowledge = new Bits($command->shouldAutoAcknowledge());

        return Frame::method($channel, $method, $ticket, $queue, $autoAcknowledge);
    }

    /**
     * {@inheritdoc}
     *
     * The returned stream starts with the "basic.publish" method frame,
     * followed by the content header frame (body length and serialized
     * message properties), followed by one body frame per chunk of the
     * message body.
     */
    public function publish(
        FrameChannel $channel,
        Publish $command,
        MaxFrameSize $maxFrameSize
    ): StreamInterface {
        $message = $command->message();
        $body = $message->body();

        $methodFrame = Frame::method(
            $channel,
            Methods::get('basic.publish'),
            new UnsignedShortInteger(new Integer(0)), //ticket (reserved)
            ShortString::of(new Str($command->exchange())),
            ShortString::of(new Str($command->routingKey())),
            new Bits(
                $command->mandatory(),
                $command->immediate()
            )
        );
        $headerFrame = Frame::header(
            $channel,
            Methods::classId('basic'),
            UnsignedLongLongInteger::of(new Integer($body->length())),
            ...$this->serializeProperties($message)
        );
        $frames = Stream::of(Frame::class, $methodFrame, $headerFrame);

        if ($maxFrameSize->isLimited()) {
            // the "-8" accounts for the extra information of a content frame
            // (type, channel and end flag)
            $chunkSize = $maxFrameSize->toInt() - 8;
        } else {
            // no limit: ask for chunks as long as the whole body
            $chunkSize = $body->length();
        }

        return $body
            ->chunk($chunkSize)
            ->reduce(
                $frames,
                static function(Stream $carry, Str $piece) use ($channel): Stream {
                    return $carry->add(Frame::body($channel, $piece));
                }
            );
    }

    /**
     * Build the "basic.qos" method frame.
     *
     * The frame carries the prefetch size (unsigned long), the prefetch
     * count (unsigned short) and a single bit holding the global flag.
     */
    public function qos(FrameChannel $channel, Qos $command): Frame
    {
        $method = Methods::get('basic.qos');
        $prefetchSize = UnsignedLongInteger::of(
            new Integer($command->prefetchSize())
        );
        $prefetchCount = UnsignedShortInteger::of(
            new Integer($command->prefetchCount())
        );
        $global = new Bits($command->isGlobal());

        return Frame::method($channel, $method, $prefetchSize, $prefetchCount, $global);
    }

    /**
     * Build the "basic.recover" method frame.
     *
     * The only argument is a single bit holding the requeue flag.
     */
    public function recover(FrameChannel $channel, Recover $command): Frame
    {
        $method = Methods::get('basic.recover');
        $requeue = new Bits($command->shouldRequeue());

        return Frame::method($channel, $method, $requeue);
    }

    /**
     * Build the "basic.reject" method frame.
     *
     * The frame carries the delivery tag (unsigned long long) followed by
     * a single bit holding the requeue flag.
     */
    public function reject(FrameChannel $channel, Reject $command): Frame
    {
        $method = Methods::get('basic.reject');
        $deliveryTag = UnsignedLongLongInteger::of(
            new Integer($command->deliveryTag())
        );
        $requeue = new Bits($command->shouldRequeue());

        return Frame::method($channel, $method, $deliveryTag, $requeue);
    }

    /**
     * Convert a map of raw arguments into a table of frame values.
     *
     * Every value of the map is passed through the argument translator and
     * stored under the same key in a new map of string => Value.
     */
    private function arguments(MapInterface $arguments): Table
    {
        $translate = $this->translate;

        $translated = $arguments->reduce(
            new Map('string', Value::class),
            static function(Map $table, string $name, $raw) use ($translate): Map {
                return $table->put($name, $translate($raw));
            }
        );

        return new Table($translated);
    }

    /**
     * Serialize the properties defined on the message.
     *
     * The first element of the returned list is an unsigned short whose bits
     * tell which properties are present (bit 15 for the content type down to
     * bit 3 for the app id); the remaining elements are the values of the
     * present properties, in that same order.
     *
     * @return array Zero-indexed list: the flags first, then the property values
     */
    private function serializeProperties(Message $message): array
    {
        // wraps anything castable to string in a short string value
        $text = static function($value) {
            return ShortString::of(new Str((string) $value));
        };

        // present properties keyed by their flag bit, filled in serialization order
        $present = [];

        if ($message->hasContentType()) {
            $present[15] = $text($message->contentType());
        }

        if ($message->hasContentEncoding()) {
            $present[14] = $text($message->contentEncoding());
        }

        if ($message->hasHeaders()) {
            $present[13] = $this->arguments($message->headers());
        }

        if ($message->hasDeliveryMode()) {
            $present[12] = UnsignedOctet::of(
                new Integer($message->deliveryMode()->toInt())
            );
        }

        if ($message->hasPriority()) {
            $present[11] = UnsignedOctet::of(
                new Integer($message->priority()->toInt())
            );
        }

        if ($message->hasCorrelationId()) {
            $present[10] = $text($message->correlationId());
        }

        if ($message->hasReplyTo()) {
            $present[9] = $text($message->replyTo());
        }

        if ($message->hasExpiration()) {
            $present[8] = $text($message->expiration()->milliseconds());
        }

        if ($message->hasId()) {
            $present[7] = $text($message->id());
        }

        if ($message->hasTimestamp()) {
            $present[6] = new Timestamp($message->timestamp());
        }

        if ($message->hasType()) {
            $present[5] = $text($message->type());
        }

        if ($message->hasUserId()) {
            $present[4] = $text($message->userId());
        }

        if ($message->hasAppId()) {
            $present[3] = $text($message->appId());
        }

        $flagBits = 0;
        $values = [];

        foreach ($present as $bit => $value) {
            $flagBits |= (1 << $bit);
            $values[] = $value;
        }

        // the flags always come first, even when no property is present
        return \array_merge(
            [new UnsignedShortInteger(new Integer($flagBits))],
            $values
        );
    }
}
297