GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.

Pheanstalk   C
last analyzed

Complexity

Total Complexity 53

Size/Duplication

Total Lines 443
Duplicated Lines 0 %

Test Coverage

Coverage 88.67%

Importance

Changes 6
Bugs 2 Features 0
Metric Value
wmc 53
eloc 118
dl 0
loc 443
ccs 133
cts 150
cp 0.8867
rs 6.96
c 6
b 2
f 0

33 Methods

Rating   Name   Duplication   Size   Complexity  
A bury() 0 3 1
A reconnect() 0 17 5
A touch() 0 3 1
A release() 0 7 1
A pauseTube() 0 3 1
A delete() 0 3 1
A kickJob() 0 3 1
A __construct() 0 3 1
A resumeTube() 0 4 1
A put() 0 11 1
A statsJob() 0 3 1
A kick() 0 5 1
A useTube() 0 8 2
A dispatch() 0 10 2
A reserve() 0 7 1
A peekDelayed() 0 10 2
A peek() 0 7 1
A reserveJob() 0 16 3
A listTubeUsed() 0 10 2
A createWithFactory() 0 3 1
A listTubesWatched() 0 10 2
A listTubes() 0 4 1
A withWatchedTube() 0 12 3
A withUsedTube() 0 8 1
A stats() 0 3 1
A create() 0 3 1
A peekBuried() 0 10 2
A statsTube() 0 3 1
A peekReady() 0 10 2
A ignore() 0 8 2
A watch() 0 8 2
A reserveWithTimeout() 0 15 3
A watchOnly() 0 10 2

How to fix   Complexity   

Complex Class

Complex classes like Pheanstalk often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use Pheanstalk, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
declare(strict_types=1);
4
5
namespace Pheanstalk;
6
7
use Pheanstalk\Command\PeekCommand;
8
use Pheanstalk\Contract\CommandInterface;
9
use Pheanstalk\Contract\JobIdInterface;
10
use Pheanstalk\Contract\PheanstalkInterface;
11
use Pheanstalk\Contract\ResponseInterface;
12
use Pheanstalk\Contract\SocketFactoryInterface;
13
use Pheanstalk\Exception\DeadlineSoonException;
14
15
/**
16
 * Pheanstalk is a PHP client for the beanstalkd workqueue.
17
 */
18
class Pheanstalk implements PheanstalkInterface
19
{
20
    /**
21
     * @var Connection
22
     */
23
    private $connection;
24
    /**
25
     * @var ?string
26
     */
27
    private $using = PheanstalkInterface::DEFAULT_TUBE;
28
    /**
29
     * @var array<string,bool>
30
     */
31
    private $watching = [PheanstalkInterface::DEFAULT_TUBE => true];
32
33 27
    public function __construct(Connection $connection)
34
    {
35 27
        $this->connection = $connection;
36 27
    }
37
38
    /**
39
     * Static constructor that uses autodetection to choose an underlying socket implementation
40
     * @param string $host
41
     * @param int $port
42
     * @param int $connectTimeout
43
     * @return Pheanstalk
44
     */
45 23
    public static function create(string $host, int $port = 11300, int $connectTimeout = 10)
46
    {
47 23
        return self::createWithFactory(new SocketFactory($host, $port, $connectTimeout));
48
    }
49
50
    /**
51
     * Static constructor that uses a given socket factory for underlying connections
52
     * @param SocketFactoryInterface $factory
53
     * @return Pheanstalk
54
     */
55 23
    public static function createWithFactory(SocketFactoryInterface $factory)
56
    {
57 23
        return new self(new Connection($factory));
58
    }
59
60
    // ----------------------------------------
61
62
    /**
63
     * {@inheritdoc}
64
     */
65 2
    public function bury(JobIdInterface $job, int $priority = PheanstalkInterface::DEFAULT_PRIORITY): void
66
    {
67 2
        $this->dispatch(new Command\BuryCommand($job, $priority));
68 2
    }
69
70
    /**
71
     * {@inheritdoc}
72
     */
73 13
    public function delete(JobIdInterface $job): void
74
    {
75 13
        $this->dispatch(new Command\DeleteCommand($job));
76 13
    }
77
78
    /**
79
     * {@inheritdoc}
80
     */
81 12
    public function ignore(string $tube): PheanstalkInterface
82
    {
83 12
        if (isset($this->watching[$tube])) {
84 12
            $this->dispatch(new Command\IgnoreCommand($tube));
85 11
            unset($this->watching[$tube]);
86
        }
87
88 11
        return $this;
89
    }
90
91
    /**
92
     * {@inheritdoc}
93
     */
94 1
    public function kick(int $max): int
95
    {
96 1
        $response = $this->dispatch(new Command\KickCommand($max));
97
98 1
        return $response['kicked'];
99
    }
100
101
    /**
102
     * {@inheritdoc}
103
     */
104
    public function kickJob(JobIdInterface $job): void
105
    {
106
        $this->dispatch(new Command\KickJobCommand($job));
107
    }
108
109
    /**
110
     * {@inheritdoc}
111
     */
112 21
    public function listTubes(): array
113
    {
114 21
        return (array)$this->dispatch(
115 21
            new Command\ListTubesCommand()
116
        );
117
    }
118
119
    /**
120
     * {@inheritdoc}
121
     */
122 2
    public function listTubesWatched(bool $askServer = false): array
123
    {
124 2
        if ($askServer) {
125 1
            $response = (array)$this->dispatch(
126 1
                new Command\ListTubesWatchedCommand()
127
            );
128 1
            $this->watching = array_fill_keys($response, true);
129
        }
130
131 2
        return array_keys($this->watching);
132
    }
133
134
    /**
135
     * {@inheritdoc}
136
     */
137 3
    public function listTubeUsed(bool $askServer = false): string
138
    {
139 3
        if ($askServer) {
140 1
            $response = $this->dispatch(
141 1
                new Command\ListTubeUsedCommand()
142
            );
143 1
            $this->using = $response['tube'];
144
        }
145
146 3
        return $this->using;
0 ignored issues
show
Bug Best Practice introduced by
The expression return $this->using could return the type null which is incompatible with the type-hinted return string. Consider adding an additional type-check to rule them out.
Loading history...
147
    }
148
149
    /**
150
     * {@inheritdoc}
151
     */
152 1
    public function pauseTube(string $tube, int $delay): void
153
    {
154 1
        $this->dispatch(new Command\PauseTubeCommand($tube, $delay));
155 1
    }
156
157
    /**
158
     * {@inheritdoc}
159
     */
160 1
    public function resumeTube(string $tube): void
161
    {
162
        // Pause a tube with zero delay will resume the tube
163 1
        $this->pauseTube($tube, 0);
164 1
    }
165
166
    /**
167
     * {@inheritdoc}
168
     */
169 1
    public function peek(JobIdInterface $job): Job
170
    {
171 1
        $response = $this->dispatch(
172 1
            new Command\PeekJobCommand($job)
173
        );
174
175 1
        return new Job($response['id'], $response['jobdata']);
176
    }
177
178
    /**
179
     * {@inheritdoc}
180
     */
181 23
    public function peekReady(): ?Job
182
    {
183 23
        $response = $this->dispatch(
184 23
            new Command\PeekCommand(PeekCommand::TYPE_READY)
185
        );
186 23
        if ($response->getResponseName() === ResponseInterface::RESPONSE_NOT_FOUND) {
187 23
            return null;
188
        }
189
190 11
        return new Job($response['id'], $response['jobdata']);
191
    }
192
193
    /**
194
     * {@inheritdoc}
195
     */
196 23
    public function peekDelayed(): ?Job
197
    {
198 23
        $response = $this->dispatch(
199 23
            new Command\PeekCommand(Command\PeekCommand::TYPE_DELAYED)
200
        );
201 23
        if ($response->getResponseName() === ResponseInterface::RESPONSE_NOT_FOUND) {
202 23
            return null;
203
        }
204
205 2
        return new Job($response['id'], $response['jobdata']);
206
    }
207
208
    /**
209
     * {@inheritdoc}
210
     */
211 21
    public function peekBuried(): ?Job
212
    {
213 21
        $response = $this->dispatch(
214 21
            new Command\PeekCommand(Command\PeekCommand::TYPE_BURIED)
215
        );
216 21
        if ($response->getResponseName() === ResponseInterface::RESPONSE_NOT_FOUND) {
217 21
            return null;
218
        }
219
220 2
        return new Job($response['id'], $response['jobdata']);
221
    }
222
223
    /**
224
     * {@inheritdoc}
225
     */
226 15
    public function put(
227
        string $data,
228
        int $priority = PheanstalkInterface::DEFAULT_PRIORITY,
229
        int $delay = PheanstalkInterface::DEFAULT_DELAY,
230
        int $ttr = PheanstalkInterface::DEFAULT_TTR
231
    ): Job {
232 15
        $response = $this->dispatch(
233 15
            new Command\PutCommand($data, $priority, $delay, $ttr)
234
        );
235
236 14
        return new Job($response['id'], $data);
237
    }
238
239
    /**
240
     * {@inheritdoc}
241
     */
242 2
    public function release(
243
        JobIdInterface $job,
244
        int $priority = PheanstalkInterface::DEFAULT_PRIORITY,
245
        int $delay = PheanstalkInterface::DEFAULT_DELAY
246
    ): void {
247 2
        $this->dispatch(
248 2
            new Command\ReleaseCommand($job, $priority, $delay)
249
        );
250 2
    }
251
252
    /**
253
     * {@inheritdoc}
254
     */
255 6
    public function reserve(): Job
256
    {
257 6
        $response = $this->dispatch(
258 6
            new Command\ReserveCommand()
259
        );
260
261 6
        return new Job($response['id'], $response['jobdata']);
262
    }
263
264
    /**
265
     * {@inheritdoc}
266
     */
267
    public function reserveJob(JobIdInterface $job): Job
268
    {
269
        // New in 1.12, beanstalkd returns BadFormat instead of UnknownCommand
270
        try {
271
            $response = $this->dispatch(
272
                new Command\ReserveJobCommand($job)
273
            );
274
        } catch (Exception\ServerBadFormatException $e) {
275
            throw new Exception\ServerUnknownCommandException();
276
        }
277
278
        if ($response->getResponseName() === ResponseInterface::RESPONSE_BAD_FORMAT) {
279
            throw new Exception\ServerUnknownCommandException();
280
        }
281
282
        return new Job($response['id'], $response['jobdata']);
283
    }
284
285
    /**
286
     * {@inheritdoc}
287
     */
288 4
    public function reserveWithTimeout(int $timeout): ?Job
289
    {
290 4
        $response = $this->dispatch(
291 4
            new Command\ReserveWithTimeoutCommand($timeout)
292
        );
293
294 4
        if ($response->getResponseName() === ResponseInterface::RESPONSE_DEADLINE_SOON) {
295
            throw new DeadlineSoonException();
296
        }
297
298 4
        if ($response->getResponseName() === ResponseInterface::RESPONSE_TIMED_OUT) {
299 3
            return null;
300
        }
301
302 4
        return new Job($response['id'], $response['jobdata']);
303
    }
304
305
    /**
306
     * {@inheritdoc}
307
     */
308 3
    public function statsJob(JobIdInterface $job): ResponseInterface
309
    {
310 3
        return $this->dispatch(new Command\StatsJobCommand($job));
311
    }
312
313
    /**
314
     * {@inheritdoc}
315
     */
316 1
    public function statsTube(string $tube): ResponseInterface
317
    {
318 1
        return $this->dispatch(new Command\StatsTubeCommand($tube));
319
    }
320
321
    /**
322
     * {@inheritdoc}
323
     */
324 6
    public function stats(): ResponseInterface
325
    {
326 6
        return $this->dispatch(new Command\StatsCommand());
327
    }
328
329
    /**
330
     * {@inheritdoc}
331
     */
332 1
    public function touch(JobIdInterface $job): void
333
    {
334 1
        $this->dispatch(new Command\TouchCommand($job));
335 1
    }
336
337
    /**
338
     * {@inheritdoc}
339
     */
340 23
    public function useTube(string $tube): PheanstalkInterface
341
    {
342 23
        if ($this->using !== $tube) {
343 23
            $this->dispatch(new Command\UseCommand($tube));
344 23
            $this->using = $tube;
345
        }
346
347 23
        return $this;
348
    }
349
350
    /**
351
     * {@inheritdoc}
352
     */
353 12
    public function watch(string $tube): PheanstalkInterface
354
    {
355 12
        if (!isset($this->watching[$tube])) {
356 12
            $this->dispatch(new Command\WatchCommand($tube));
357 12
            $this->watching[$tube] = true;
358
        }
359
360 12
        return $this;
361
    }
362
363
    /**
364
     * {@inheritdoc}
365
     */
366 2
    public function watchOnly(string $tube): PheanstalkInterface
367
    {
368 2
        $this->watch($tube);
369
370 2
        $ignoreTubes = array_diff_key($this->watching, [$tube => true]);
371 2
        foreach ($ignoreTubes as $ignoreTube => $true) {
372 2
            $this->ignore($ignoreTube);
373
        }
374
375 2
        return $this;
376
    }
377
378
    // ----------------------------------------
379
380
    /**
381
     * Dispatches the specified command to the connection object.
382
     *
383
     * If a SocketException occurs, the connection is reset, and the command is
384
     * re-attempted once.
385
     *
386
     * @param CommandInterface $command
387
     *
388
     * @return ResponseInterface
389
     */
390 27
    private function dispatch($command)
391
    {
392
        try {
393 27
            $response = $this->connection->dispatchCommand($command);
394 3
        } catch (Exception\SocketException $e) {
395 1
            $this->reconnect();
396 1
            $response = $this->connection->dispatchCommand($command);
397
        }
398
399 27
        return $response;
400
    }
401
402
    /**
403
     * Creates a new connection object, based on the existing connection object,
404
     * and re-establishes the used tube and watchlist.
405
     */
406 1
    private function reconnect()
407
    {
408 1
        $this->connection->disconnect();
409
410 1
        if ($this->using !== PheanstalkInterface::DEFAULT_TUBE) {
411
            $this->dispatch(new Command\UseCommand($this->using));
0 ignored issues
show
Bug introduced by
It seems like $this->using can also be of type null; however, parameter $tube of Pheanstalk\Command\UseCommand::__construct() does only seem to accept string, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

411
            $this->dispatch(new Command\UseCommand(/** @scrutinizer ignore-type */ $this->using));
Loading history...
412
        }
413
414 1
        foreach ($this->watching as $tube => $true) {
415 1
            if ($tube != PheanstalkInterface::DEFAULT_TUBE) {
416
                unset($this->watching[$tube]);
417
                $this->watch($tube);
418
            }
419
        }
420
421 1
        if (!isset($this->watching[PheanstalkInterface::DEFAULT_TUBE])) {
422
            $this->ignore(PheanstalkInterface::DEFAULT_TUBE);
423
        }
424 1
    }
425
426
    /**
427
     * @param string $tube The tube to use during execution
428
     * @param \Closure $closure Closure to execute while using the specified tube
429
     * @return mixed the return value of the closure.
430
     * @internal This is marked as internal since it is not part of a stabilized interface.
431
     */
432 2
    public function withUsedTube(string $tube, \Closure $closure)
433
    {
434 2
        $used = $this->listTubeUsed();
435
        try {
436 2
            $this->useTube($tube);
437 2
            return $closure($this);
438
        } finally {
439 2
            $this->useTube($used);
440
        }
441
    }
442
443
    /**
444
     * @param string $tube The tube to watch during execution
445
     * @param \Closure $closure Closure to execute while using the specified tube
446
     * @return mixed the return value of the closure.
447
     * @internal This is marked as internal since it is not part of a stabilized interface.
448
     */
449 1
    public function withWatchedTube(string $tube, \Closure $closure)
450
    {
451 1
        $watched = $this->listTubesWatched();
452
        try {
453 1
            $this->watchOnly($tube);
454 1
            return $closure($this);
455
        } finally {
456 1
            foreach ($watched as $tube) {
457 1
                $this->watch($tube);
458
            }
459 1
            if (!in_array($tube, $watched)) {
460
                $this->ignore($tube);
461
            }
462
        }
463
    }
464
}
465