Worker::drain()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
c 0
b 0
f 0
ccs 3
cts 3
cp 1
rs 10
cc 1
eloc 2
nc 1
nop 0
crap 1
1
<?php
2
3
namespace Equip\Queue;
4
5
use Equip\Queue\Driver\DriverInterface;
6
use Exception;
7
use League\Tactician\CommandBus;
8
9
/**
 * Long-running queue consumer.
 *
 * Pulls commands from a driver, dispatches each one through the command bus
 * and reports progress through the event object. The loop can be stopped
 * either immediately (shutdown) or once the queue is empty (drain).
 */
class Worker
{
    /**
     * Source of queued messages; also told when a job has been processed.
     *
     * @var DriverInterface
     */
    private $driver;

    /**
     * Used to re-publish commands that failed onto the "<queue>-failed" queue.
     *
     * @var Queue
     */
    private $queue;

    /**
     * Receives lifecycle notifications (acknowledge, finish, reject,
     * drained, shutdown).
     *
     * @var Event
     */
    private $event;

    /**
     * Bus that actually handles each dequeued command.
     *
     * @var CommandBus
     */
    private $command_bus;

    /**
     * Will shutdown the worker on the next tick
     *
     * @var bool
     */
    private $shutdown = false;

    /**
     * Will shutdown the worker when the queue is drained
     *
     * @var bool
     */
    private $drain = false;

    /**
     * @param DriverInterface $driver
     * @param Queue           $queue
     * @param Event           $event
     * @param CommandBus      $command_bus
     */
    public function __construct(
        DriverInterface $driver,
        Queue $queue,
        Event $event,
        CommandBus $command_bus
    ) {
        $this->driver = $driver;
        $this->queue = $queue;
        $this->event = $event;
        $this->command_bus = $command_bus;
    }

    /**
     * Consumes messages off of the queue
     *
     * Installs the signal handlers, then keeps ticking until tick() reports
     * that the worker should stop. Pending signals are dispatched between
     * ticks, so a shutdown/drain request takes effect on the following tick.
     *
     * @codeCoverageIgnore
     *
     * @param string $queue
     */
    public function consume($queue)
    {
        $this->bindSignals();

        while ($this->tick($queue)) {
            pcntl_signal_dispatch();
        }
    }

    /**
     * Handles fetching messages from the queue
     *
     * Performs a single iteration of the worker loop:
     *  - if a shutdown was requested, emits the shutdown event and stops;
     *  - if nothing was dequeued, stops only when a drain was requested
     *    (emitting the drained event), otherwise keeps going;
     *  - otherwise handles the command, moving it to "<queue>-failed" and
     *    emitting a reject event when handling throws an Exception.
     *
     * @param string $queue
     *
     * @return bool True to keep consuming, false to stop the loop
     */
    protected function tick($queue)
    {
        if ($this->shutdown) {
            $this->event->shutdown();

            return false;
        }

        // The driver hands back a [command, job] pair.
        list($command, $job) = $this->driver->dequeue($queue);

        if (empty($command)) {
            if ($this->drain) {
                $this->event->drained();

                return false;
            }

            // Nothing to do right now; keep polling.
            return true;
        }

        try {
            $this->event->acknowledge($command);
            $this->command_bus->handle($command);
            $this->event->finish($command);
        } catch (Exception $exception) {
            // Park the command on the failure queue before reporting it.
            $this->queue->add(sprintf('%s-failed', $queue), $command);
            $this->event->reject($command, $exception);
        } finally {
            // Runs on success, on handled failure, and when a non-Exception
            // throwable propagates out of this method.
            $this->driver->processed($job);
        }

        return true;
    }

    /**
     * Handles binding POSIX signals appropriately
     *
     * SIGTERM, SIGINT and SIGQUIT request a shutdown; SIGHUP requests a
     * drain.
     *
     * @codeCoverageIgnore
     */
    private function bindSignals()
    {
        // Shutdown the listener
        foreach ([SIGTERM, SIGINT, SIGQUIT] as $signal) {
            pcntl_signal($signal, [$this, 'shutdown']);
        }

        // Drain the queue
        pcntl_signal(SIGHUP, [$this, 'drain']);
    }

    /**
     * Set the worker to shutdown on the next tick
     */
    public function shutdown()
    {
        $this->shutdown = true;
    }

    /**
     * Set the worker to shutdown when the queue is drained
     */
    public function drain()
    {
        $this->drain = true;
    }
}
145