Passed
Push — master ( 98d101...f1415a )
by Brice
02:40
created

RedisQueue::flush()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 9
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 4
nc 2
nop 0
dl 0
loc 9
ccs 5
cts 5
cp 1
crap 2
rs 9.6666
c 0
b 0
f 0
1
<?php
2
3
namespace JobQueue\Infrastructure;
4
5
use JobQueue\Domain\Task\Profile;
6
use JobQueue\Domain\Task\Queue;
7
use JobQueue\Domain\Task\Status;
8
use JobQueue\Domain\Task\Task;
9
use Predis\Client;
10
use Predis\Collection\Iterator\Keyspace;
11
12
/**
13
 * Redis implementation of the task queue.
14
 *
15
 * Used keys:
16
 * - jobqueue.tasks.{task_identifier}
17
 * - jobqueue.{profile}.waiting
18
 * - jobqueue.{profile}.running
19
 * - jobqueue.{profile}.failed
20
 */
21
final class RedisQueue implements Queue
22
{
23
    /**
24
     *
25
     * @var Client
26
     */
27
    private $predis;
28
29
    /**
30
     *
31
     * @param Client $predis
32
     */
33 2
    public function __construct(Client $predis)
34
    {
35 2
        $this->predis = $predis;
36 2
    }
37
38
    /**
39
     *
40
     * @param Task[] ...$tasks
41
     */
42 7
    public function add(Task ...$tasks)
43
    {
44 7
        foreach ($tasks as $task) {
45 7
            $this->predis->set($this->getKey($task), serialize($task));
46 7
            $this->predis->lpush($this->getTaskList($task), $task->getIdentifier());
47
        }
48 7
    }
49
50
    /**
51
     *
52
     * @param Profile $profile
53
     * @return Task
54
     */
55 6
    public function fetch(Profile $profile): Task
56
    {
57
        try {
58 6
            $identifier = $this->predis->brpoplpush(
59 6
                $this->getList($profile, new Status(Status::WAITING)),
60 6
                $this->getList($profile, new Status(Status::RUNNING)),
61 6
                300 // 5 minutes
62
            );
63
64 6
            if (is_null($identifier)) {
65
                throw new \Exception('Empty list');
66
            }
67
68
            // Find and update task
69 6
            $task = $this->find($identifier);
70 6
            $task->updateStatus(new Status(Status::RUNNING));
71 6
            $this->predis->set($this->getKey($task), serialize($task));
72
73 6
            return $task;
74
75
        } catch (\Exception $e) {
76
            // sleep a little to avoid high CPU consuming infinite loops...
77
            sleep(3);
78
79
            // ... and try again
80
            return $this->fetch($profile);
81
        }
82
    }
83
84
    /**
85
     *
86
     * @param string $identifier
87
     * @return Task
88
     */
89 11
    public function find(string $identifier): Task
90
    {
91 11
        if (!$serializedTask = $this->predis->get($this->getKey($identifier))) {
92 1
            throw new \RuntimeException(sprintf('Task %s does not exists', $identifier));
93
        }
94
95 11
        return unserialize($serializedTask);
96
    }
97
98
    /**
99
     *
100
     * @param Task   $task
101
     * @param Status $status
102
     */
103 6
    public function updateStatus(Task $task, Status $status)
104
    {
105 6
        if ((string) $status === (string) $task->getStatus()) {
106 2
            return;
107
        }
108
109
/**
110
 * Can be useful to mark tasks as success after a crash
111
 * Required now as the worker needs to mark tasks as finished
112
 */
113
//        if (Status::RUNNING === (string) $task->getStatus()) {
114
//            throw new \RuntimeException('The status of a running task can\'t be updated');
115
//        }
116
117 6
        if (Status::RUNNING === (string) $status) {
118
            throw new \RuntimeException('A task can not be marked as "running"');
119
        }
120
121
        // Remove task from its current list (there is no `finished` list)
122 6
        if (Status::FINISHED !== (string) $task->getStatus()) {
123 6
            $this->predis->lrem($this->getTaskList($task), 0, (string) $task);
124
        }
125
126
        // Insert the task to its new list (except for `finished` tasks)
127 6
        if (Status::FINISHED !== (string) $status) {
128 2
            $this->predis->lpush($this->getList($task->getProfile(), $status), (string) $task);
129
        }
130
131
        // Update task
132 6
        $task->updateStatus($status);
133 6
        $this->predis->set($this->getKey($task), serialize($task));
134 6
    }
135
136
    /**
137
     *
138
     * @param Profile|null $profile
139
     * @param Status|null  $status
140
     * @param array        $tags
141
     * @param string       $orderBy
142
     * @return Task[]
143
     * @throws \Exception
144
     */
145 10
    public function search(Profile $profile = null, Status $status = null, array $tags = [], string $orderBy = 'date'): array
146
    {
147 10
        if (!in_array($orderBy, ['date', 'profile', 'status'])) {
148
            throw new \Exception(sprintf('Impossible to order by "%s"', $orderBy));
149
        }
150
151
        // List all tasks
152 10
        $tasks = [];
153 10
        foreach (new Keyspace($this->predis, $this->getKey()) as $key) {
154
155 8
            $task = unserialize($this->predis->get($key)); /** @var Task $task */
156
157 8
            if ($profile and (string) $profile !== (string) $task->getProfile()) {
158 2
                continue;
159
            }
160
161 8
            if ($status and (string) $status !== (string) $task->getStatus()) {
162 1
                continue;
163
            }
164
165 8
            if (!empty($tags)) {
166
                // Check that the task has at least one tag
167 2
                if (empty(array_intersect($tags, $task->getTags()))) {
168 2
                    continue;
169
                }
170
            }
171
172 8
            $tasks[] = [
173 8
                'date' => $task->getCreatedAt(),
174 8
                'profile' => (string) $task->getProfile(),
175 8
                'status' => array_search((string) $task->getStatus(), Status::listStatus()),
176 8
                'task' => $task,
177
            ];
178
        }
179
180
        // Order Tasks
181 10
        usort($tasks, function ($a, $b) use ($orderBy) {
182 7
            $aValue = $a[$orderBy];
183 7
            $bValue = $b[$orderBy];
184
185 7
            if ($aValue === $bValue) return 0;
186
187
            return $aValue < $bValue ? -1 : 1;
188 10
        });
189
190
        // Clean return
191 10
        foreach ($tasks as $key => $task) {
192 8
            $tasks[$key] = $task['task'];
193
        }
194
195 10
        return $tasks;
196
    }
197
198
    /**
199
     *
200
     * @param string $identifier
201
     */
202 1
    public function delete(string $identifier)
203
    {
204 1
        $task = $this->find($identifier);
205
206 1
        $this->predis->lrem($this->getTaskList($task), 0, (string) $task);
207
208 1
        $this->predis->del($this->getKey($task));
209 1
    }
210
211 4
    public function flush()
212
    {
213 4
        $pipe = $this->predis->pipeline();
214
215 4
        foreach ($this->predis->keys('jobqueue.*') as $key) {
216 4
            $pipe->del($key);
217
        }
218
219 4
        $pipe->execute();
220 4
    }
221
222 2
    public function restore()
223
    {
224 2
        $tasks = $this->search();
225
226 2
        $this->flush();
227
228 2
        foreach ($tasks as $task) {
229
            // Reset non finished tasks to `waiting` status
230 2
            if (Status::FINISHED !== (string) $task->getStatus()) {
231 2
                $task->updateStatus(new Status(Status::WAITING));
232
            }
233
234 2
            $this->add($task);
235
        }
236 2
    }
237
238
    /**
239
     *
240
     * @param Profile $profile
241
     * @param Status  $status
242
     * @return string
243
     */
244 15
    protected function getList(Profile $profile = null, Status $status = null): string
245
    {
246 15
        return sprintf('jobqueue.%s.%s', $profile ?: '*', $status ?: '*');
247
    }
248
249
    /**
250
     *
251
     * @param Task $task
252
     * @return string
253
     */
254 14
    protected function getTaskList(Task $task): string
255
    {
256 14
        return $this->getList($task->getProfile(), $task->getStatus());
257
    }
258
259
    /**
260
     *
261
     * @param $identifier
262
     * @return string
263
     */
264 23
    protected function getKey($identifier = null): string
265
    {
266 23
        if ($identifier instanceof Task) {
267 15
            $identifier = (string) $identifier->getIdentifier();
268
        }
269
270 23
        return sprintf('jobqueue.tasks.%s', $identifier ?: '*');
271
    }
272
}