Subscriber   A
last analyzed

Complexity

Total Complexity 14

Size/Duplication

Total Lines 100
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 7

Test Coverage

Coverage 100%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
wmc 14
c 1
b 0
f 0
lcom 1
cbo 7
dl 0
loc 100
ccs 43
cts 43
cp 1
rs 10

6 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 6 1
A subscribe() 0 19 4
A executeCallback() 0 8 2
A consume() 0 10 2
A handleFailedJob() 0 13 3
A handleEvent() 0 8 2
1
<?php
2
3
namespace Zenstruck\Queue;
4
5
use SimpleBus\Asynchronous\Consumer\SerializedEnvelopeConsumer;
6
use SimpleBus\Message\Bus\MessageBus;
7
use Zenstruck\Queue\Event\FailedJob;
8
use Zenstruck\Queue\Event\MaxAttemptsReached;
9
use Zenstruck\Queue\Subscriber\ExitStrategy;
10
11
/**
 * Pops jobs from a queue driver and passes each job's serialized envelope to a consumer.
 *
 * A job that is consumed without an exception is deleted from the driver. A job
 * whose consumption throws is marked as failed and then either released through
 * the driver or, once the attempt limit is reached, deleted. Failure events are
 * forwarded to the event bus when one was provided.
 *
 * @author Kevin Bond <[email protected]>
 */
final class Subscriber
{
    /** @var Driver Source of jobs; also used to delete and release them. */
    private $driver;

    /** @var SerializedEnvelopeConsumer Receives the serialized envelope of every popped job. */
    private $consumer;

    /** @var MessageBus|null Optional bus that receives FailedJob and MaxAttemptsReached events. */
    private $eventBus;

    /**
     * @param Driver                     $driver
     * @param SerializedEnvelopeConsumer $consumer
     * @param MessageBus|null            $eventBus Optional; when omitted no failure events are dispatched
     */
    public function __construct(Driver $driver, SerializedEnvelopeConsumer $consumer, MessageBus $eventBus = null)
    {
        $this->driver = $driver;
        $this->consumer = $consumer;
        $this->eventBus = $eventBus;
    }

    /**
     * Consumes jobs in a loop until the exit strategy asks to stop.
     *
     * @param ExitStrategy  $exitStrategy Consulted before every iteration with the number of jobs consumed so far
     * @param callable|null $callback     Callback that is called after job is consumed with args: count, job
     * @param int|null      $waitTime     Time to wait before consuming another job
     * @param int           $maxAttempts  The number of times to attempt a job before marking as failed, 0 for unlimited
     *
     * @return string The reason reported by the exit strategy
     */
    public function subscribe(ExitStrategy $exitStrategy, callable $callback = null, $waitTime = null, $maxAttempts = 50)
    {
        $count = 0;

        while (!$exitStrategy->shouldExit($count)) {
            $job = $this->driver->pop();

            // Anything other than a Job (e.g. nothing available) is skipped and not counted.
            if ($job instanceof Job) {
                $this->consume($job, $maxAttempts);
                $this->executeCallback(++$count, $job, $callback);
            }

            // The wait applies to every iteration, whether or not a job was popped.
            if (null !== $waitTime) {
                sleep($waitTime);
            }
        }

        return $exitStrategy->getReason();
    }

    /**
     * Invokes the optional per-job callback.
     *
     * @param int           $count    Number of jobs consumed so far, including this one
     * @param Job           $job      The job that was just processed
     * @param callable|null $callback Called with ($count, $job); skipped when null
     */
    private function executeCallback($count, Job $job, callable $callback = null)
    {
        if (!is_callable($callback)) {
            return;
        }

        $callback($count, $job);
    }

    /**
     * Passes the job to the consumer, deleting it on success and handling the failure otherwise.
     *
     * Only \Exception is caught here; any other throwable propagates to the caller.
     *
     * @param Job $job
     * @param int $maxAttempts
     */
    private function consume(Job $job, $maxAttempts)
    {
        try {
            $this->consumer->consume($job->serializedEnvelope());
            $this->driver->delete($job);
        } catch (\Exception $exception) {
            $job->fail($exception);
            $this->handleFailedJob($job, $maxAttempts);
        }
    }

    /**
     * Dispatches the failure event, then deletes the job if the attempt limit is reached or releases it otherwise.
     *
     * @param Job $job
     * @param int $maxAttempts 0 disables the limit
     */
    private function handleFailedJob(Job $job, $maxAttempts)
    {
        // The parameter is untyped, so a numeric string such as "0" can arrive here.
        // Without normalization "0" would pass the strict `0 !==` check and the job
        // would be treated as having reached its limit on the very first failure.
        $maxAttempts = (int) $maxAttempts;

        $this->handleEvent(new FailedJob($job));

        if (0 !== $maxAttempts && $job->attempts() >= $maxAttempts) {
            $this->handleEvent(new MaxAttemptsReached($job, $maxAttempts));
            $this->driver->delete($job);

            return;
        }

        $this->driver->release($job);
    }

    /**
     * Forwards an event to the event bus, if one was configured.
     *
     * @param object $event
     */
    private function handleEvent($event)
    {
        if (!$this->eventBus instanceof MessageBus) {
            return;
        }

        $this->eventBus->handle($event);
    }
}
114