Completed
Pull Request — master (#1)
by Adam
06:20 queued 03:33
created

Worker::getHandler()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 13
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 3
Bugs 1 Features 2
Metric Value
c 3
b 1
f 2
dl 0
loc 13
rs 9.4285
cc 3
eloc 7
nc 3
nop 2
1
<?php
2
3
namespace Equip\Queue;
4
5
use Equip\Queue\Driver\DriverInterface;
6
use Equip\Queue\Exception\HandlerException;
7
use Equip\Queue\Serializer\JsonSerializer;
8
use Equip\Queue\Serializer\MessageSerializerInterface;
9
use Exception;
10
11
class Worker
{
    /**
     * Queue backend that packets are popped from.
     *
     * @var DriverInterface
     */
    private $driver;

    /**
     * Notified when a message is acknowledged or rejected.
     *
     * @var Event
     */
    private $event;

    /**
     * Converts raw packets from the driver into message objects.
     *
     * @var MessageSerializerInterface
     */
    private $serializer;

    /**
     * Map of handler name => callable, consulted for every message.
     *
     * @var array
     */
    private $handlers;

    /**
     * @param DriverInterface $driver
     * @param Event $event
     * @param MessageSerializerInterface $serializer Falls back to a JsonSerializer when omitted
     * @param array $handlers Map of handler name => callable
     */
    public function __construct(
        DriverInterface $driver,
        Event $event,
        MessageSerializerInterface $serializer = null,
        array $handlers = []
    ) {
        if ($serializer === null) {
            $serializer = new JsonSerializer();
        }

        $this->driver = $driver;
        $this->event = $event;
        $this->serializer = $serializer;
        $this->handlers = $handlers;
    }

    /**
     * Consumes messages off of the queue until a handler returns false.
     *
     * @param string $queue
     *
     * @throws HandlerException If a message names a registered handler that is not callable
     */
    public function consume($queue)
    {
        // The loop is deliberately nothing but the tick() call: each tick pops
        // and processes at most one message and reports whether to carry on.
        // Removing the loop would stop the worker after a single message.
        do {
            $keepRunning = $this->tick($queue);
        } while ($keepRunning);
    }

    /**
     * Pops one packet from the queue and dispatches it to its handler.
     *
     * An empty pop, or a message whose handler name is not registered, is
     * skipped and polling continues. A handler that completes is acknowledged
     * through the event object; one that throws an Exception is rejected
     * instead, and polling continues.
     *
     * @param string $queue
     *
     * @return bool False only when the handler itself returned false
     * @throws HandlerException If the registered handler is not callable
     */
    private function tick($queue)
    {
        $payload = $this->driver->pop($queue);
        if (empty($payload)) {
            return true;
        }

        $message = $this->serializer->deserialize($payload);

        // Resolved outside the try block on purpose: an invalid handler
        // propagates to the caller rather than being turned into a reject.
        $callable = $this->getHandler($message->handler(), $this->handlers);
        if ($callable === null) {
            return true;
        }

        try {
            $outcome = call_user_func($callable, $message);

            // Acknowledged regardless of the handler's return value.
            $this->event->acknowledge($message);
        } catch (Exception $failure) {
            $this->event->reject($message, $failure);

            return true;
        }

        return $outcome !== false;
    }

    /**
     * Looks up the callable registered under a handler name.
     *
     * @param string $handler Name to look up
     * @param array $router Map of handler name => callable
     *
     * @return null|callable Null when nothing is registered under the name
     * @throws HandlerException If handler is not callable
     */
    private function getHandler($handler, array $router = [])
    {
        $candidate = isset($router[$handler]) ? $router[$handler] : null;
        if ($candidate === null) {
            return null;
        }

        if (is_callable($candidate)) {
            return $candidate;
        }

        throw HandlerException::invalidHandler($handler);
    }
}
118