Completed
Push — master ( 2f7a99...5fa5de )
by Matze
10:42
created

Gateway::addJob()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 16
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 16
rs 9.4286
ccs 9
cts 9
cp 1
cc 2
eloc 9
nc 2
nop 1
crap 2
1
<?php
2
3
namespace BrainExe\Core\MessageQueue;
4
5
use BrainExe\Annotations\Annotations\Service;
6
use BrainExe\Core\EventDispatcher\AbstractEvent;
7
use BrainExe\Core\Traits\IdGeneratorTrait;
8
use BrainExe\Core\Traits\RedisTrait;
9
use BrainExe\Core\Traits\TimeTrait;
10
use Generator;
11
12
/**
13
 * @api
14
 * @Service("MessageQueue.Gateway", public=false)
15
 */
16
class Gateway
{

    use TimeTrait;
    use RedisTrait;
    use IdGeneratorTrait;

    const QUEUE_DELAYED   = 'message_queue:delayed';
    const QUEUE_IMMEDIATE = 'message_queue:immediate';
    const META_DATA       = 'message_queue:meta_data';
    const RETRY_TIME      = 3600; // delay in seconds (one hour) before a restored job is due again

    /**
     * Removes a job from the delayed queue or, failing that, from the immediate queue.
     *
     * When $eventType is given, the id that is looked up is "<eventType>:<eventId>".
     *
     * @param string|integer $eventId
     * @param string|null $eventType
     * @return bool success
     */
    public function deleteEvent($eventId, $eventType = null)
    {
        if ($eventType) {
            $eventId = sprintf('%s:%s', $eventType, $eventId);
        }

        $redis = $this->getRedis();

        // Delayed jobs are stored under their exact id; drop the meta data along with the queue entry.
        if ($redis->ZREM(self::QUEUE_DELAYED, $eventId)) {
            $redis->HDEL(self::META_DATA, $eventId);

            return true;
        }

        // An empty needle would match every entry on PHP 8 (and warn on older versions),
        // so never search the immediate queue with it.
        $needle = (string)$eventId;
        if ($needle === '') {
            return false;
        }

        // Immediate entries have the form "<jobId>#<payload>"; only LRANGE 0..100 is inspected.
        // NOTE(review): this is a prefix match, so id "type:1" also matches "type:12" - confirm intended.
        foreach ($redis->lrange(self::QUEUE_IMMEDIATE, 0, 100) as $rawJob) {
            $parts = explode('#', $rawJob, 2);
            if (strpos($parts[0], $needle) === 0) {
                return (bool)$redis->lrem(self::QUEUE_IMMEDIATE, 1, $rawJob);
            }
        }

        return false;
    }

    /**
     * Wraps the event into a Job with a freshly generated id and enqueues it.
     *
     * @param AbstractEvent $event
     * @param integer $timestamp execution time; an empty value enqueues the job for immediate execution
     * @return string the id of the created job ("<eventName>:<uniqueId>")
     */
    public function addEvent(AbstractEvent $event, $timestamp = 0)
    {
        $jobId = sprintf('%s:%s', $event->eventName, $this->generateUniqueId());

        $this->addJob(new Job($event, $jobId, $timestamp));

        return $jobId;
    }

    /**
     * Serializes the job and stores it in the immediate or the delayed queue,
     * depending on whether the job carries a timestamp.
     *
     * @param Job $job
     */
    public function addJob(Job $job)
    {
        $serialized = base64_encode(serialize($job));

        $pipeline = $this->getRedis()->pipeline(['fire-and-forget' => true]);
        if (empty($job->timestamp)) {
            // immediate execution in background: "<jobId>#<payload>" pushed onto the list
            $pipeline->LPUSH(self::QUEUE_IMMEDIATE, $job->jobId . '#' . $serialized);
        } else {
            // delayed execution: payload goes into the meta data hash, the id into the
            // sorted set scored by the timestamp
            $pipeline->HSET(self::META_DATA, $job->jobId, $serialized);
            $pipeline->ZADD(self::QUEUE_DELAYED, (int)$job->timestamp, $job->jobId);
        }

        $pipeline->execute();
    }

    /**
     * Array variant of getEventsByTypeGenerator(); the result is keyed by job id.
     *
     * @param string $eventType
     * @param int $since
     * @return Job[]
     */
    public function getEventsByType($eventType = null, $since = 0)
    {
        return iterator_to_array($this->getEventsByTypeGenerator($eventType, $since));
    }

    /**
     * Yields the delayed jobs scored at or after $since, followed by the jobs found in
     * the inspected part of the immediate queue, each keyed by its job id.
     *
     * Entries whose payload is missing or cannot be decoded into a Job are skipped.
     *
     * @param string $eventType only jobs whose id starts with "<eventType>:"; empty for all jobs
     * @param integer $since minimum score (timestamp) for delayed jobs
     * @return Generator|Job[]
     */
    public function getEventsByTypeGenerator($eventType = null, $since = 0)
    {
        $redis = $this->getRedis();

        $delayed = $redis->ZRANGEBYSCORE(
            self::QUEUE_DELAYED,
            $since,
            '+inf',
            ['withscores' => true]
        );

        $jobIds = [];
        if (!empty($delayed)) {
            // the sorted set result is keyed by job id (value: score)
            foreach ($delayed as $jobId => $timestamp) {
                if ($this->matchesEventType($jobId, $eventType)) {
                    $jobIds[] = $jobId;
                }
            }
        }

        if (!empty($jobIds)) {
            foreach ($redis->hmget(self::META_DATA, $jobIds) as $rawJob) {
                $job = $this->decodeJob($rawJob);
                if ($job !== null) {
                    yield $job->jobId => $job;
                }
            }
        }

        // Immediate entries have the form "<jobId>#<payload>"; only LRANGE 0..100 is inspected.
        foreach ($redis->lrange(self::QUEUE_IMMEDIATE, 0, 100) as $entry) {
            $parts = explode('#', $entry, 2);
            if (count($parts) !== 2 || !$this->matchesEventType($parts[0], $eventType)) {
                continue;
            }

            $job = $this->decodeJob($parts[1]);
            if ($job !== null) {
                yield $job->jobId => $job;
            }
        }
    }

    /**
     * Re-enqueues a job as delayed job, due RETRY_TIME seconds from now,
     * and increments its error counter.
     *
     * @param Job $job
     */
    public function restoreJob(Job $job)
    {
        $job->timestamp = $this->now() + self::RETRY_TIME;
        $job->errorCounter++;

        $this->addJob($job);
    }

    /**
     * @return integer number of entries in the delayed plus the immediate queue
     */
    public function countAllJobs()
    {
        $redis = $this->getRedis();

        return $redis->ZCARD(self::QUEUE_DELAYED) + $redis->LLEN(self::QUEUE_IMMEDIATE);
    }

    /**
     * Tells whether a job id belongs to the given event type (ids look like "<eventType>:<id>").
     *
     * @param string|integer $jobId
     * @param string|null $eventType an empty type matches every job
     * @return bool
     */
    private function matchesEventType($jobId, $eventType)
    {
        return empty($eventType) || strpos((string)$jobId, $eventType . ':') === 0;
    }

    /**
     * Turns a stored payload (base64 encoded, serialized Job) back into a Job.
     *
     * @param mixed $rawJob
     * @return Job|null null when the payload is missing or does not decode into a Job
     */
    private function decodeJob($rawJob)
    {
        if (!is_string($rawJob) || $rawJob === '') {
            return null;
        }

        $serialized = base64_decode($rawJob);
        if ($serialized === false || $serialized === '') {
            return null;
        }

        // NOTE(review): unserialize() instantiates whatever the payload describes; this is only
        // safe as long as nothing untrusted can write to the queue keys.
        $job = unserialize($serialized);

        return $job instanceof Job ? $job : null;
    }
}
171