Completed
Push — master ( 17b364...ed10f9 )
by Peter
02:07
created

PredisSubscribeCommandQueue::unsubscribe()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 12
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
dl 0
loc 12
ccs 0
cts 6
cp 0
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 6
nc 2
nop 1
crap 6
1
<?php
2
3
/**
4
 * GpsLab component.
5
 *
6
 * @author    Peter Gribanov <[email protected]>
7
 * @copyright Copyright (c) 2011, Peter Gribanov
8
 * @license   http://opensource.org/licenses/MIT
9
 */
10
11
namespace GpsLab\Component\Command\Queue\Subscribe;
12
13
use GpsLab\Component\Command\Command;
14
use GpsLab\Component\Command\Queue\Serializer\Serializer;
15
use Psr\Log\LoggerInterface;
16
use Superbalist\PubSub\Redis\RedisPubSubAdapter;
17
18
/**
 * Command queue on top of a Redis pub/sub adapter.
 *
 * Commands are serialized before being published to the channel named by
 * the queue name. Incoming messages are deserialized and passed to every
 * handler registered through subscribe().
 */
class PredisSubscribeCommandQueue implements SubscribeCommandQueue
{
    /**
     * Pub/sub adapter used both for publishing and for subscribing.
     *
     * @var RedisPubSubAdapter
     */
    private $client;

    /**
     * Converts commands to queue messages and back.
     *
     * @var Serializer
     */
    private $serializer;

    /**
     * Receives a critical record when a message cannot be deserialized.
     *
     * @var LoggerInterface
     */
    private $logger;

    /**
     * Handlers registered through subscribe().
     *
     * Keys may be sparse after unsubscribe(); the list is only iterated.
     *
     * @var callable[]
     */
    private $handlers = [];

    /**
     * Name of the channel the commands are published to and read from.
     *
     * @var string
     */
    private $queue_name = '';

    /**
     * Whether the client subscription has already been registered.
     *
     * @var bool
     */
    private $subscribed = false;

    /**
     * @param RedisPubSubAdapter $client
     * @param Serializer         $serializer
     * @param LoggerInterface    $logger
     * @param string             $queue_name
     */
    public function __construct(
        RedisPubSubAdapter $client,
        Serializer $serializer,
        LoggerInterface $logger,
        $queue_name
    ) {
        $this->client = $client;
        $this->serializer = $serializer;
        $this->logger = $logger;
        $this->queue_name = $queue_name;
    }

    /**
     * Publish command to queue.
     *
     * The command is serialized and sent to the queue channel.
     *
     * @param Command $command
     *
     * @return bool Always true; failures surface as exceptions thrown by
     *              the serializer or the client, if they throw any.
     */
    public function publish(Command $command)
    {
        $message = $this->serializer->serialize($command);
        $this->client->publish($this->queue_name, $message);

        return true;
    }

    /**
     * Subscribe on command queue.
     *
     * The handler is stored first; the subscription on the client is
     * registered only once, on the first call.
     *
     * @param callable $handler Called with the deserialized command.
     */
    public function subscribe(callable $handler)
    {
        $this->handlers[] = $handler;

        // lazy subscribe
        if (!$this->subscribed) {
            $this->client->subscribe($this->queue_name, function ($message) {
                $this->handle($message);
            });
            $this->subscribed = true;
        }
    }

    /**
     * Unsubscribe on command queue.
     *
     * Removes the first registered handler that is identical to the given
     * one. The subscription on the client itself is left untouched.
     *
     * @param callable $handler
     *
     * @return bool True if the handler was registered and has been removed.
     */
    public function unsubscribe(callable $handler)
    {
        // Strict search: with a loose comparison an array callable bound to
        // one object also matches the same method on another, equal-looking
        // object, so a handler that was never passed in could be removed.
        $index = array_search($handler, $this->handlers, true);

        if ($index === false) {
            return false;
        }

        unset($this->handlers[$index]);

        return true;
    }

    /**
     * Deserialize a received message and pass the command to all handlers.
     *
     * @param mixed $message Raw message as delivered by the client.
     */
    private function handle($message)
    {
        try {
            $command = $this->serializer->deserialize($message);
        } catch (\Exception $e) { // catch only deserialize exception
            // it's a critical error
            // it is necessary to react quickly to it
            $this->logger->critical('Failed denormalize a command in the Redis queue', [$message, $e->getMessage()]);

            // try to denormalize it later
            // NOTE(review): the message goes back to the channel this object
            // listens on; presumably a permanently broken message is then
            // received and re-published again and again - confirm.
            $this->client->publish($this->queue_name, $message);

            return; // no command for handle
        }

        foreach ($this->handlers as $handler) {
            call_user_func($handler, $command);
        }
    }
}
144