StreamReader::readMessages()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 9
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 7
c 0
b 0
f 0
nc 1
nop 0
dl 0
loc 9
ccs 8
cts 8
cp 1
crap 1
rs 10
1
<?php
2
/**
3
 * Copyright Aleksandar Panic
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 * http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 **/
17
18
namespace ArekX\DataStreamer;
19
20
21
use ArekX\DataStreamer\Contracts\Driver;
22
use ArekX\DataStreamer\Contracts\FailHandler;
23
use ArekX\DataStreamer\Contracts\MessageHandler;
24
use ArekX\DataStreamer\Contracts\MessageParser;
25
use ArekX\DataStreamer\Contracts\StreamSettings;
26
27
/**
 * Reads batches of raw messages through a Driver, runs each one through the
 * MessageParser and MessageHandler, acknowledges the ones that were handled
 * and forwards the ones that failed to an optional FailHandler.
 */
class StreamReader
{
    /** Driver used to create the consumer group, read messages and acknowledge them. */
    protected Driver $driver;

    /** Parser that turns a message id and its raw payload into the value given to the handler. */
    protected MessageParser $parser;

    /** Handler invoked with every parsed message. */
    protected MessageHandler $handler;

    /** Source of the stream name, consumer group/name, read count and wait timeout. */
    protected StreamSettings $settings;

    /** Optional receiver of the failures collected during one batch; null disables collection. */
    protected ?FailHandler $failHandler = null;

    /**
     * Read position passed to the driver on every read.
     *
     * Starts at Driver::FROM_START and is switched to Driver::FROM_LATEST the
     * first time a read performed from Driver::FROM_START returns no messages.
     */
    protected $readFrom = Driver::FROM_START;

    /**
     * Failures collected during the current processPendingMessages() call.
     *
     * Each entry is an array with the keys 'id', 'raw' and 'error'.
     *
     * @var array[]
     */
    protected $failedMessages = [];

    /**
     * @param Driver $driver Driver used for group creation, reading and acknowledging.
     * @param MessageParser $parser Parser applied to every raw message.
     * @param MessageHandler $handler Handler that receives every parsed message.
     * @param StreamSettings $settings Stream, consumer and read settings.
     * @param FailHandler|null $failHandler Optional handler for failed messages. When it is
     *                                      omitted, an exception raised while processing a
     *                                      message is rethrown instead of being collected.
     */
    public function __construct(
        Driver $driver,
        MessageParser $parser,
        MessageHandler $handler,
        StreamSettings $settings,
        ?FailHandler $failHandler = null
    )
    {
        $this->driver = $driver;
        $this->parser = $parser;
        $this->handler = $handler;
        $this->settings = $settings;
        $this->failHandler = $failHandler;
    }

    /**
     * Initializes the stream and then processes message batches forever.
     *
     * This method never returns normally; it only ends if an exception
     * propagates out of initializeStream() or processPendingMessages().
     *
     * @codeCoverageIgnore
     */
    public function runLoop()
    {
        $this->initializeStream();

        while (true) {
            $this->processPendingMessages();
        }
    }

    /**
     * Returns the read position that will be passed to the driver on the next read.
     */
    public function getReadFrom(): string
    {
        return $this->readFrom;
    }

    /**
     * Reads one batch of messages and processes it.
     *
     * An empty batch only moves the read position from Driver::FROM_START to
     * Driver::FROM_LATEST (when it is still at the start) and returns. Otherwise
     * every message is processed, the ids of the handled ones are acknowledged
     * in a single driver call and, when a fail handler is set and at least one
     * message failed, the collected failures are passed to it.
     *
     * @throws \Exception When processing a message fails and no fail handler is set.
     */
    public function processPendingMessages()
    {
        // Failures are tracked per batch, so drop whatever the previous call collected.
        $this->failedMessages = [];

        $messages = $this->readMessages();

        if (empty($messages)) {
            if ($this->readFrom === Driver::FROM_START) {
                $this->readFrom = Driver::FROM_LATEST;
            }
            return;
        }

        $handledIds = [];

        // NOTE(review): when processMessage() rethrows (no fail handler set), the ids
        // already handled in this batch are never acknowledged - confirm this is intended.
        foreach ($messages as $id => $rawMessage) {
            if ($this->processMessage($id, $rawMessage)) {
                $handledIds[] = $id;
            }
        }

        // The acknowledge call is made even when $handledIds is empty.
        $this->driver->acknowledge(
            $this->settings->getStreamName(),
            $this->settings->getConsumerGroup(),
            $handledIds
        );

        if ($this->failHandler && !empty($this->failedMessages)) {
            $this->failHandler->handle($this->failedMessages);
        }
    }

    /**
     * Parses a single raw message and passes the result to the message handler.
     *
     * @param mixed $id Key of the message in the array returned by readMessages().
     * @param mixed $rawMessage Raw payload stored under that key.
     * @return bool True when the message was handled; false when handling threw and the
     *              failure was recorded for the fail handler.
     * @throws \Exception Rethrown unchanged when no fail handler is set.
     */
    protected function processMessage($id, $rawMessage): bool
    {
        try {
            $this->handler->handle($this->parser->parse($id, $rawMessage));
            return true;
        } catch (\Exception $e) {
            if ($this->failHandler) {
                $this->pushErrorMessage($id, $rawMessage, $e);
                return false;
            }

            // Nobody is registered to receive the failure, so let it propagate.
            throw $e;
        }
    }

    /**
     * Appends a failure record for the current batch.
     *
     * @param mixed $id Id of the failed message.
     * @param mixed $rawMessage Raw payload of the failed message.
     * @param \Exception|null $exception Exception raised while processing it, if any.
     */
    protected function pushErrorMessage($id, $rawMessage, ?\Exception $exception = null)
    {
        $this->failedMessages[] = [
            'id' => $id,
            'raw' => $rawMessage,
            'error' => $exception
        ];
    }

    /**
     * Asks the driver for the next batch of messages using the configured consumer
     * group, consumer name, stream name, read count and wait timeout together with
     * the current read position.
     *
     * @return array Raw messages keyed by message id, as iterated by processPendingMessages().
     */
    protected function readMessages(): array
    {
        return $this->driver->readMessages(
            $this->settings->getConsumerGroup(),
            $this->settings->getConsumerName(),
            $this->settings->getStreamName(),
            $this->readFrom,
            $this->settings->getMessageReadCount(),
            $this->settings->getMessageWaitTimeout()
        );
    }

    /**
     * Asks the driver to create the configured consumer group on the configured stream.
     */
    public function initializeStream(): void
    {
        $this->driver->createGroup($this->settings->getStreamName(), $this->settings->getConsumerGroup());
    }
}