Passed
Pull Request — master (#190)
by Dmitriy
04:22 queued 01:36
created

Queue::withMiddlewaresAdded()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 9
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 5
nc 1
nop 1
dl 0
loc 9
ccs 7
cts 7
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Yiisoft\Queue;
6
7
use Psr\Log\LoggerInterface;
8
use Yiisoft\Queue\Adapter\AdapterInterface;
9
use Yiisoft\Queue\Cli\LoopInterface;
10
use Yiisoft\Queue\Enum\JobStatus;
11
use Yiisoft\Queue\Exception\AdapterConfiguration\AdapterNotConfiguredException;
12
use Yiisoft\Queue\Message\IdEnvelope;
13
use Yiisoft\Queue\Message\MessageInterface;
14
use Yiisoft\Queue\Middleware\AdapterHandler;
15
use Yiisoft\Queue\Middleware\MessageHandlerInterface;
16
use Yiisoft\Queue\Middleware\MiddlewareInterface;
17
use Yiisoft\Queue\Middleware\MiddlewareDispatcher;
18
use Yiisoft\Queue\Middleware\Request;
19
use Yiisoft\Queue\Worker\WorkerInterface;
20
21
final class Queue implements QueueInterface
22
{
23
    /**
     * Middleware definitions configured on the queue itself; they are combined with per-push definitions
     * when a push handler is created.
     *
     * @var array<array-key, MiddlewareInterface|callable|array|string>
     */
    private array $middlewareDefinitions;

    /**
     * Handler placed at the end of the push middleware pipeline.
     */
    private AdapterHandler $adapterHandler;

    /**
     * @param WorkerInterface $worker Worker that processes every consumed message.
     * @param LoopInterface $loop Loop consulted after each processed message to decide whether to continue.
     * @param LoggerInterface $logger Logger receiving push/run/listen diagnostics.
     * @param MiddlewareDispatcher $pushMiddlewareDispatcher Dispatcher used for the push middleware pipeline.
     * @param AdapterInterface|null $adapter Adapter to work with; when null, {@see run()}, {@see listen()}
     * and {@see status()} throw {@see AdapterNotConfiguredException}.
     * @param string $channelName Name of the channel this queue represents.
     * @param MiddlewareInterface|callable|array|string ...$middlewareDefinitions Queue-level push middleware
     * definitions.
     */
    public function __construct(
        private WorkerInterface $worker,
        private LoopInterface $loop,
        private LoggerInterface $logger,
        private MiddlewareDispatcher $pushMiddlewareDispatcher,
        private ?AdapterInterface $adapter = null,
        private string $channelName = QueueFactoryInterface::DEFAULT_CHANNEL_NAME,
        MiddlewareInterface|callable|array|string ...$middlewareDefinitions
    ) {
        $this->adapterHandler = new AdapterHandler();
        $this->middlewareDefinitions = $middlewareDefinitions;
    }
41
42 2
    /**
     * Returns the channel name this queue instance was configured with.
     */
    public function getChannelName(): string
    {
        return $this->channelName;
    }
46
47 9
    /**
     * Pushes a message through the push middleware pipeline and returns the resulting message.
     *
     * The message is wrapped into a {@see Request} and dispatched with a handler built from the queue-level
     * middleware definitions plus the ones passed to this call. The message carried by the resulting request
     * is returned to the caller.
     *
     * @param MessageInterface $message Message to push.
     * @param MiddlewareInterface|callable|array|string ...$middlewareDefinitions Extra middleware definitions
     * for this push only; they are appended to the definitions configured on the queue.
     *
     * @return MessageInterface The message produced by the middleware pipeline.
     */
    public function push(
        MessageInterface $message,
        MiddlewareInterface|callable|array|string ...$middlewareDefinitions
    ): MessageInterface {
        // The closing quote after {metadata} was missing in the original template.
        $this->logger->debug(
            'Preparing to push message with data "{data}" and metadata: "{metadata}".',
            ['data' => json_encode($message->getData()), 'metadata' => json_encode($message->getMetadata())]
        );

        $request = new Request($message, $this);
        $message = $this->pushMiddlewareDispatcher
            ->dispatch($request, $this->createHandler($middlewareDefinitions))
            ->getMessage();

        // The literal string 'null' is logged when the pipeline did not attach an id to the metadata.
        $messageId = $message->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? 'null';
        $this->logger->info(
            'Pushed message id: "{id}".',
            ['id' => $messageId]
        );

        return $message;
    }
69
70 5
    /**
     * Asks the adapter to run its existing messages through this queue and returns how many were handled.
     *
     * @param int $max Maximum number of messages to handle; a value of 0 or less means no limit.
     *
     * @throws AdapterNotConfiguredException When no adapter is set on this queue.
     *
     * @return int Number of messages handed to the worker during this call.
     */
    public function run(int $max = 0): int
    {
        $this->checkAdapter();

        $this->logger->debug('Start processing queue messages.');
        $count = 0;

        // $max is never modified, so it is captured by value; $count is shared with the outer scope.
        $handlerCallback = function (MessageInterface $message) use ($max, &$count): bool {
            // Limit reached: refuse the message without handing it to the worker.
            if ($max > 0 && $max <= $count) {
                return false;
            }

            // By the time handle() returns the worker has already processed the message, so it is counted
            // even when the loop asks to stop afterwards. Previously that last message was left uncounted.
            $canContinue = $this->handle($message);
            $count++;

            return $canContinue;
        };

        // checkAdapter() above guarantees the adapter is not null; static analysis cannot see that.
        /** @psalm-suppress PossiblyNullReference */
        $this->adapter->runExisting($handlerCallback);

        $this->logger->info(
            'Processed {count} queue messages.',
            ['count' => $count]
        );

        return $count;
    }
96
97 1
    /**
     * Subscribes to the adapter and passes every received message to {@see handle()}.
     *
     * An info record is logged before subscribing and another one after the adapter's subscribe() call returns.
     *
     * @throws AdapterNotConfiguredException When no adapter is set on this queue.
     */
    public function listen(): void
    {
        $this->checkAdapter();

        $this->logger->info('Start listening to the queue.');

        $onMessage = function (MessageInterface $message): bool {
            return $this->handle($message);
        };

        // checkAdapter() above guarantees the adapter is not null; static analysis cannot see that.
        /** @psalm-suppress PossiblyNullReference */
        $this->adapter->subscribe($onMessage);

        $this->logger->info('Finish listening to the queue.');
    }
106
107 2
    /**
     * Returns the status the adapter reports for the given message id.
     *
     * @param int|string $id Identifier of the message to look up.
     *
     * @throws AdapterNotConfiguredException When no adapter is set on this queue.
     */
    public function status(string|int $id): JobStatus
    {
        $this->checkAdapter();

        // checkAdapter() above guarantees the adapter is not null; static analysis cannot see that.
        /** @psalm-suppress PossiblyNullReference */
        $status = $this->adapter->status($id);

        return $status;
    }
114
115 9
    /**
     * Returns a copy of this queue that uses the given adapter; the current instance is left untouched.
     *
     * @param AdapterInterface $adapter Adapter for the new instance.
     */
    public function withAdapter(AdapterInterface $adapter): self
    {
        $instance = clone $this;
        $instance->adapter = $adapter;

        return $instance;
    }
122
123 9
    /**
     * Returns the adapter configured on this queue, or null when none has been set.
     */
    public function getAdapter(): ?AdapterInterface
    {
        return $this->adapter;
    }
127
128 1
    /**
     * Returns a copy of this queue whose queue-level middleware definitions are replaced by the given ones.
     *
     * @param MiddlewareInterface|callable|array|string ...$middlewareDefinitions New middleware definitions.
     */
    public function withMiddlewares(MiddlewareInterface|callable|array|string ...$middlewareDefinitions): self
    {
        $copy = clone $this;
        $copy->middlewareDefinitions = $middlewareDefinitions;

        return $copy;
    }
135
136 1
    /**
     * Returns a copy of this queue with the given middleware definitions appended to the existing ones.
     *
     * Both lists are re-indexed before being joined, so keys of the incoming definitions never replace
     * entries that are already configured.
     *
     * @param MiddlewareInterface|callable|array|string ...$middlewareDefinitions Definitions to append.
     */
    public function withMiddlewaresAdded(MiddlewareInterface|callable|array|string ...$middlewareDefinitions): self
    {
        $existing = array_values($this->middlewareDefinitions);
        $added = array_values($middlewareDefinitions);

        $copy = clone $this;
        $copy->middlewareDefinitions = array_merge($existing, $added);

        return $copy;
    }
146
147 2
    /**
     * Returns a copy of this queue bound to the given channel name; the current instance is left untouched.
     *
     * @param string $channel Channel name for the new instance.
     */
    public function withChannelName(string $channel): self
    {
        $copy = clone $this;
        $copy->channelName = $channel;

        return $copy;
    }
154
155 5
    /**
     * Hands a message to the worker and then asks the loop whether processing may go on.
     *
     * @param MessageInterface $message Message to process.
     *
     * @return bool The loop's answer to canContinue() after the message has been processed.
     */
    private function handle(MessageInterface $message): bool
    {
        $this->worker->process($message, $this);

        $mayContinue = $this->loop->canContinue();

        return $mayContinue;
    }
161
162 7
    /**
     * Ensures an adapter has been configured before an adapter-dependent operation runs.
     *
     * @throws AdapterNotConfiguredException When the adapter is null.
     */
    private function checkAdapter(): void
    {
        if ($this->adapter !== null) {
            return;
        }

        throw new AdapterNotConfiguredException();
    }
168
169 9
    /**
     * Builds the handler that terminates the push pipeline.
     *
     * The returned handler re-dispatches the request through the queue-level middleware definitions followed
     * by the per-push ones, ending in the adapter handler.
     *
     * @param array $middlewares Per-push middleware definitions, as collected by the variadic parameter
     * of {@see push()}.
     *
     * @return MessageHandlerInterface Handler to pass to the push middleware dispatcher.
     */
    private function createHandler(array $middlewares): MessageHandlerInterface
    {
        // Variadic parameters collect named arguments under string keys. A plain array_merge() would let such
        // a key in $middlewares overwrite the same key in the queue-level definitions and silently drop a
        // middleware, so both lists are re-indexed first (same approach as withMiddlewaresAdded()).
        $definitions = [
            ...array_values($this->middlewareDefinitions),
            ...array_values($middlewares),
        ];

        return new class (
            $this->adapterHandler,
            $this->pushMiddlewareDispatcher,
            $definitions
        ) implements MessageHandlerInterface {
            public function __construct(
                private AdapterHandler $adapterHandler,
                private MiddlewareDispatcher $dispatcher,
                private array $middlewares,
            ) {
            }

            /**
             * Dispatches the request through the collected middlewares, ending in the adapter handler.
             */
            public function handle(Request $request): Request
            {
                return $this->dispatcher
                    ->withMiddlewares($this->middlewares)
                    ->dispatch($request, $this->adapterHandler);
            }
        };
    }
191
}
192