anonymous//src/Queue.php$0   A
last analyzed

Complexity

Total Complexity 2

Size/Duplication

Total Lines 17
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
dl 0
loc 17
ccs 9
cts 9
cp 1
rs 10
c 0
b 0
f 0
wmc 2
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Yiisoft\Queue;
6
7
use Psr\Log\LoggerInterface;
8
use Yiisoft\Queue\Adapter\AdapterInterface;
9
use Yiisoft\Queue\Cli\LoopInterface;
10
use Yiisoft\Queue\Enum\JobStatus;
11
use Yiisoft\Queue\Exception\AdapterConfiguration\AdapterNotConfiguredException;
12
use Yiisoft\Queue\Message\MessageInterface;
13
use Yiisoft\Queue\Middleware\Push\AdapterPushHandler;
14
use Yiisoft\Queue\Middleware\Push\MessageHandlerPushInterface;
15
use Yiisoft\Queue\Middleware\Push\MiddlewarePushInterface;
16
use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher;
17
use Yiisoft\Queue\Middleware\Push\PushRequest;
18
use Yiisoft\Queue\Worker\WorkerInterface;
19
use Yiisoft\Queue\Message\IdEnvelope;
20
21
final class Queue implements QueueInterface
22
{
23
    /**
24
     * @var array|array[]|callable[]|MiddlewarePushInterface[]|string[]
25
     */
26
    private array $middlewareDefinitions;
27
    private AdapterPushHandler $adapterPushHandler;
28
29 18
    public function __construct(
30
        private WorkerInterface $worker,
31
        private LoopInterface $loop,
32
        private LoggerInterface $logger,
33
        private PushMiddlewareDispatcher $pushMiddlewareDispatcher,
34
        private ?AdapterInterface $adapter = null,
35
        private string $channelName = QueueFactoryInterface::DEFAULT_CHANNEL_NAME,
36
        MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions
37
    ) {
38 18
        $this->middlewareDefinitions = $middlewareDefinitions;
39 18
        $this->adapterPushHandler = new AdapterPushHandler();
40
    }
41
42 2
    public function getChannelName(): string
43
    {
44 2
        return $this->channelName;
45
    }
46
47 9
    public function push(
48
        MessageInterface $message,
49
        MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions
50
    ): MessageInterface {
51 9
        $this->logger->debug(
52 9
            'Preparing to push message with handler name "{handlerName}".',
53 9
            ['handlerName' => $message->getHandlerName()]
54 9
        );
55
56 9
        $request = new PushRequest($message, $this->adapter);
57 9
        $message = $this->pushMiddlewareDispatcher
58 9
            ->dispatch($request, $this->createPushHandler($middlewareDefinitions))
59 9
            ->getMessage();
60
61 8
        $messageId = $message->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? 'null';
62 8
        $this->logger->info(
63 8
            'Pushed message with handler name "{handlerName}" to the queue. Assigned ID #{id}.',
64 8
            ['handlerName' => $message->getHandlerName(), 'id' => $messageId]
65 8
        );
66
67 8
        return $message;
68
    }
69
70 5
    public function run(int $max = 0): int
71
    {
72 5
        $this->checkAdapter();
73
74 4
        $this->logger->debug('Start processing queue messages.');
75 4
        $count = 0;
76
77 4
        $handlerCallback = function (MessageInterface $message) use (&$max, &$count): bool {
78 4
            if (($max > 0 && $max <= $count) || !$this->handle($message)) {
79 1
                return false;
80
            }
81 4
            $count++;
82
83 4
            return true;
84 4
        };
85
86
        /** @psalm-suppress PossiblyNullReference */
87 4
        $this->adapter->runExisting($handlerCallback);
0 ignored issues
show
Bug introduced by
The method runExisting() does not exist on null. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

87
        $this->adapter->/** @scrutinizer ignore-call */ 
88
                        runExisting($handlerCallback);

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
88
89 4
        $this->logger->info(
90 4
            'Processed {count} queue messages.',
91 4
            ['count' => $count]
92 4
        );
93
94 4
        return $count;
95
    }
96
97 1
    public function listen(): void
98
    {
99 1
        $this->checkAdapter();
100
101 1
        $this->logger->info('Start listening to the queue.');
102
        /** @psalm-suppress PossiblyNullReference */
103 1
        $this->adapter->subscribe(fn (MessageInterface $message) => $this->handle($message));
104 1
        $this->logger->info('Finish listening to the queue.');
105
    }
106
107 2
    public function status(string|int $id): JobStatus
108
    {
109 2
        $this->checkAdapter();
110
111
        /** @psalm-suppress PossiblyNullReference */
112 2
        return $this->adapter->status($id);
113
    }
114
115 9
    public function withAdapter(AdapterInterface $adapter): self
116
    {
117 9
        $new = clone $this;
118 9
        $new->adapter = $adapter;
119
120 9
        return $new;
121
    }
122
123 1
    public function withMiddlewares(MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions): self
124
    {
125 1
        $instance = clone $this;
126 1
        $instance->middlewareDefinitions = $middlewareDefinitions;
127
128 1
        return $instance;
129
    }
130
131 1
    public function withMiddlewaresAdded(MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions): self
132
    {
133 1
        $instance = clone $this;
134 1
        $instance->middlewareDefinitions = [...array_values($instance->middlewareDefinitions), ...array_values($middlewareDefinitions)];
135
136 1
        return $instance;
137
    }
138
139 2
    public function withChannelName(string $channel): self
140
    {
141 2
        $instance = clone $this;
142 2
        $instance->channelName = $channel;
143
144 2
        return $instance;
145
    }
146
147 5
    private function handle(MessageInterface $message): bool
148
    {
149 5
        $this->worker->process($message, $this);
150
151 5
        return $this->loop->canContinue();
152
    }
153
154 7
    private function checkAdapter(): void
155
    {
156 7
        if ($this->adapter === null) {
157 1
            throw new AdapterNotConfiguredException();
158
        }
159
    }
160
161 9
    private function createPushHandler(array $middlewares): MessageHandlerPushInterface
162
    {
163 9
        return new class (
164 9
            $this->adapterPushHandler,
165 9
            $this->pushMiddlewareDispatcher,
166 9
            array_merge($this->middlewareDefinitions, $middlewares)
167 9
        ) implements MessageHandlerPushInterface {
168
            public function __construct(
169
                private AdapterPushHandler $adapterPushHandler,
170
                private PushMiddlewareDispatcher $dispatcher,
171
                private array $middlewares,
172
            ) {
173 9
            }
174
175
            public function handlePush(PushRequest $request): PushRequest
176
            {
177 9
                return $this->dispatcher
178 9
                    ->withMiddlewares($this->middlewares)
179 9
                    ->dispatch($request, $this->adapterPushHandler);
180
            }
181 9
        };
182
    }
183
}
184