Worker   A
last analyzed

Complexity

Total Complexity 7

Size/Duplication

Total Lines 91
Duplicated Lines 0 %

Test Coverage

Coverage 79.17%

Importance

Changes 0
Metric Value
wmc 7
dl 0
loc 91
ccs 19
cts 24
cp 0.7917
rs 10
c 0
b 0
f 0

4 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 6 1
A getName() 0 3 1
A getProfile() 0 3 1
B consume() 0 26 4
1
<?php
2
3
namespace JobQueue\Domain\Worker;
4
5
use JobQueue\Domain\Task\Profile;
6
use JobQueue\Domain\Task\Queue;
7
use JobQueue\Domain\Task\TaskHandler;
8
use JobQueue\Domain\Task\TaskWasFetched;
9
use Psr\Log\LoggerAwareInterface;
10
use Psr\Log\LoggerAwareTrait;
11
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
12
13
/**
 * Pulls tasks matching a profile out of a queue and announces each step
 * (worker started, task fetched, worker finished) through the event dispatcher.
 *
 * The actual task processing is not done here: a TaskHandler is subscribed to
 * the dispatcher for the duration of a consume() run.
 */
final class Worker implements LoggerAwareInterface
{
    use LoggerAwareTrait;

    /**
     * Name given to this worker at construction.
     *
     * @var string
     */
    private $name;

    /**
     * Queue the tasks are fetched from.
     *
     * @var Queue
     */
    private $queue;

    /**
     * Profile passed to the queue on every fetch.
     *
     * @var Profile
     */
    private $profile;

    /**
     * Dispatcher used for worker and task events.
     *
     * @var EventDispatcherInterface
     */
    private $eventDispatcher;

    /**
     * @param string                   $name            Name of the worker
     * @param Queue                    $queue           Queue to fetch tasks from
     * @param Profile                  $profile         Profile passed to the queue when fetching
     * @param EventDispatcherInterface $eventDispatcher Dispatcher receiving worker and task events
     */
    public function __construct(string $name, Queue $queue, Profile $profile, EventDispatcherInterface $eventDispatcher)
    {
        $this->name = $name;
        $this->queue = $queue;
        $this->profile = $profile;
        $this->eventDispatcher = $eventDispatcher;
    }

    /**
     * Returns the name given to this worker.
     *
     * @return string
     */
    public function getName(): string
    {
        return $this->name;
    }

    /**
     * Returns the profile this worker fetches tasks for.
     *
     * @return Profile
     */
    public function getProfile(): Profile
    {
        return $this->profile;
    }

    /**
     * Fetches tasks from the queue and dispatches a TaskWasFetched event for each one.
     *
     * Dispatches WorkerWasStarted first and WorkerHasFinished once the loop ends.
     * The loop ends when the queue returns a falsy value or when $quantity tasks
     * have been fetched. The counter is incremented before it is compared, so a
     * quantity of null, zero or a negative number never matches and the worker
     * runs until the queue stops returning tasks.
     *
     * @param int|null $quantity Number of tasks to handle, or null for no limit
     *
     * @return void
     */
    public function consume(?int $quantity = null): void
    {
        $this->eventDispatcher->dispatch(WorkerWasStarted::NAME, new WorkerWasStarted($this));

        // Set up the task handler, which subscribes to the tasks domain events
        $taskHandler = new TaskHandler($this->queue, $this->eventDispatcher);
        if ($this->logger) {
            $taskHandler->setLogger($this->logger);
        }
        $this->eventDispatcher->addSubscriber($taskHandler);

        try {
            $i = 0;
            while ($task = $this->queue->fetch($this->profile)) {
                $this->eventDispatcher->dispatch(TaskWasFetched::NAME, new TaskWasFetched($task));

                // Try to free memory
                unset($task);

                // Exit worker if a quantity has been set
                $i++;
                if ($quantity === $i) {
                    break;
                }
            }

            $this->eventDispatcher->dispatch(WorkerHasFinished::NAME, new WorkerHasFinished($this));
        } finally {
            // A new handler is created on every call; remove it so repeated calls
            // do not leave several handlers subscribed to the same dispatcher.
            $this->eventDispatcher->removeSubscriber($taskHandler);
        }
    }
}
106