Code Duplication    Length = 73-76 lines in 2 locations

src/Queue/PredisEventQueue.php 1 location

@@ 17-89 (lines=73) @@
14
use Psr\Log\LoggerInterface;
15
use Symfony\Component\Serializer\Serializer;
16
17
class PredisEventQueue implements EventQueueInterface
18
{
19
    const SET_KEY = 'events';
20
    const FORMAT = 'predis';
21
22
    /**
23
     * @var Client
24
     */
25
    private $client;
26
27
    /**
28
     * @var Serializer
29
     */
30
    private $serializer;
31
32
    /**
33
     * @var LoggerInterface
34
     */
35
    private $logger;
36
37
    /**
38
     * @param Client $client
39
     * @param Serializer $serializer
40
     * @param LoggerInterface $logger
41
     */
42
    public function __construct(Client $client, Serializer $serializer, LoggerInterface $logger)
43
    {
44
        $this->client = $client;
45
        $this->serializer = $serializer;
46
        $this->logger = $logger;
47
    }
48
49
    /**
50
     * Push event to queue.
51
     *
52
     * @param EventInterface $event
53
     *
54
     * @return bool
55
     */
56
    public function push(EventInterface $event)
57
    {
58
        $value = $this->serializer->normalize($event, self::FORMAT);
59
60
        return (bool) $this->client->rpush(self::SET_KEY, [$value]);
61
    }
62
63
    /**
64
     * Pop event from queue. Return NULL if queue is empty.
65
     *
66
     * @return EventInterface|null
67
     */
68
    public function pop()
69
    {
70
        $value = $this->client->lpop(self::SET_KEY);
71
72
        if (!$value) {
73
            return null;
74
        }
75
76
        try {
77
            return $this->serializer->denormalize($value, EventInterface::class, self::FORMAT);
78
        } catch (\Exception $e) {
79
            // it's a critical error
80
            // it is necessary to react quickly to it
81
            $this->logger->critical('Failed denormalize a event in the Redis queue', [$value, $e->getMessage()]);
82
83
            // try denormalize in later
84
            $this->client->rpush(self::SET_KEY, [$value]);
85
86
            return null;
87
        }
88
    }
89
}
90

src/Queue/PredisUniqueEventQueue.php 1 location

@@ 17-92 (lines=76) @@
14
use Psr\Log\LoggerInterface;
15
use Symfony\Component\Serializer\Serializer;
16
17
class PredisUniqueEventQueue implements EventQueueInterface
18
{
19
    const LIST_KEY = 'unique_events';
20
    const FORMAT = 'predis';
21
22
    /**
23
     * @var Client
24
     */
25
    private $client;
26
27
    /**
28
     * @var Serializer
29
     */
30
    private $serializer;
31
32
    /**
33
     * @var LoggerInterface
34
     */
35
    private $logger;
36
37
    /**
38
     * @param Client $client
39
     * @param Serializer $serializer
40
     * @param LoggerInterface $logger
41
     */
42
    public function __construct(Client $client, Serializer $serializer, LoggerInterface $logger)
43
    {
44
        $this->client = $client;
45
        $this->serializer = $serializer;
46
        $this->logger = $logger;
47
    }
48
49
    /**
50
     * Push event to queue.
51
     *
52
     * @param EventInterface $event
53
     *
54
     * @return bool
55
     */
56
    public function push(EventInterface $event)
57
    {
58
        $value = $this->serializer->normalize($event, self::FORMAT);
59
60
        // remove already exists value to remove duplication
61
        $this->client->lrem(self::LIST_KEY, 0, $value);
62
63
        return (bool) $this->client->rpush(self::LIST_KEY, [$value]);
64
    }
65
66
    /**
67
     * Pop event from queue. Return NULL if queue is empty.
68
     *
69
     * @return EventInterface|null
70
     */
71
    public function pop()
72
    {
73
        $value = $this->client->lpop(self::LIST_KEY);
74
75
        if (!$value) {
76
            return null;
77
        }
78
79
        try {
80
            return $this->serializer->denormalize($value, EventInterface::class, self::FORMAT);
81
        } catch (\Exception $e) {
82
            // it's a critical error
83
            // it is necessary to react quickly to it
84
            $this->logger->critical('Failed denormalize a event in the Redis queue', [$value, $e->getMessage()]);
85
86
            // try denormalize in later
87
            $this->client->rpush(self::LIST_KEY, [$value]);
88
89
            return null;
90
        }
91
    }
92
}
93