Completed
Push — master ( eff446...e0c858 )
by Adam
12:41
created

Worker::bindSignals()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 14
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 14
c 0
b 0
f 0
ccs 0
cts 0
cp 0
rs 9.4285
cc 1
eloc 7
nc 1
nop 0
crap 2
1
<?php
2
3
namespace Equip\Queue;
4
5
use Equip\Queue\Driver\DriverInterface;
6
use Exception;
7
use League\Tactician\CommandBus;
8
9
class Worker
10
{
11
    /**
12
     * @var DriverInterface
13
     */
14
    private $driver;
15
16
    /**
17
     * @var Queue
18
     */
19
    private $queue;
20
21
    /**
22
     * @var Event
23
     */
24
    private $event;
25
26
    /**
27
     * @var CommandBus
28
     */
29
    private $command_bus;
30
31
    /**
32
     * Will shutdown the worker on the next tick
33
     *
34
     * @var bool
35
     */
36
    private $shutdown = false;
37
38
    /**
39
     * Will shutdown the worker when the queue is drained
40
     *
41
     * @var bool
42
     */
43
    private $drain = false;
44
45 7
    public function __construct(
46
        DriverInterface $driver,
47
        Queue $queue,
48
        Event $event,
49
        CommandBus $command_bus
50
    ) {
51 7
        $this->driver = $driver;
52 7
        $this->queue = $queue;
53 7
        $this->event = $event;
54 7
        $this->command_bus = $command_bus;
55 7
    }
56
57
    /**
58
     * Consumes messages off of the queue
59
     *
60
     * @codeCoverageIgnore
61
     *
62
     * @param string $queue
63
     */
64
    public function consume($queue)
65
    {
66
        $this->bindSignals();
67
68
        while ($this->tick($queue)) {
69
            pcntl_signal_dispatch();
70
        }
71
    }
72
73
    /**
74
     * Handles fetching messages from the queue
75
     *
76
     * @param string $queue
77
     *
78
     * @return bool
79
     */
80 5
    protected function tick($queue)
81
    {
82 5
        if ($this->shutdown) {
83 1
            $this->event->shutdown();
84 1
            return false;
85
        }
86
87 4
        $message = $this->driver->dequeue($queue);
88 4
        if (empty($message) && $this->drain) {
89 1
            $this->event->drained();
90 1
            return false;
91 3
        } elseif (empty($message)) {
92 1
            return true;
93
        }
94
95 2
        $command = unserialize($message);
96
        try {
97 2
            $this->event->acknowledge($command);
98 2
            $this->command_bus->handle($command);
99 1
            $this->event->finish($command);
100 2
        } catch (Exception $exception) {
101 1
            $this->queue->add(sprintf('%s-failed', $queue), $command);
102 1
            $this->event->reject($command, $exception);
103
        }
104
105 2
        return true;
106
    }
107
108
    /**
109
     * Handles binding POSIX signals appropriately
110
     *
111
     * @codeCoverageIgnore
112
     */
113
    private function bindSignals()
114
    {
115
        // Shutdown the listener
116
        array_map(function ($signal) {
117
            pcntl_signal($signal, [$this, 'shutdown']);
118
        }, [
119
            SIGTERM,
120
            SIGINT,
121
            SIGQUIT,
122
        ]);
123
124
        // Drain the queue
125
        pcntl_signal(SIGHUP, [$this, 'drain']);
126
    }
127
128
    /**
129
     * Set the worker to shutdown on the next tick
130
     */
131 1
    private function shutdown()
132
    {
133 1
        $this->shutdown = true;
134 1
    }
135
136
    /**
137
     * Set the worker to shutdown when the queue is drained
138
     */
139 1
    private function drain()
140
    {
141 1
        $this->drain = true;
142 1
    }
143
}
144