Pheanstalk   C

Complexity

Total Complexity 53

Size/Duplication

Total Lines 443
Duplicated Lines 0 %

Test Coverage

Coverage 88.67%

Importance

Changes 6
Bugs 2
Features 0

Metric   Value
wmc 53
eloc 118
dl 0
loc 443
ccs 133
cts 150
cp 0.8867
rs 6.96
c 6
b 2
f 0

33 Methods

Rating   Name   Duplication   Size   Complexity  
A bury() 0 3 1
A reconnect() 0 17 5
A touch() 0 3 1
A release() 0 7 1
A pauseTube() 0 3 1
A delete() 0 3 1
A kickJob() 0 3 1
A __construct() 0 3 1
A resumeTube() 0 4 1
A put() 0 11 1
A statsJob() 0 3 1
A kick() 0 5 1
A useTube() 0 8 2
A dispatch() 0 10 2
A reserve() 0 7 1
A peekDelayed() 0 10 2
A peek() 0 7 1
A reserveJob() 0 16 3
A listTubeUsed() 0 10 2
A createWithFactory() 0 3 1
A listTubesWatched() 0 10 2
A listTubes() 0 4 1
A withWatchedTube() 0 12 3
A withUsedTube() 0 8 1
A stats() 0 3 1
A create() 0 3 1
A peekBuried() 0 10 2
A statsTube() 0 3 1
A peekReady() 0 10 2
A ignore() 0 8 2
A watch() 0 8 2
A reserveWithTimeout() 0 15 3
A watchOnly() 0 10 2

How to fix   Complexity   

Complex Class

Complex classes like Pheanstalk often do many different things. To break such a class down, first identify a cohesive component within it. A common way to find such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use Pheanstalk, and based on these observations, apply Extract Interface, too.
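
As a rough illustration of Extract Class for this listing: the `$using` and `$watching` fields, together with the tube-related methods (`useTube()`, `watch()`, `ignore()`, `listTubesWatched()`), look like one candidate component. The sketch below is only an assumption about how that bookkeeping might be pulled out; `TubeState` and all of its method names are hypothetical and not part of Pheanstalk.

```php
<?php

declare(strict_types=1);

// Hypothetical class: tracks which tube is used and which tubes are watched,
// without talking to the server itself.
final class TubeState
{
    private const DEFAULT_TUBE = 'default';

    /** @var string */
    private $using = self::DEFAULT_TUBE;

    /** @var array<string,bool> */
    private $watching = [self::DEFAULT_TUBE => true];

    public function used(): string
    {
        return $this->using;
    }

    /** Returns true when the used tube changed, i.e. a command must be sent. */
    public function markUsed(string $tube): bool
    {
        if ($this->using === $tube) {
            return false;
        }
        $this->using = $tube;

        return true;
    }

    /** Returns true when the tube was not watched before. */
    public function markWatched(string $tube): bool
    {
        if (isset($this->watching[$tube])) {
            return false;
        }
        $this->watching[$tube] = true;

        return true;
    }

    /** Returns true when the tube was watched and has now been dropped. */
    public function markIgnored(string $tube): bool
    {
        if (!isset($this->watching[$tube])) {
            return false;
        }
        unset($this->watching[$tube]);

        return true;
    }

    /** @return array<int,string> names of the watched tubes */
    public function watched(): array
    {
        return array_keys($this->watching);
    }
}
```

With such a class in place, the client would dispatch a use, watch, or ignore command only when the corresponding `markUsed()`, `markWatched()`, or `markIgnored()` call returns true, which keeps the state tracking testable without a socket.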

<?php

declare(strict_types=1);

namespace Pheanstalk;

use Pheanstalk\Command\PeekCommand;
use Pheanstalk\Contract\CommandInterface;
use Pheanstalk\Contract\JobIdInterface;
use Pheanstalk\Contract\PheanstalkInterface;
use Pheanstalk\Contract\ResponseInterface;
use Pheanstalk\Contract\SocketFactoryInterface;
use Pheanstalk\Exception\DeadlineSoonException;

/**
 * Pheanstalk is a PHP client for the beanstalkd workqueue.
 */
class Pheanstalk implements PheanstalkInterface
{
    /**
     * @var Connection
     */
    private $connection;
    /**
     * @var ?string
     */
    private $using = PheanstalkInterface::DEFAULT_TUBE;
    /**
     * @var array<string,bool>
     */
    private $watching = [PheanstalkInterface::DEFAULT_TUBE => true];

    public function __construct(Connection $connection)
    {
        $this->connection = $connection;
    }

    /**
     * Static constructor that uses autodetection to choose an underlying socket implementation
     * @param string $host
     * @param int $port
     * @param int $connectTimeout
     * @return Pheanstalk
     */
    public static function create(string $host, int $port = 11300, int $connectTimeout = 10)
    {
        return self::createWithFactory(new SocketFactory($host, $port, $connectTimeout));
    }

    /**
     * Static constructor that uses a given socket factory for underlying connections
     * @param SocketFactoryInterface $factory
     * @return Pheanstalk
     */
    public static function createWithFactory(SocketFactoryInterface $factory)
    {
        return new self(new Connection($factory));
    }

    // ----------------------------------------

    /**
     * {@inheritdoc}
     */
    public function bury(JobIdInterface $job, int $priority = PheanstalkInterface::DEFAULT_PRIORITY): void
    {
        $this->dispatch(new Command\BuryCommand($job, $priority));
    }

    /**
     * {@inheritdoc}
     */
    public function delete(JobIdInterface $job): void
    {
        $this->dispatch(new Command\DeleteCommand($job));
    }

    /**
     * {@inheritdoc}
     */
    public function ignore(string $tube): PheanstalkInterface
    {
        if (isset($this->watching[$tube])) {
            $this->dispatch(new Command\IgnoreCommand($tube));
            unset($this->watching[$tube]);
        }

        return $this;
    }

    /**
     * {@inheritdoc}
     */
    public function kick(int $max): int
    {
        $response = $this->dispatch(new Command\KickCommand($max));

        return $response['kicked'];
    }

    /**
     * {@inheritdoc}
     */
    public function kickJob(JobIdInterface $job): void
    {
        $this->dispatch(new Command\KickJobCommand($job));
    }

    /**
     * {@inheritdoc}
     */
    public function listTubes(): array
    {
        return (array)$this->dispatch(
            new Command\ListTubesCommand()
        );
    }

    /**
     * {@inheritdoc}
     */
    public function listTubesWatched(bool $askServer = false): array
    {
        if ($askServer) {
            $response = (array)$this->dispatch(
                new Command\ListTubesWatchedCommand()
            );
            $this->watching = array_fill_keys($response, true);
        }

        return array_keys($this->watching);
    }

    /**
     * {@inheritdoc}
     */
    public function listTubeUsed(bool $askServer = false): string
    {
        if ($askServer) {
            $response = $this->dispatch(
                new Command\ListTubeUsedCommand()
            );
            $this->using = $response['tube'];
        }

        // Scrutinizer issue (Bug, Best Practice): the expression `return $this->using`
        // could return null, which is incompatible with the type-hinted return type
        // string. Consider adding a type check to rule this out.
        return $this->using;
    }

    /**
     * {@inheritdoc}
     */
    public function pauseTube(string $tube, int $delay): void
    {
        $this->dispatch(new Command\PauseTubeCommand($tube, $delay));
    }

    /**
     * {@inheritdoc}
     */
    public function resumeTube(string $tube): void
    {
        // Pausing a tube with zero delay resumes it
        $this->pauseTube($tube, 0);
    }

    /**
     * {@inheritdoc}
     */
    public function peek(JobIdInterface $job): Job
    {
        $response = $this->dispatch(
            new Command\PeekJobCommand($job)
        );

        return new Job($response['id'], $response['jobdata']);
    }

    /**
     * {@inheritdoc}
     */
    public function peekReady(): ?Job
    {
        $response = $this->dispatch(
            new Command\PeekCommand(PeekCommand::TYPE_READY)
        );
        if ($response->getResponseName() === ResponseInterface::RESPONSE_NOT_FOUND) {
            return null;
        }

        return new Job($response['id'], $response['jobdata']);
    }

    /**
     * {@inheritdoc}
     */
    public function peekDelayed(): ?Job
    {
        $response = $this->dispatch(
            new Command\PeekCommand(Command\PeekCommand::TYPE_DELAYED)
        );
        if ($response->getResponseName() === ResponseInterface::RESPONSE_NOT_FOUND) {
            return null;
        }

        return new Job($response['id'], $response['jobdata']);
    }

    /**
     * {@inheritdoc}
     */
    public function peekBuried(): ?Job
    {
        $response = $this->dispatch(
            new Command\PeekCommand(Command\PeekCommand::TYPE_BURIED)
        );
        if ($response->getResponseName() === ResponseInterface::RESPONSE_NOT_FOUND) {
            return null;
        }

        return new Job($response['id'], $response['jobdata']);
    }

    /**
     * {@inheritdoc}
     */
    public function put(
        string $data,
        int $priority = PheanstalkInterface::DEFAULT_PRIORITY,
        int $delay = PheanstalkInterface::DEFAULT_DELAY,
        int $ttr = PheanstalkInterface::DEFAULT_TTR
    ): Job {
        $response = $this->dispatch(
            new Command\PutCommand($data, $priority, $delay, $ttr)
        );

        return new Job($response['id'], $data);
    }

    /**
     * {@inheritdoc}
     */
    public function release(
        JobIdInterface $job,
        int $priority = PheanstalkInterface::DEFAULT_PRIORITY,
        int $delay = PheanstalkInterface::DEFAULT_DELAY
    ): void {
        $this->dispatch(
            new Command\ReleaseCommand($job, $priority, $delay)
        );
    }

    /**
     * {@inheritdoc}
     */
    public function reserve(): Job
    {
        $response = $this->dispatch(
            new Command\ReserveCommand()
        );

        return new Job($response['id'], $response['jobdata']);
    }

    /**
     * {@inheritdoc}
     */
    public function reserveJob(JobIdInterface $job): Job
    {
        // New in 1.12, beanstalkd returns BadFormat instead of UnknownCommand
        try {
            $response = $this->dispatch(
                new Command\ReserveJobCommand($job)
            );
        } catch (Exception\ServerBadFormatException $e) {
            throw new Exception\ServerUnknownCommandException();
        }

        if ($response->getResponseName() === ResponseInterface::RESPONSE_BAD_FORMAT) {
            throw new Exception\ServerUnknownCommandException();
        }

        return new Job($response['id'], $response['jobdata']);
    }

    /**
     * {@inheritdoc}
     */
    public function reserveWithTimeout(int $timeout): ?Job
    {
        $response = $this->dispatch(
            new Command\ReserveWithTimeoutCommand($timeout)
        );

        if ($response->getResponseName() === ResponseInterface::RESPONSE_DEADLINE_SOON) {
            throw new DeadlineSoonException();
        }

        if ($response->getResponseName() === ResponseInterface::RESPONSE_TIMED_OUT) {
            return null;
        }

        return new Job($response['id'], $response['jobdata']);
    }

    /**
     * {@inheritdoc}
     */
    public function statsJob(JobIdInterface $job): ResponseInterface
    {
        return $this->dispatch(new Command\StatsJobCommand($job));
    }

    /**
     * {@inheritdoc}
     */
    public function statsTube(string $tube): ResponseInterface
    {
        return $this->dispatch(new Command\StatsTubeCommand($tube));
    }

    /**
     * {@inheritdoc}
     */
    public function stats(): ResponseInterface
    {
        return $this->dispatch(new Command\StatsCommand());
    }

    /**
     * {@inheritdoc}
     */
    public function touch(JobIdInterface $job): void
    {
        $this->dispatch(new Command\TouchCommand($job));
    }

    /**
     * {@inheritdoc}
     */
    public function useTube(string $tube): PheanstalkInterface
    {
        if ($this->using !== $tube) {
            $this->dispatch(new Command\UseCommand($tube));
            $this->using = $tube;
        }

        return $this;
    }

    /**
     * {@inheritdoc}
     */
    public function watch(string $tube): PheanstalkInterface
    {
        if (!isset($this->watching[$tube])) {
            $this->dispatch(new Command\WatchCommand($tube));
            $this->watching[$tube] = true;
        }

        return $this;
    }

    /**
     * {@inheritdoc}
     */
    public function watchOnly(string $tube): PheanstalkInterface
    {
        $this->watch($tube);

        $ignoreTubes = array_diff_key($this->watching, [$tube => true]);
        foreach ($ignoreTubes as $ignoreTube => $true) {
            $this->ignore($ignoreTube);
        }

        return $this;
    }

    // ----------------------------------------

    /**
     * Dispatches the specified command to the connection object.
     *
     * If a SocketException occurs, the connection is reset, and the command is
     * re-attempted once.
     *
     * @param CommandInterface $command
     *
     * @return ResponseInterface
     */
    private function dispatch($command)
    {
        try {
            $response = $this->connection->dispatchCommand($command);
        } catch (Exception\SocketException $e) {
            $this->reconnect();
            $response = $this->connection->dispatchCommand($command);
        }

        return $response;
    }

    /**
     * Creates a new connection object, based on the existing connection object,
     * and re-establishes the used tube and watchlist.
     */
    private function reconnect()
    {
        $this->connection->disconnect();

        if ($this->using !== PheanstalkInterface::DEFAULT_TUBE) {
            // Scrutinizer issue (Bug): $this->using can also be null, but parameter $tube of
            // Pheanstalk\Command\UseCommand::__construct() only seems to accept string;
            // consider adding a type check. If this is a false positive, the issue can be
            // silenced with the ignore-type annotation:
            //     $this->dispatch(new Command\UseCommand(/** @scrutinizer ignore-type */ $this->using));
            $this->dispatch(new Command\UseCommand($this->using));
        }

        foreach ($this->watching as $tube => $true) {
            if ($tube != PheanstalkInterface::DEFAULT_TUBE) {
                unset($this->watching[$tube]);
                $this->watch($tube);
            }
        }

        if (!isset($this->watching[PheanstalkInterface::DEFAULT_TUBE])) {
            $this->ignore(PheanstalkInterface::DEFAULT_TUBE);
        }
    }

    /**
     * @param string $tube The tube to use during execution
     * @param \Closure $closure Closure to execute while using the specified tube
     * @return mixed the return value of the closure.
     * @internal This is marked as internal since it is not part of a stabilized interface.
     */
    public function withUsedTube(string $tube, \Closure $closure)
    {
        $used = $this->listTubeUsed();
        try {
            $this->useTube($tube);
            return $closure($this);
        } finally {
            $this->useTube($used);
        }
    }

    /**
     * @param string $tube The tube to watch during execution
     * @param \Closure $closure Closure to execute while watching the specified tube
     * @return mixed the return value of the closure.
     * @internal This is marked as internal since it is not part of a stabilized interface.
     */
    public function withWatchedTube(string $tube, \Closure $closure)
    {
        $watched = $this->listTubesWatched();
        try {
            $this->watchOnly($tube);
            return $closure($this);
        } finally {
            // Use a separate loop variable so the $tube parameter is still intact
            // for the in_array() check below.
            foreach ($watched as $watchedTube) {
                $this->watch($watchedTube);
            }
            if (!in_array($tube, $watched)) {
                $this->ignore($tube);
            }
        }
    }
}