Completed
Push — master ( fbff81...a7cdb2 )
by Peter
08:05
created

PredisCommandQueue::publish()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 7
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 7
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 4
nc 1
nop 1
1
<?php
2
3
/**
4
 * GpsLab component.
5
 *
6
 * @author    Peter Gribanov <[email protected]>
7
 * @copyright Copyright (c) 2011, Peter Gribanov
8
 * @license   http://opensource.org/licenses/MIT
9
 */
10
11
namespace GpsLab\Component\Command\Queue\PubSub;
12
13
use GpsLab\Component\Command\Command;
14
use Psr\Log\LoggerInterface;
15
use Superbalist\PubSub\Redis\RedisPubSubAdapter;
16
use Symfony\Component\Serializer\SerializerInterface;
17
18
/**
 * Command queue that sends and receives commands through a Redis pub/sub adapter.
 *
 * Commands are turned into messages (and back) by the injected serializer
 * using the format named by self::FORMAT.
 */
class PredisCommandQueue implements CommandQueue
{
    /**
     * Format name handed to the serializer for both serialize() and deserialize().
     */
    const FORMAT = 'predis';

    /**
     * Pub/sub adapter used to publish to and subscribe on the queue.
     *
     * @var RedisPubSubAdapter
     */
    private $client;

    /**
     * Converts commands to messages and messages back to commands.
     *
     * @var SerializerInterface
     */
    private $serializer;

    /**
     * Receives a critical record when a message cannot be deserialized.
     *
     * @var LoggerInterface
     */
    private $logger;

    /**
     * Name of the queue passed to the adapter on publish and subscribe.
     *
     * @var string
     */
    private $queue_name = '';

    /**
     * @param RedisPubSubAdapter  $client     pub/sub adapter
     * @param SerializerInterface $serializer command serializer
     * @param LoggerInterface     $logger     logger for deserialization failures
     * @param string              $queue_name name of the queue to publish to and subscribe on
     */
    public function __construct(
        RedisPubSubAdapter $client,
        SerializerInterface $serializer,
        LoggerInterface $logger,
        $queue_name
    ) {
        $this->queue_name = $queue_name;
        $this->logger = $logger;
        $this->serializer = $serializer;
        $this->client = $client;
    }

    /**
     * Serialize the command and publish it to the queue.
     *
     * @param Command $command
     *
     * @return bool always true; failures surface only as exceptions from the serializer or the adapter
     */
    public function publish(Command $command)
    {
        $this->client->publish(
            $this->queue_name,
            $this->serializer->serialize($command, self::FORMAT)
        );

        return true;
    }

    /**
     * Subscribe the handler on the queue.
     *
     * Each received message is deserialized into a command and, when that
     * succeeds, the handler is invoked with the command. Exceptions thrown
     * by the handler itself are not caught here.
     *
     * @param callable $handler invoked with the restored command as its only argument
     */
    public function subscribe(callable $handler)
    {
        $this->client->subscribe($this->queue_name, function ($message) use ($handler) {
            $restored = $this->restoreCommand($message);

            // the handler runs outside of the try/catch on purpose:
            // only deserialization errors are handled by the queue
            if (null !== $restored) {
                call_user_func($handler, $restored);
            }
        });
    }

    /**
     * Deserialize a received message into a command.
     *
     * On failure the error is logged as critical and the untouched message
     * is published back to the same queue so it can be tried again.
     *
     * NOTE(review): a message that can never be deserialized will presumably
     * fail again on redelivery; confirm that something bounds these retries.
     *
     * @param mixed $message message as delivered by the adapter
     *
     * @return mixed|null the deserialized command, or null when deserialization failed
     */
    private function restoreCommand($message)
    {
        try {
            return $this->serializer->deserialize($message, Command::class, self::FORMAT);
        } catch (\Exception $e) {
            // a broken message is a critical problem that needs a quick reaction
            $this->logger->critical(
                'Failed denormalize a command in the Redis queue',
                [$message, $e->getMessage()]
            );

            // put the message back so it can be deserialized later
            $this->client->publish($this->queue_name, $message);
        }

        return null;
    }
}
103