Passed
Push — master ( cf9606...e315e5 )
by Brice
02:40
created

RedisQueue   B

Complexity

Total Complexity 36

Size/Duplication

Total Lines 242
Duplicated Lines 0 %

Test Coverage

Coverage 93.26%

Importance

Changes 0
Metric Value
wmc 36
dl 0
loc 242
ccs 83
cts 89
cp 0.9326
rs 8.8
c 0
b 0
f 0

12 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 3 1
A add() 0 5 2
A fetch() 0 22 2
A getList() 0 3 3
A flush() 0 9 2
C search() 0 51 12
A delete() 0 7 1
A find() 0 7 2
A restore() 0 9 2
A getKey() 0 7 3
A getTaskList() 0 3 1
B updateStatus() 0 31 5
1
<?php
2
3
namespace JobQueue\Infrastructure;
4
5
use JobQueue\Domain\Task\Profile;
6
use JobQueue\Domain\Task\Queue;
7
use JobQueue\Domain\Task\Status;
8
use JobQueue\Domain\Task\Task;
9
use Predis\Client;
10
use Predis\Collection\Iterator\Keyspace;
11
12
/**
13
 * Redis implementation of the task queue.
14
 *
15
 * Used keys:
16
 * - jobqueue.tasks.{task_identifier}
17
 * - jobqueue.{profile}.waiting
18
 * - jobqueue.{profile}.running
19
 * - jobqueue.{profile}.failed
20
 */
21
final class RedisQueue implements Queue
22
{
23
    /**
24
     *
25
     * @var Client
26
     */
27
    private $predis;
28
29
    /**
30
     *
31
     * @param Client $predis
32
     */
33 2
    public function __construct(Client $predis)
34
    {
35 2
        $this->predis = $predis;
36 2
    }
37
38
    /**
39
     *
40
     * @param Task[] ...$tasks
41
     */
42 5
    public function add(Task ...$tasks)
43
    {
44 5
        foreach ($tasks as $task) {
45 5
            $this->predis->set($this->getKey($task), serialize($task));
46 5
            $this->predis->lpush($this->getTaskList($task), $task->getIdentifier());
47
        }
48 5
    }
49
50
    /**
51
     *
52
     * @param Profile $profile
53
     * @return Task
54
     */
55 6
    public function fetch(Profile $profile): Task
56
    {
57
        try {
58 6
            $identifier = $this->predis->brpoplpush(
59 6
                $this->getList($profile, new Status(Status::WAITING)),
60 6
                $this->getList($profile, new Status(Status::RUNNING)),
61 6
                300 // 5 minutes
62
            );
63
64
            // Find and update task
65 6
            $task = $this->find($identifier);
66 6
            $task->updateStatus(new Status(Status::RUNNING));
67 6
            $this->predis->set($this->getKey($task), serialize($task));
68
69 6
            return $task;
70
71
        } catch (\Exception $e) {
72
            // sleep a little to avoid high CPU consuming infinite loops...
73
            sleep(3);
74
75
            // ... and try again
76
            return $this->fetch($profile);
77
        }
78
    }
79
80
    /**
81
     *
82
     * @param string $identifier
83
     * @return Task
84
     */
85 10
    public function find(string $identifier): Task
86
    {
87 10
        if (!$serializedTask = $this->predis->get($this->getKey($identifier))) {
88 1
            throw new \RuntimeException(sprintf('Task %s does not exists', $identifier));
89
        }
90
91 10
        return unserialize($serializedTask);
92
    }
93
94
    /**
95
     *
96
     * @param Task   $task
97
     * @param Status $status
98
     */
99 6
    public function updateStatus(Task $task, Status $status)
100
    {
101 6
        if ((string) $status === (string) $task->getStatus()) {
102 2
            return;
103
        }
104
105
/**
106
 * Can be useful to mark tasks as success after a crash
107
 * Required now as the worker needs to mark tasks as finished
108
 */
109
//        if (Status::RUNNING === (string) $task->getStatus()) {
110
//            throw new \RuntimeException('The status of a running task can\'t be updated');
111
//        }
112
113 6
        if (Status::RUNNING === (string) $status) {
114
            throw new \RuntimeException('A task can not be marked as "running"');
115
        }
116
117
        // Remove task from its current list (there is no `finished` list)
118 6
        if (Status::FINISHED !== (string) $task->getStatus()) {
119 6
            $this->predis->lrem($this->getTaskList($task), 0, (string) $task);
120
        }
121
122
        // Insert the task to its new list (except for `finished` tasks)
123 6
        if (Status::FINISHED !== (string) $status) {
124 2
            $this->predis->lpush($this->getList($task->getProfile(), $status), (string) $task);
125
        }
126
127
        // Update task
128 6
        $task->updateStatus($status);
129 6
        $this->predis->set($this->getKey($task), serialize($task));
130 6
    }
131
132
    /**
133
     *
134
     * @param Profile|null $profile
135
     * @param Status|null  $status
136
     * @param array        $tags
137
     * @param string       $orderBy
138
     * @return Task[]
139
     * @throws \Exception
140
     */
141 9
    public function search(Profile $profile = null, Status $status = null, array $tags = [], string $orderBy = 'date'): array
142
    {
143 9
        if (!in_array($orderBy, ['date', 'profile', 'status'])) {
144
            throw new \Exception(sprintf('Impossible to order by "%s"', $orderBy));
145
        }
146
147
        // List all tasks
148 9
        $tasks = [];
149 9
        foreach (new Keyspace($this->predis, $this->getKey()) as $key) {
150
151 7
            $task = unserialize($this->predis->get($key)); /** @var Task $task */
152
153 7
            if ($profile and (string) $profile !== (string) $task->getProfile()) {
154 2
                continue;
155
            }
156
157 7
            if ($status and (string) $status !== (string) $task->getStatus()) {
158 1
                continue;
159
            }
160
161 7
            if (!empty($tags)) {
162
                // Check that the task has at least one tag
163 2
                if (empty(array_intersect($tags, $task->getTags()))) {
164 2
                    continue;
165
                }
166
            }
167
168 7
            $tasks[] = [
169 7
                'date' => $task->getCreatedAt(),
170 7
                'profile' => (string) $task->getProfile(),
171 7
                'status' => array_search((string) $task->getStatus(), Status::listStatus()),
172 7
                'task' => $task,
173
            ];
174
        }
175
176
        // Order Tasks
177 9
        usort($tasks, function ($a, $b) use ($orderBy) {
178 6
            $aValue = $a[$orderBy];
179 6
            $bValue = $b[$orderBy];
180
181 6
            if ($aValue === $bValue) return 0;
182
183
            return $aValue < $bValue ? -1 : 1;
184 9
        });
185
186
        // Clean return
187 9
        foreach ($tasks as $key => $task) {
188 7
            $tasks[$key] = $task['task'];
189
        }
190
191 9
        return $tasks;
192
    }
193
194
    /**
195
     *
196
     * @param string $identifier
197
     */
198 1
    public function delete(string $identifier)
199
    {
200 1
        $task = $this->find($identifier);
201
202 1
        $this->predis->lrem($this->getTaskList($task), 0, (string) $task);
203
204 1
        $this->predis->del($this->getKey($task));
205 1
    }
206
207 4
    public function flush()
208
    {
209 4
        $pipe = $this->predis->pipeline();
210
211 4
        foreach ($this->predis->keys('jobqueue.*') as $key) {
212 4
            $pipe->del($key);
213
        }
214
215 4
        $pipe->execute();
216 4
    }
217
218 2
    public function restore()
219
    {
220 2
        $tasks = $this->search();
221
222 2
        $this->flush();
223
224 2
        foreach ($tasks as $task) {
225 2
            $task->updateStatus(new Status(Status::WAITING));
226 2
            $this->add($task);
227
        }
228 2
    }
229
230
    /**
231
     *
232
     * @param Profile $profile
233
     * @param Status  $status
234
     * @return string
235
     */
236 13
    protected function getList(Profile $profile = null, Status $status = null): string
237
    {
238 13
        return sprintf('jobqueue.%s.%s', $profile ?: '*', $status ?: '*');
239
    }
240
241
    /**
242
     *
243
     * @param Task $task
244
     * @return string
245
     */
246 12
    protected function getTaskList(Task $task): string
247
    {
248 12
        return $this->getList($task->getProfile(), $task->getStatus());
249
    }
250
251
    /**
252
     *
253
     * @param $identifier
254
     * @return string
255
     */
256 20
    protected function getKey($identifier = null): string
257
    {
258 20
        if ($identifier instanceof Task) {
259 13
            $identifier = (string) $identifier->getIdentifier();
260
        }
261
262 20
        return sprintf('jobqueue.tasks.%s', $identifier ?: '*');
263
    }
264
}