Passed
Push — v2 ( 1cd5a7...0b44bc )
by Brice
02:47
created

RedisQueue::delete()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 7
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
eloc 3
nc 1
nop 1
dl 0
loc 7
ccs 0
cts 4
cp 0
crap 2
rs 9.4285
c 0
b 0
f 0
1
<?php
2
3
namespace JobQueue\Infrastructure;
4
5
use JobQueue\Domain\Task\Profile;
6
use JobQueue\Domain\Task\Queue;
7
use JobQueue\Domain\Task\Status;
8
use JobQueue\Domain\Task\Task;
9
use Predis\Client;
10
use Predis\Collection\Iterator\Keyspace;
11
12
/**
13
 * Redis implementation of the task queue.
14
 *
15
 * Used keys:
16
 * - jobqueue.tasks.{task_identifier}
17
 * - jobqueue.{profile}.waiting
18
 * - jobqueue.{profile}.running
19
 * - jobqueue.{profile}.failed
20
 */
21
final class RedisQueue implements Queue
22
{
23
    /**
24
     *
25
     * @var Client
26
     */
27
    private $predis;
28
29
    /**
     * Binds the queue to the Predis client used for every Redis command it issues.
     *
     * @param Client $predis Redis client through which tasks and lists are read and written
     */
    public function __construct(Client $predis)
    {
        $this->predis = $predis;
    }
37
38
    /**
     * Stores each given task, then registers its identifier in the list
     * matching the task's profile and current status.
     *
     * @param Task ...$tasks
     */
    public function add(Task ...$tasks)
    {
        foreach ($tasks as $newTask) {
            // Persist the serialized task first, so the identifier never
            // appears in a list before its data exists
            $storageKey = $this->getKey($newTask);
            $payload = serialize($newTask);
            $this->predis->set($storageKey, $payload);

            $listName = $this->getTaskList($newTask);
            $this->predis->lpush($listName, $newTask->getIdentifier());
        }
    }
49
50
    /**
     * Blocks until a waiting task is available for the profile, then marks it as running.
     *
     * The task identifier is moved from the profile's `waiting` list to its `running`
     * list with BRPOPLPUSH, and the stored task is re-serialized with its new status.
     * Any exception raised along the way is followed by a short pause and a new attempt.
     *
     * @param Profile $profile
     * @return Task
     */
    public function fetch(Profile $profile): Task
    {
        // Iterate rather than recurse: a long-lived worker may retry an unbounded
        // number of times and every recursive call would add a stack frame.
        while (true) {
            try {
                $identifier = $this->predis->brpoplpush(
                    $this->getList($profile, new Status(Status::WAITING)),
                    $this->getList($profile, new Status(Status::RUNNING)),
                    300 // 5 minutes
                );

                // BRPOPLPUSH replies with nil when the timeout expires and nothing
                // could be popped; passing that to find() would raise a TypeError,
                // which the catch below does not intercept. Just wait again.
                if (null === $identifier) {
                    continue;
                }

                // Find and update task
                $task = $this->find($identifier);
                $task->updateStatus(new Status(Status::RUNNING));
                $this->predis->set($this->getKey($task), serialize($task));

                return $task;
            } catch (\Exception $e) {
                // sleep a little to avoid high CPU consuming infinite loops,
                // then let the loop try again
                sleep(3);
            }
        }
    }
79
80
    /**
     * Loads the task stored under the given identifier.
     *
     * @param string $identifier
     * @return Task
     * @throws \RuntimeException when nothing is stored for the identifier
     */
    public function find(string $identifier): Task
    {
        $payload = $this->predis->get($this->getKey($identifier));

        if ($payload) {
            return unserialize($payload);
        }

        throw new \RuntimeException(sprintf('Task %s does not exists', $identifier));
    }
93
94
    /**
     * Moves a task to the list of its new status and persists the updated task.
     *
     * Nothing happens when the task already has the requested status. A task whose
     * current status is `running` may be updated: the worker needs this to mark
     * tasks as finished (it can also help to settle tasks after a crash).
     *
     * @param Task   $task
     * @param Status $status
     * @throws \RuntimeException when the requested status is `running`
     */
    public function updateStatus(Task $task, Status $status)
    {
        $requested = (string) $status;
        $current = (string) $task->getStatus();

        if ($requested === $current) {
            return;
        }

        if ($requested === Status::RUNNING) {
            throw new \RuntimeException('A task can not be marked as "running"');
        }

        // There is no `finished` list: a finished task has no list to leave...
        if ($current !== Status::FINISHED) {
            $sourceList = $this->getTaskList($task);
            $this->predis->lrem($sourceList, 0, (string) $task);
        }

        // ... and a task becoming finished has no list to join
        if ($requested !== Status::FINISHED) {
            $targetList = $this->getList($task->getProfile(), $status);
            $this->predis->lpush($targetList, (string) $task);
        }

        // Persist the task with its new status
        $task->updateStatus($status);
        $this->predis->set($this->getKey($task), serialize($task));
    }
131
132
    /**
     * Lists the stored tasks, optionally filtered by profile and/or status, in sorted order.
     *
     * @param Profile $profile Only return tasks of this profile (null: any profile)
     * @param Status  $status  Only return tasks having this status (null: any status)
     * @param string  $orderBy Sort criterion: "date", "profile" or "status"
     * @return Task[]
     * @throws \Exception when $orderBy is not a supported criterion
     */
    public function dump(Profile $profile = null, Status $status = null, string $orderBy = 'date'): array
    {
        if (!in_array($orderBy, ['date', 'profile', 'status'], true)) {
            throw new \Exception(sprintf('Impossible to order by "%s"', $orderBy));
        }

        // List all tasks, each paired with the values it can be sorted on
        $rows = [];
        foreach (new Keyspace($this->predis, $this->getKey()) as $key) {
            $payload = $this->predis->get($key);

            if (null === $payload) {
                // The key was removed between the keyspace scan and this read
                continue;
            }

            $task = unserialize($payload); /** @var Task $task */

            if (null !== $profile && (string) $profile !== (string) $task->getProfile()) {
                continue;
            }

            if (null !== $status && (string) $status !== (string) $task->getStatus()) {
                continue;
            }

            $rows[] = [
                'date' => $task->getCreatedAt(),
                'profile' => (string) $task->getProfile(),
                'status' => array_search((string) $task->getStatus(), Status::listStatus()),
                'task' => $task,
            ];
        }

        // Order tasks. The spaceship operator yields 0 for equal values even when
        // they are distinct objects (presumably the case for creation dates - to be
        // confirmed against Task::getCreatedAt()), which keeps the comparator consistent.
        usort($rows, static function (array $a, array $b) use ($orderBy): int {
            return $a[$orderBy] <=> $b[$orderBy];
        });

        // Keep only the tasks themselves
        return array_column($rows, 'task');
    }
184
185
    /**
     * Removes a task from the list of its current status and deletes its stored data.
     *
     * @param string $identifier
     * @throws \RuntimeException when no task is stored for the identifier (raised by find())
     */
    public function delete(string $identifier)
    {
        $doomed = $this->find($identifier);

        // Drop every occurrence of the task from the list it currently sits in
        $listName = $this->getTaskList($doomed);
        $this->predis->lrem($listName, 0, (string) $doomed);

        // Then remove the serialized task itself
        $storageKey = $this->getKey($doomed);
        $this->predis->del($storageKey);
    }
197
198 2
    /**
     * Deletes every key matching `jobqueue.*`: the stored tasks and all profile lists.
     */
    public function flush()
    {
        $pipe = $this->predis->pipeline();

        // Walk the keyspace incrementally (SCAN) rather than with KEYS, which makes
        // the server inspect the whole database in a single blocking command.
        // Deletions are only queued here and sent together by execute().
        foreach (new Keyspace($this->predis, 'jobqueue.*') as $key) {
            $pipe->del($key);
        }

        $pipe->execute();
    }
208
209 1
    /**
     * Re-queues every stored task: dumps them all, flushes the queue keys,
     * then adds each task back with the `waiting` status.
     */
    public function restore()
    {
        // Take the snapshot before wiping the keys it was read from
        $snapshot = $this->dump();
        $this->flush();

        foreach ($snapshot as $restored) {
            $restored->updateStatus(new Status(Status::WAITING));
            $this->add($restored);
        }
    }
220
221
    /**
     * Builds the name of the list holding task identifiers for a profile and a status.
     *
     * A missing profile or status is replaced by the `*` wildcard.
     *
     * @param Profile $profile
     * @param Status  $status
     * @return string
     */
    protected function getList(Profile $profile = null, Status $status = null): string
    {
        $profilePart = null === $profile ? '*' : $profile;
        $statusPart = null === $status ? '*' : $status;

        return sprintf('jobqueue.%s.%s', $profilePart, $statusPart);
    }
231
232
    /**
     * Returns the name of the list matching the task's profile and current status.
     *
     * @param Task $task
     * @return string
     */
    protected function getTaskList(Task $task): string
    {
        $profile = $task->getProfile();
        $status = $task->getStatus();

        return $this->getList($profile, $status);
    }
241
242
    /**
     * Builds the Redis key under which a task is stored.
     *
     * @param Task|string|null $identifier A task, a task identifier, or nothing (null or
     *                                     an empty value) to get the `jobqueue.tasks.*`
     *                                     pattern matching every task key
     * @return string
     */
    protected function getKey($identifier = null): string
    {
        if ($identifier instanceof Task) {
            $identifier = (string) $identifier->getIdentifier();
        }

        // Only a missing identifier means "every task": a plain truthiness test
        // would also turn the legitimate identifier "0" into the wildcard.
        if (null === $identifier || '' === (string) $identifier) {
            $identifier = '*';
        }

        return sprintf('jobqueue.tasks.%s', $identifier);
    }
255
}