Passed
Push — master ( 61f3bb...4f60fb )
by PHPinnacle
03:13 queued 01:31
created

Consumer::runCallbacks()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 21
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 14
nc 2
nop 0
dl 0
loc 21
rs 9.7998
c 0
b 0
f 0
1
<?php
2
/**
3
 * This file is part of PHPinnacle/Ridge.
4
 *
5
 * (c) PHPinnacle Team <dev@phpinnacle.com>
6
 *
7
 * For the full copyright and license information, please view the LICENSE
8
 * file that was distributed with this source code.
9
 */
10
11
declare(strict_types = 1);
12
13
namespace PHPinnacle\Ridge;
14
15
use function Amp\asyncCall;
16
17
/**
 * Reassembles delivered messages from their frames and hands them to consumer callbacks.
 *
 * A message arrives as a deliver frame, followed by a content header frame and
 * zero or more body frames. The frames are fed in through onDeliver(), onHeader()
 * and onBody(); once the number of body bytes announced by the header has been
 * received, a Message is built and passed to the callback registered for the
 * consumer tag of the deliver frame.
 */
final class Consumer
{
    /**
     * Channel passed to every callback together with the message.
     *
     * @var Channel
     */
    private $channel;

    /**
     * Collects the body payloads of the message currently being received.
     *
     * @var Buffer
     */
    private $buffer;

    /**
     * Registered callbacks, indexed by consumer tag.
     *
     * @var callable[]
     */
    private $callbacks = [];

    /**
     * Deliver frame of the message in progress, or null when no message is in progress.
     *
     * @var Protocol\BasicDeliverFrame|null
     */
    private $deliver;

    /**
     * Content header of the message in progress, or null until it has been received.
     *
     * @var Protocol\ContentHeaderFrame|null
     */
    private $header;

    /**
     * Number of body bytes still expected for the message in progress.
     *
     * @var int
     */
    private $remaining = 0;

    /**
     * @param Channel $channel
     */
    public function __construct(Channel $channel)
    {
        $this->channel = $channel;
        $this->buffer  = new Buffer;
    }

    /**
     * Registers (or replaces) the callback for a consumer tag.
     *
     * @param string   $consumerTag
     * @param callable $callback    Called with the Message and the Channel.
     */
    public function listen(string $consumerTag, callable $callback): void
    {
        $this->callbacks[$consumerTag] = $callback;
    }

    /**
     * Removes the callback for a consumer tag; unknown tags are ignored.
     *
     * @param string $consumerTag
     */
    public function cancel(string $consumerTag): void
    {
        unset($this->callbacks[$consumerTag]);
    }

    /**
     * Starts a new message, provided a callback is registered for its consumer tag.
     *
     * @param Protocol\BasicDeliverFrame $frame
     *
     * @return void
     */
    public function onDeliver(Protocol\BasicDeliverFrame $frame): void
    {
        if (!isset($this->callbacks[$frame->consumerTag])) {
            return;
        }

        $this->deliver = $frame;
    }

    /**
     * Records the content header of the message in progress.
     *
     * Headers that arrive while no deliver frame is pending are ignored.
     *
     * @param Protocol\ContentHeaderFrame $frame
     *
     * @return void
     */
    public function onHeader(Protocol\ContentHeaderFrame $frame): void
    {
        if ($this->deliver === null) {
            return;
        }

        $this->header    = $frame;
        $this->remaining = $frame->bodySize;

        // A body size of zero means the message is already complete.
        $this->runCallbacks();
    }

    /**
     * Appends a body frame to the message in progress.
     *
     * Body frames that arrive before a content header are ignored.
     *
     * @param Protocol\ContentBodyFrame $frame
     *
     * @return void
     *
     * @throws Exception\ChannelException When more body bytes arrive than the header announced.
     */
    public function onBody(Protocol\ContentBodyFrame $frame): void
    {
        if ($this->header === null) {
            return;
        }

        $this->buffer->append($frame->payload);

        $this->remaining -= $frame->size;

        if ($this->remaining < 0) {
            throw Exception\ChannelException::bodyOverflow($this->remaining);
        }

        $this->runCallbacks();
    }

    /**
     * Dispatches the message in progress once all of its body bytes have arrived.
     *
     * The buffer is always flushed and the per-message state always reset, even
     * when the consumer was cancelled while the message was still being received;
     * in that case the message is dropped instead of calling a missing callback.
     *
     * @return void
     */
    private function runCallbacks(): void
    {
        if ($this->remaining !== 0) {
            return;
        }

        // Both frames are set here: onHeader() requires a pending deliver frame
        // and onBody() requires a pending header before reaching this method.
        $deliver = $this->deliver;
        $header  = $this->header;
        $content = $this->buffer->flush();

        $this->deliver = null;
        $this->header  = null;

        $consumer = $deliver->consumerTag;

        // cancel() may have removed the callback after onDeliver() accepted the frame.
        if (!isset($this->callbacks[$consumer])) {
            return;
        }

        $message = new Message(
            $content,
            $deliver->exchange,
            $deliver->routingKey,
            $deliver->consumerTag,
            $deliver->deliveryTag,
            $deliver->redelivered,
            $header->toArray()
        );

        asyncCall($this->callbacks[$consumer], $message, $this->channel);
    }
}
154