GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Push — master ( d2c1fa...92e8b2 )
by Sam
17:17 queued 15:28
created

Pheanstalk::reserveJob()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 16
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 12

Importance

Changes 2
Bugs 0 Features 0
Metric Value
eloc 8
c 2
b 0
f 0
dl 0
loc 16
ccs 0
cts 8
cp 0
rs 10
cc 3
nc 3
nop 1
crap 12
1
<?php
2
3
namespace Pheanstalk;
4
5
use Pheanstalk\Contract\CommandInterface;
6
use Pheanstalk\Contract\JobIdInterface;
7
use Pheanstalk\Contract\PheanstalkInterface;
8
use Pheanstalk\Contract\ResponseInterface;
9
use Pheanstalk\Contract\SocketFactoryInterface;
10
use Pheanstalk\Exception\DeadlineSoonException;
11
12
/**
13
 * Pheanstalk is a PHP client for the beanstalkd workqueue.
14
 */
15
class Pheanstalk implements PheanstalkInterface
16
{
17
    /**
18
     * @var Connection
19
     */
20
    private $connection;
21
    /**
22
     * @var ?string
23
     */
24
    private $using = PheanstalkInterface::DEFAULT_TUBE;
25
    /**
26
     * @var array<string,bool>
27
     */
28
    private $watching = [PheanstalkInterface::DEFAULT_TUBE => true];
29
30 27
    public function __construct(Connection $connection)
31
    {
32 27
        $this->connection = $connection;
33
    }
34
35
    /**
36
     * Static constructor that uses autodetection to choose an underlying socket implementation
37
     * @param string $host
38
     * @param int $port
39
     * @param int $connectTimeout
40
     * @return Pheanstalk
41
     */
42 23
    public static function create(string $host, int $port = 11300, int $connectTimeout = 10)
43
    {
44 23
        return self::createWithFactory(new SocketFactory($host, $port, $connectTimeout));
45
    }
46
47
    /**
48
     * Static constructor that uses a given socket factory for underlying connections
49
     * @param SocketFactoryInterface $factory
50
     * @return Pheanstalk
51
     */
52 23
    public static function createWithFactory(SocketFactoryInterface $factory)
53
    {
54 23
        return new self(new Connection($factory));
55
    }
56
57
    // ----------------------------------------
58
59
    /**
60
     * {@inheritdoc}
61
     */
62 2
    public function bury(JobIdInterface $job, int $priority = PheanstalkInterface::DEFAULT_PRIORITY): void
63
    {
64 2
        $this->dispatch(new Command\BuryCommand($job, $priority));
65
    }
66
67
    /**
68
     * {@inheritdoc}
69
     */
70 13
    public function delete(JobIdInterface $job): void
71
    {
72 13
        $this->dispatch(new Command\DeleteCommand($job));
73
    }
74
75
    /**
76
     * {@inheritdoc}
77
     */
78 12
    public function ignore(string $tube): PheanstalkInterface
79
    {
80 12
        if (isset($this->watching[$tube])) {
81 12
            $this->dispatch(new Command\IgnoreCommand($tube));
82 11
            unset($this->watching[$tube]);
83
        }
84
85 11
        return $this;
86
    }
87
88
    /**
89
     * {@inheritdoc}
90
     */
91 1
    public function kick(int $max): int
92
    {
93 1
        $response = $this->dispatch(new Command\KickCommand($max));
94
95 1
        return $response['kicked'];
96
    }
97
98
    /**
99
     * {@inheritdoc}
100
     */
101
    public function kickJob(JobIdInterface $job): void
102
    {
103
        $this->dispatch(new Command\KickJobCommand($job));
104
    }
105
106
    /**
107
     * {@inheritdoc}
108
     */
109 21
    public function listTubes(): array
110
    {
111 21
        return (array)$this->dispatch(
112 21
            new Command\ListTubesCommand()
113
        );
114
    }
115
116
    /**
117
     * {@inheritdoc}
118
     */
119 2
    public function listTubesWatched(bool $askServer = false): array
120
    {
121 2
        if ($askServer) {
122 1
            $response = (array)$this->dispatch(
123 1
                new Command\ListTubesWatchedCommand()
124
            );
125 1
            $this->watching = array_fill_keys($response, true);
126
        }
127
128 2
        return array_keys($this->watching);
129
    }
130
131
    /**
132
     * {@inheritdoc}
133
     */
134 3
    public function listTubeUsed(bool $askServer = false): string
135
    {
136 3
        if ($askServer) {
137 1
            $response = $this->dispatch(
138 1
                new Command\ListTubeUsedCommand()
139
            );
140 1
            $this->using = $response['tube'];
141
        }
142
143 3
        return $this->using;
0 ignored issues
show
Bug Best Practice introduced by
The expression return $this->using could return the type null which is incompatible with the type-hinted return string. Consider adding an additional type-check to rule them out.
Loading history...
144
    }
145
146
    /**
147
     * {@inheritdoc}
148
     */
149 1
    public function pauseTube(string $tube, int $delay): void
150
    {
151 1
        $this->dispatch(new Command\PauseTubeCommand($tube, $delay));
152
    }
153
154
    /**
155
     * {@inheritdoc}
156
     */
157 1
    public function resumeTube(string $tube): void
158
    {
159
        // Pause a tube with zero delay will resume the tube
160 1
        $this->pauseTube($tube, 0);
161
    }
162
163
    /**
164
     * {@inheritdoc}
165
     */
166 1
    public function peek(JobIdInterface $job): Job
167
    {
168 1
        $response = $this->dispatch(
169 1
            new Command\PeekJobCommand($job)
170
        );
171
172 1
        return new Job($response['id'], $response['jobdata']);
173
    }
174
175
    /**
176
     * {@inheritdoc}
177
     */
178 23
    public function peekReady(): ?Job
179
    {
180 23
        $response = $this->dispatch(
181 23
            new Command\PeekCommand(Command\PeekCommand::TYPE_READY)
182
        );
183 23
        if ($response->getResponseName() === ResponseInterface::RESPONSE_NOT_FOUND) {
184 23
            return null;
185
        }
186
187 11
        return new Job($response['id'], $response['jobdata']);
188
    }
189
190
    /**
191
     * {@inheritdoc}
192
     */
193 23
    public function peekDelayed(): ?Job
194
    {
195 23
        $response = $this->dispatch(
196 23
            new Command\PeekCommand(Command\PeekCommand::TYPE_DELAYED)
197
        );
198 23
        if ($response->getResponseName() === ResponseInterface::RESPONSE_NOT_FOUND) {
199 23
            return null;
200
        }
201
202 2
        return new Job($response['id'], $response['jobdata']);
203
    }
204
205
    /**
206
     * {@inheritdoc}
207
     */
208 21
    public function peekBuried(): ?Job
209
    {
210 21
        $response = $this->dispatch(
211 21
            new Command\PeekCommand(Command\PeekCommand::TYPE_BURIED)
212
        );
213 21
        if ($response->getResponseName() === ResponseInterface::RESPONSE_NOT_FOUND) {
214 21
            return null;
215
        }
216
217 2
        return new Job($response['id'], $response['jobdata']);
218
    }
219
220
    /**
221
     * {@inheritdoc}
222
     */
223 15
    public function put(
224
        string $data,
225
        int $priority = PheanstalkInterface::DEFAULT_PRIORITY,
226
        int $delay = PheanstalkInterface::DEFAULT_DELAY,
227
        int $ttr = PheanstalkInterface::DEFAULT_TTR
228
    ): Job {
229 15
        $response = $this->dispatch(
230 15
            new Command\PutCommand($data, $priority, $delay, $ttr)
231
        );
232
233 14
        return new Job($response['id'], $data);
234
    }
235
236
    /**
237
     * {@inheritdoc}
238
     */
239 2
    public function release(
240
        JobIdInterface $job,
241
        int $priority = PheanstalkInterface::DEFAULT_PRIORITY,
242
        int $delay = PheanstalkInterface::DEFAULT_DELAY
243
    ): void {
244 2
        $this->dispatch(
245 2
            new Command\ReleaseCommand($job, $priority, $delay)
246
        );
247
    }
248
249
    /**
250
     * {@inheritdoc}
251
     */
252 6
    public function reserve(): Job
253
    {
254 6
        $response = $this->dispatch(
255 6
            new Command\ReserveCommand()
256
        );
257
258 6
        return new Job($response['id'], $response['jobdata']);
259
    }
260
261
    /**
262
     * {@inheritdoc}
263
     */
264
    public function reserveJob(JobIdInterface $job): Job
265
    {
266
        // New in 1.12, beanstalkd returns BadFormat instead of UnknownCommand
267
        try {
268
            $response = $this->dispatch(
269
                new Command\ReserveJobCommand($job)
270
            );
271
        } catch (Exception\ServerBadFormatException $e) {
272
            throw new Exception\ServerUnknownCommandException();
273
        }
274
275
        if ($response->getResponseName() === ResponseInterface::RESPONSE_BAD_FORMAT) {
276
            throw new Exception\ServerUnknownCommandException();
277
        }
278
279
        return new Job($response['id'], $response['jobdata']);
280
    }
281
282
    /**
283
     * {@inheritdoc}
284
     */
285 4
    public function reserveWithTimeout(int $timeout): ?Job
286
    {
287 4
        $response = $this->dispatch(
288 4
            new Command\ReserveWithTimeoutCommand($timeout)
289
        );
290
291 4
        if ($response->getResponseName() === ResponseInterface::RESPONSE_DEADLINE_SOON) {
292
            throw new DeadlineSoonException();
293
        }
294
295 4
        if ($response->getResponseName() === ResponseInterface::RESPONSE_TIMED_OUT) {
296 3
            return null;
297
        }
298
299 4
        return new Job($response['id'], $response['jobdata']);
300
    }
301
302
    /**
303
     * {@inheritdoc}
304
     */
305 3
    public function statsJob(JobIdInterface $job): ResponseInterface
306
    {
307 3
        return $this->dispatch(new Command\StatsJobCommand($job));
308
    }
309
310
    /**
311
     * {@inheritdoc}
312
     */
313 1
    public function statsTube(string $tube): ResponseInterface
314
    {
315 1
        return $this->dispatch(new Command\StatsTubeCommand($tube));
316
    }
317
318
    /**
319
     * {@inheritdoc}
320
     */
321 6
    public function stats(): ResponseInterface
322
    {
323 6
        return $this->dispatch(new Command\StatsCommand());
324
    }
325
326
    /**
327
     * {@inheritdoc}
328
     */
329 1
    public function touch(JobIdInterface $job): void
330
    {
331 1
        $this->dispatch(new Command\TouchCommand($job));
332
    }
333
334
    /**
335
     * {@inheritdoc}
336
     */
337 23
    public function useTube(string $tube): PheanstalkInterface
338
    {
339 23
        if ($this->using !== $tube) {
340 23
            $this->dispatch(new Command\UseCommand($tube));
341 23
            $this->using = $tube;
342
        }
343
344 23
        return $this;
345
    }
346
347
    /**
348
     * {@inheritdoc}
349
     */
350 12
    public function watch(string $tube): PheanstalkInterface
351
    {
352 12
        if (!isset($this->watching[$tube])) {
353 12
            $this->dispatch(new Command\WatchCommand($tube));
354 12
            $this->watching[$tube] = true;
355
        }
356
357 12
        return $this;
358
    }
359
360
    /**
361
     * {@inheritdoc}
362
     */
363 2
    public function watchOnly(string $tube): PheanstalkInterface
364
    {
365 2
        $this->watch($tube);
366
367 2
        $ignoreTubes = array_diff_key($this->watching, [$tube => true]);
368 2
        foreach ($ignoreTubes as $ignoreTube => $true) {
369 2
            $this->ignore($ignoreTube);
370
        }
371
372 2
        return $this;
373
    }
374
375
    // ----------------------------------------
376
377
    /**
378
     * Dispatches the specified command to the connection object.
379
     *
380
     * If a SocketException occurs, the connection is reset, and the command is
381
     * re-attempted once.
382
     *
383
     * @param CommandInterface $command
384
     *
385
     * @return ResponseInterface
386
     */
387 27
    private function dispatch($command)
388
    {
389
        try {
390 27
            $response = $this->connection->dispatchCommand($command);
391 3
        } catch (Exception\SocketException $e) {
392 1
            $this->reconnect();
393 1
            $response = $this->connection->dispatchCommand($command);
394
        }
395
396 27
        return $response;
397
    }
398
399
    /**
400
     * Creates a new connection object, based on the existing connection object,
401
     * and re-establishes the used tube and watchlist.
402
     */
403 1
    private function reconnect()
404
    {
405 1
        $this->connection->disconnect();
406
407 1
        if ($this->using !== PheanstalkInterface::DEFAULT_TUBE) {
408
            $this->dispatch(new Command\UseCommand($this->using));
0 ignored issues
show
Bug introduced by
It seems like $this->using can also be of type null; however, parameter $tube of Pheanstalk\Command\UseCommand::__construct() does only seem to accept string, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

408
            $this->dispatch(new Command\UseCommand(/** @scrutinizer ignore-type */ $this->using));
Loading history...
409
        }
410
411 1
        foreach ($this->watching as $tube => $true) {
412 1
            if ($tube != PheanstalkInterface::DEFAULT_TUBE) {
413
                unset($this->watching[$tube]);
414
                $this->watch($tube);
415
            }
416
        }
417
418 1
        if (!isset($this->watching[PheanstalkInterface::DEFAULT_TUBE])) {
419
            $this->ignore(PheanstalkInterface::DEFAULT_TUBE);
420
        }
421
    }
422
423
    /**
424
     * @param string $tube The tube to use during execution
425
     * @param \Closure $closure Closure to execute while using the specified tube
426
     * @return mixed the return value of the closure.
427
     * @internal This is marked as internal since it is not part of a stabilized interface.
428
     */
429 2
    public function withUsedTube(string $tube, \Closure $closure)
430
    {
431 2
        $used = $this->listTubeUsed();
432
        try {
433 2
            $this->useTube($tube);
434 2
            return $closure($this);
435
        } finally {
436 2
            $this->useTube($used);
437
        }
438
    }
439
440
    /**
441
     * @param string $tube The tube to watch during execution
442
     * @param \Closure $closure Closure to execute while using the specified tube
443
     * @return mixed the return value of the closure.
444
     * @internal This is marked as internal since it is not part of a stabilized interface.
445
     */
446 1
    public function withWatchedTube(string $tube, \Closure $closure)
447
    {
448 1
        $watched = $this->listTubesWatched();
449
        try {
450 1
            $this->watchOnly($tube);
451 1
            return $closure($this);
452
        } finally {
453 1
            foreach ($watched as $tube) {
454 1
                $this->watch($tube);
455
            }
456 1
            if (!in_array($tube, $watched)) {
457
                $this->ignore($tube);
458
            }
459
        }
460
    }
461
}
462