SynchronousAdapter::__destruct()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 3
nc 1
nop 0
dl 0
loc 6
ccs 4
cts 4
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Yiisoft\Queue\Adapter;
6
7
use InvalidArgumentException;
8
use Yiisoft\Queue\Enum\JobStatus;
9
use Yiisoft\Queue\Message\MessageInterface;
10
use Yiisoft\Queue\QueueFactory;
11
use Yiisoft\Queue\QueueInterface;
12
use Yiisoft\Queue\Worker\WorkerInterface;
13
use Yiisoft\Queue\Message\IdEnvelope;
14
15
/**
 * In-memory queue adapter that keeps pushed messages in an array and hands them
 * to a handler in push order.
 *
 * Message IDs are sequential integers starting at 0. Messages that are still
 * pending when the adapter is destroyed are passed to the worker from the
 * destructor.
 */
final class SynchronousAdapter implements AdapterInterface
{
    /**
     * Pending messages indexed by their ID. Handled messages are removed.
     *
     * @var array<int, MessageInterface>
     */
    private array $messages = [];

    /**
     * ID of the next message to handle; every ID below it has already been handled.
     */
    private int $current = 0;

    public function __construct(
        private WorkerInterface $worker,
        private QueueInterface $queue,
        private string $channel = QueueFactory::DEFAULT_CHANNEL_NAME,
    ) {
    }

    /**
     * Passes every message that is still pending to the worker before the adapter goes away.
     */
    public function __destruct()
    {
        $this->runExisting(function (MessageInterface $message): bool {
            $this->worker->process($message, $this->queue);

            // Returning true keeps the loop going until the storage is drained.
            return true;
        });
    }

    /**
     * Hands pending messages to the handler one by one, in push order.
     *
     * The loop stops when there are no more pending messages or as soon as the
     * handler returns anything other than `true`. A message is removed from the
     * storage once the handler has returned, whatever the returned value is.
     *
     * @param callable $handlerCallback Receives a {@see MessageInterface}; must return `true` to continue.
     */
    public function runExisting(callable $handlerCallback): void
    {
        $result = true;
        while (isset($this->messages[$this->current]) && $result === true) {
            $result = $handlerCallback($this->messages[$this->current]);
            unset($this->messages[$this->current]);
            $this->current++;
        }
    }

    /**
     * Returns the status of the message with the given ID.
     *
     * @param int|string $id Message ID; it is cast to int before any check.
     *
     * @throws InvalidArgumentException When the ID is negative or no message with this ID is known.
     */
    public function status(string|int $id): JobStatus
    {
        $id = (int) $id;

        if ($id < 0) {
            throw new InvalidArgumentException('This adapter IDs start with 0.');
        }

        // Everything below the cursor has already been handled and removed.
        if ($id < $this->current) {
            return JobStatus::done();
        }

        if (isset($this->messages[$id])) {
            return JobStatus::waiting();
        }

        throw new InvalidArgumentException('There is no message with the given ID.');
    }

    /**
     * Stores the message and returns it wrapped in an envelope carrying its ID.
     */
    public function push(MessageInterface $message): MessageInterface
    {
        $key = count($this->messages) + $this->current;

        // Store under the computed key rather than appending with `[]`: the
        // append index is tracked by the array itself and diverges from the
        // computed ID whenever the storage was replaced by a fresh array.
        $this->messages[$key] = $message;

        return new IdEnvelope($message, $key);
    }

    /**
     * Handles the pending messages with the given handler.
     *
     * @param callable $handlerCallback See {@see runExisting()}.
     */
    public function subscribe(callable $handlerCallback): void
    {
        $this->runExisting($handlerCallback);
    }

    /**
     * Returns an adapter bound to the given channel.
     *
     * The same instance is returned when the channel does not change; otherwise
     * a clone with an empty message storage is returned.
     */
    public function withChannel(string $channel): self
    {
        if ($channel === $this->channel) {
            return $this;
        }

        $new = clone $this;
        $new->channel = $channel;
        $new->messages = [];
        // The clone starts with an empty storage, so its cursor has to start
        // over as well; otherwise it would report IDs it never held as done.
        $new->current = 0;

        return $new;
    }
}
91