Completed
Pull Request — master (#9)
by Adam
04:10 queued 01:38
created

Worker::tick()   A

Complexity

Conditions 4
Paths 6

Size

Total Lines 20
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 7
Bugs 1 Features 4
Metric Value
c 7
b 1
f 4
dl 0
loc 20
ccs 0
cts 17
cp 0
rs 9.2
cc 4
eloc 12
nc 6
nop 1
crap 20
1
<?php
2
3
namespace Equip\Queue;
4
5
use Equip\Queue\Driver\DriverInterface;
6
use Equip\Queue\Command\CommandFactoryInterface;
7
use Exception;
8
use Psr\Log\LoggerInterface;
9
10
/**
 * Queue consumer.
 *
 * Pulls serialized job options from a queue driver, builds the matching
 * command through the command factory and executes it, reporting each
 * lifecycle step to the event handler and the logger.
 */
class Worker
{
    /**
     * Source of queued packets.
     *
     * @var DriverInterface
     */
    private $driver;

    /**
     * Receives acknowledge / finish / reject notifications for each job.
     *
     * @var Event
     */
    private $event;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * Builds the command named by a job's options.
     *
     * @var CommandFactoryInterface
     */
    private $commands;

    /**
     * @param DriverInterface $driver
     * @param Event $event
     * @param LoggerInterface $logger
     * @param CommandFactoryInterface $commands
     */
    public function __construct(
        DriverInterface $driver,
        Event $event,
        LoggerInterface $logger,
        CommandFactoryInterface $commands
    ) {
        $this->driver = $driver;
        $this->event = $event;
        $this->logger = $logger;
        $this->commands = $commands;
    }

    /**
     * Consumes messages off of the queue
     *
     * Keeps calling tick() until it returns false, i.e. until a command
     * asks the consumer to shut down.
     *
     * @param string $queue
     */
    public function consume($queue)
    {
        while ($this->tick($queue)) { /* NOOP */ }
    }

    /**
     * Handles fetching messages from the queue
     *
     * Dequeues a single packet and, when there is one, unserializes it and
     * runs the command it describes. A packet that does not unserialize to
     * an AbstractOptions instance is logged and skipped instead of being
     * handed to invoke(), so one bad message cannot take the worker down.
     *
     * @param string $queue
     *
     * @return bool false when the executed command returned exactly false
     *              (shutdown request); true in every other case, including
     *              an empty queue, a discarded packet or a failed job.
     */
    protected function tick($queue)
    {
        $packet = $this->driver->dequeue($queue);
        if (empty($packet)) {
            return true;
        }

        // Initialised up front so the catch block can tell whether the
        // exception happened before or after the options were decoded.
        $options = null;

        try {
            // NOTE(review): unserialize() instantiates whatever classes the
            // payload names. This assumes the queue only ever carries
            // trusted data - confirm, or restrict the allowed classes.
            $options = unserialize($packet);

            // unserialize() yields false for a payload it cannot decode, and
            // a valid payload may still describe something other than job
            // options. Neither satisfies the type hints further down.
            if (!$options instanceof AbstractOptions) {
                $this->logger->error('discarding packet that could not be unserialized into job options');
                return true;
            }

            if ($this->invoke($options) === false) {
                $this->jobShutdown($options);
                return false;
            }
        } catch (Exception $exception) {
            if ($options instanceof AbstractOptions) {
                $this->jobException($options, $exception);
            } else {
                // Raised while decoding the packet: there are no options to
                // reject, so the failure can only be logged.
                $this->logger->error($exception->getMessage());
            }
        }

        return true;
    }

    /**
     * Invoke the options command
     *
     * Announces the job start, builds the command named by the options,
     * executes it with those options, then announces the job finish. The
     * finish step is skipped when the command throws.
     *
     * @param AbstractOptions $options
     *
     * @return null|bool whatever the command's execute() returned
     */
    private function invoke(AbstractOptions $options)
    {
        $this->jobStart($options);

        $result = $this->commands
            ->make($options->command())
            ->withOptions($options)
            ->execute();

        $this->jobFinish($options);

        return $result;
    }

    /**
     * Handles actions related to a job starting
     *
     * Acknowledges the job with the event handler and logs the start.
     *
     * @param AbstractOptions $options
     */
    private function jobStart(AbstractOptions $options)
    {
        $this->event->acknowledge($options);
        $this->logger->info(sprintf('`%s` job started', $options->command()));
    }

    /**
     * Handles actions related to a job finishing
     *
     * Reports completion to the event handler and logs it.
     *
     * @param AbstractOptions $options
     */
    private function jobFinish(AbstractOptions $options)
    {
        $this->event->finish($options);
        $this->logger->info(sprintf('`%s` job finished', $options->command()));
    }

    /**
     * Handles actions related to a job shutting down the consumer
     *
     * Only logs a notice; stopping the loop is done by tick()'s return value.
     *
     * @param AbstractOptions $options
     */
    private function jobShutdown(AbstractOptions $options)
    {
        $this->logger->notice(sprintf('shutting down by request of `%s`', $options->command()));
    }

    /**
     * Handles actions related to job exceptions
     *
     * Logs the exception message and rejects the job with the event handler.
     *
     * @param AbstractOptions $options
     * @param Exception $exception
     */
    private function jobException(AbstractOptions $options, Exception $exception)
    {
        $this->logger->error($exception->getMessage());
        $this->event->reject($options, $exception);
    }
}
153