Gateway   A
last analyzed

Complexity

Total Complexity 21

Size/Duplication

Total Lines 172
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 0

Test Coverage

Coverage 98.59%

Importance

Changes 0
Metric Value
wmc 21
lcom 1
cbo 0
dl 0
loc 172
ccs 70
cts 71
cp 0.9859
rs 10
c 0
b 0
f 0

9 Methods

Rating   Name   Duplication   Size   Complexity  
A deleteEvent() 0 21 4
A addEvent() 0 12 1
A addJob() 0 18 2
A getEventsByType() 0 4 1
A restoreJob() 0 8 1
A countAllJobs() 0 7 1
A getFromTimesQueue() 0 11 3
A getFromImmediateQueue() 0 12 4
A getEventsByTypeGenerator() 0 21 4
1
<?php
2
3
namespace BrainExe\Core\MessageQueue;
4
5
use BrainExe\Core\Annotations\Service;
6
use BrainExe\Core\EventDispatcher\AbstractEvent;
7
use BrainExe\Core\Traits\IdGeneratorTrait;
8
use BrainExe\Core\Traits\RedisTrait;
9
use BrainExe\Core\Traits\TimeTrait;
10
use Generator;
11
12
/**
13
 * @api
14
 * @Service
15
 */
16
class Gateway
17
{
18
19
    use TimeTrait;
20
    use RedisTrait;
21
    use IdGeneratorTrait;
22
23
    const QUEUE_DELAYED   = 'message_queue:delayed';
24
    const QUEUE_IMMEDIATE = 'message_queue:immediate';
25
    const META_DATA       = 'message_queue:meta_data';
26
    const RETRY_TIME      = 3600; // try again after 1 hour
27
28
    /**
     * Removes a queued job, checking the delayed queue first and the immediate queue second.
     *
     * The lookup key is built as "<eventType>:<eventId>". A delayed job is removed by that
     * exact key, together with its meta data entry. An immediate job is removed when the
     * id part of its queue entry starts with that key.
     *
     * @param string $eventId
     * @param string $eventType
     * @return bool success
     */
    public function deleteEvent(string $eventId, string $eventType = null) : bool
    {
        $eventId = sprintf('%s:%s', $eventType, $eventId);

        $redis = $this->getRedis();

        // Delayed jobs are sorted-set members; zrem reports whether one was removed.
        if ($redis->zrem(self::QUEUE_DELAYED, $eventId)) {
            $redis->hdel(self::META_DATA, [$eventId]);
            return true;
        }

        // Immediate entries look like "<jobId>#<payload>". Only the list range 0..100
        // is inspected, so entries beyond that range are not found here.
        $immediate = $redis->lrange(self::QUEUE_IMMEDIATE, 0, 100);
        foreach ($immediate as $rawJob) {
            list($jobId) = explode('#', $rawJob, 2);
            if (strpos($jobId, $eventId) === 0) {
                return (bool)$redis->lrem(self::QUEUE_IMMEDIATE, 1, $rawJob);
            }
        }

        return false;
    }
54
55
    /**
     * Wraps the given event in a new Job and enqueues it.
     *
     * The job id has the form "<eventName>:<uniqueId>" and the job's start time is set
     * to the current time before it is handed to addJob().
     *
     * @param AbstractEvent $event
     * @param int $timestamp passed to the Job; an empty value means immediate execution
     * @return Job the job that was enqueued
     */
    public function addEvent(AbstractEvent $event, int $timestamp = 0) : Job
    {
        $uniqueId = $this->generateUniqueId('jobid:' . $event->getEventName());

        $job = new Job($event, $event->getEventName() . ':' . $uniqueId, $timestamp);
        $job->startTime = $this->now();

        $this->addJob($job);

        return $job;
    }
72
73
    /**
     * Stores a job in Redis using a single fire-and-forget pipeline.
     *
     * Jobs with a non-empty timestamp go into the delayed sorted set (scored by the
     * timestamp) with their payload kept in the meta data hash. All other jobs are
     * pushed onto the immediate list as "<jobId>#<payload>".
     *
     * @param Job $job
     */
    public function addJob(Job $job)
    {
        $payload = base64_encode(serialize($job));
        $batch   = $this->getRedis()->pipeline(['fire-and-forget' => true]);

        if (!empty($job->timestamp)) {
            // delayed execution
            $batch->hset(self::META_DATA, $job->jobId, $payload);
            $batch->zadd(self::QUEUE_DELAYED, [$job->jobId => (int)$job->timestamp]);
        } else {
            // immediate execution in background
            $batch->lpush(self::QUEUE_IMMEDIATE, $job->jobId . '#' . $payload);
        }

        $batch->execute();
    }
94
95
    /**
     * Array variant of getEventsByTypeGenerator(): collects all yielded jobs, keyed by job id.
     *
     * @param string $eventType
     * @param int $since
     * @return Job[]
     */
    public function getEventsByType(string $eventType = null, int $since = 0) : array
    {
        $jobs = $this->getEventsByTypeGenerator($eventType, $since);

        return iterator_to_array($jobs);
    }
104
105
    /**
     * Yields queued jobs: first the delayed ones scored at or after $since, then the immediate ones.
     *
     * When $eventType is non-empty, only jobs whose id starts with "<eventType>:" are yielded.
     *
     * @param string $eventType
     * @param int $since lower score bound for the delayed queue
     * @return Generator|Job[]
     */
    public function getEventsByTypeGenerator(string $eventType = null, int $since = 0) : Generator
    {
        $scored = $this->getRedis()->zrangebyscore(
            self::QUEUE_DELAYED,
            $since,
            '+inf',
            ['withscores' => true]
        );

        $matching = [];
        foreach ($scored as $jobId => $timestamp) {
            if (!empty($eventType) && strpos($jobId, $eventType . ':') !== 0) {
                continue;
            }
            $matching[$jobId] = $timestamp;
        }

        yield from $this->getFromTimesQueue($matching);
        yield from $this->getFromImmediateQueue($eventType);
    }
131
132
    /**
     * Re-queues a job for a later retry.
     *
     * The job is rescheduled RETRY_TIME seconds after the current time, its error
     * counter is incremented, and it is stored again via addJob().
     *
     * @param Job $job
     */
    public function restoreJob(Job $job)
    {
        $job->timestamp = $this->now() + self::RETRY_TIME;
        ++$job->errorCounter;

        $this->addJob($job);
    }
143
144
    /**
     * Counts the entries of the delayed queue plus the entries of the immediate queue.
     *
     * @return int
     */
    public function countAllJobs() : int
    {
        $redis = $this->getRedis();

        $delayedCount   = $redis->zcard(self::QUEUE_DELAYED);
        $immediateCount = $redis->llen(self::QUEUE_IMMEDIATE);

        return $delayedCount + $immediateCount;
    }
154
155
    /**
     * Loads the stored payloads for the given delayed job ids and yields the decoded jobs.
     *
     * Ids without a meta data entry, and payloads that do not decode to a Job, are
     * skipped instead of being yielded as broken values.
     *
     * @param array $keys map whose keys are job ids; the values are not used here
     * @return Generator yields job id => Job
     */
    private function getFromTimesQueue(array $keys) : Generator
    {
        if (empty($keys)) {
            return;
        }

        $events = $this->getRedis()->hmget(self::META_DATA, array_keys($keys));
        foreach ($events as $rawJob) {
            if (empty($rawJob)) {
                // HMGET returns null for a field that does not exist in the hash
                continue;
            }

            $job = unserialize(base64_decode($rawJob));
            if (!$job instanceof Job) {
                // corrupt or foreign payload: nothing usable to yield
                continue;
            }

            yield $job->jobId => $job;
        }
    }
170
171
    /**
     * Yields the decoded jobs found in the list range 0..100 of the immediate queue.
     *
     * Each entry is split at the first '#' into job id and payload. When $eventType is
     * non-empty, only entries whose id starts with "<eventType>:" are decoded and yielded.
     *
     * @param string $eventType
     * @return Generator yields job id => Job
     */
    private function getFromImmediateQueue(string $eventType = null) : Generator
    {
        $entries = $this->getRedis()->lrange(self::QUEUE_IMMEDIATE, 0, 100);
        foreach ($entries as $entry) {
            list($jobId, $payload) = explode('#', $entry, 2);
            if (!empty($eventType) && strpos($jobId, $eventType . ':') !== 0) {
                continue;
            }

            /** @var Job $job */
            $job = unserialize(base64_decode($payload));
            yield $job->jobId => $job;
        }
    }
187
}
188