PredisSubscribeCommandQueue::unsubscribe()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 12
ccs 6
cts 6
cp 1
rs 9.8666
c 0
b 0
f 0
cc 2
nc 2
nop 1
crap 2
1
<?php
2
declare(strict_types=1);
3
4
/**
 * GpsLab component.
 *
 * @author    Peter Gribanov <[email protected]>
 * @copyright Copyright (c) 2011, Peter Gribanov
 * @license   http://opensource.org/licenses/MIT
 */
11
12
namespace GpsLab\Component\Command\Queue\Subscribe;
13
14
use GpsLab\Component\Command\Command;
15
use GpsLab\Component\Command\Queue\Serializer\Serializer;
16
use Psr\Log\LoggerInterface;
17
use Superbalist\PubSub\Redis\RedisPubSubAdapter;
18
19
/**
 * Command queue that publishes to and subscribes on a single named channel
 * of a Redis pub/sub adapter.
 *
 * Commands are serialized before being published. Incoming messages are
 * deserialized and passed to every registered handler. The channel
 * subscription itself is created on the first call to subscribe().
 */
class PredisSubscribeCommandQueue implements SubscribeCommandQueue
{
    /**
     * @var RedisPubSubAdapter
     */
    private $client;

    /**
     * @var Serializer
     */
    private $serializer;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * Handlers invoked for every successfully deserialized command.
     *
     * @var callable[]
     */
    private $handlers = [];

    /**
     * Name of the channel used for both publishing and subscribing.
     *
     * @var string
     */
    private $queue_name;

    /**
     * Whether the subscription on the client has already been made.
     *
     * @var bool
     */
    private $subscribed = false;

    /**
     * @param RedisPubSubAdapter $client
     * @param Serializer         $serializer
     * @param LoggerInterface    $logger
     * @param string             $queue_name
     */
    public function __construct(
        RedisPubSubAdapter $client,
        Serializer $serializer,
        LoggerInterface $logger,
        string $queue_name
    ) {
        $this->client = $client;
        $this->serializer = $serializer;
        $this->logger = $logger;
        $this->queue_name = $queue_name;
    }

    /**
     * Publish command to queue.
     *
     * The command is serialized and published to the configured channel.
     *
     * @param Command $command
     *
     * @return bool Always true; failures surface as exceptions from the serializer or the client
     */
    public function publish(Command $command): bool
    {
        $message = $this->serializer->serialize($command);
        $this->client->publish($this->queue_name, $message);

        return true;
    }

    /**
     * Subscribe on command queue.
     *
     * The handler is registered first; the subscription on the client is
     * made only once, on the first call.
     *
     * @param callable $handler Receives the deserialized Command as its only argument
     */
    public function subscribe(callable $handler): void
    {
        $this->handlers[] = $handler;

        // lazy subscribe: only the first registered handler triggers the client subscription
        if (!$this->subscribed) {
            $this->client->subscribe($this->queue_name, function ($message): void {
                $this->handle($message);
            });
            $this->subscribed = true;
        }
    }

    /**
     * Unsubscribe on command queue.
     *
     * Only the first matching registration of the handler is removed.
     * Handlers are matched strictly (same closure instance, same string, or
     * an array callable holding the very same object), so a handler can
     * never remove a different handler that merely compares as loosely equal.
     *
     * @param callable $handler
     *
     * @return bool True if the handler was registered and has been removed, false otherwise
     */
    public function unsubscribe(callable $handler): bool
    {
        $index = array_search($handler, $this->handlers, true);

        if (false === $index) {
            return false;
        }

        unset($this->handlers[$index]);

        return true;
    }

    /**
     * Deserialize an incoming message and pass the command to all handlers.
     *
     * If the message cannot be turned into a Command, the failure is logged
     * as critical, the raw message is published back to the queue and no
     * handler is called.
     *
     * @param mixed $message
     */
    private function handle($message): void
    {
        try {
            $command = $this->serializer->deserialize($message);

            if (!($command instanceof Command)) {
                // get_class() rejects non-objects with a TypeError on PHP 8, which
                // would bypass the catch below, so describe scalars/arrays/null by type
                $actual = is_object($command) ? get_class($command) : gettype($command);

                throw new \RuntimeException(sprintf('The denormalization command is expected "%s", got "%s" inside.', Command::class, $actual));
            }
        } catch (\Exception $e) { // catch only deserialize exception
            // it's a critical error
            // it is necessary to react quickly to it
            $this->logger->critical('Failed denormalize a command in the Redis queue', [$message, $e->getMessage()]);

            // try denormalize in later
            $this->client->publish($this->queue_name, $message);

            return; // no command for handle
        }

        foreach ($this->handlers as $handler) {
            $handler($command);
        }
    }
}
149