RedisQueue::updateStatus()   B
last analyzed

Complexity

Conditions 5
Paths 6

Size

Total Lines 31
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 5.0187

Importance

Changes 0
Metric Value
dl 0
loc 31
ccs 10
cts 11
cp 0.9091
rs 8.439
c 0
b 0
f 0
cc 5
eloc 10
nc 6
nop 2
crap 5.0187
1
<?php
2
3
namespace JobQueue\Infrastructure;
4
5
use JobQueue\Domain\Task\Profile;
6
use JobQueue\Domain\Task\Queue;
7
use JobQueue\Domain\Task\Status;
8
use JobQueue\Domain\Task\Task;
9
use Predis\Client;
10
use Predis\Collection\Iterator\Keyspace;
11
12
/**
 * Redis implementation of the task queue.
 *
 * Every task is stored serialized under its own key, and its identifier is
 * kept in a per-profile list named after the task's current status.
 *
 * Used keys:
 * - jobqueue.tasks.{task_identifier}
 * - jobqueue.{profile}.waiting
 * - jobqueue.{profile}.running
 * - jobqueue.{profile}.failed
 */
final class RedisQueue implements Queue
{
    /**
     * Redis client used for every command issued by this queue.
     *
     * @var Client
     */
    private $predis;

    /**
     * @param Client $predis Redis client the queue reads from and writes to
     */
    public function __construct(Client $predis)
    {
        $this->predis = $predis;
    }

    /**
     * Stores each task and pushes its identifier onto the list matching the
     * task's profile and current status.
     *
     * NOTE(review): the list name is derived from the task's current status,
     * whatever it is. A task added while already `finished` therefore ends up
     * in a `jobqueue.{profile}.finished` list, which is not part of the
     * documented key layout — confirm whether callers rely on that.
     *
     * @param Task ...$tasks Tasks to persist and enqueue
     */
    public function add(Task ...$tasks)
    {
        foreach ($tasks as $task) {
            $this->predis->set($this->getKey($task), serialize($task));
            $this->predis->lpush($this->getTaskList($task), $task->getIdentifier());
        }
    }

    /**
     * Blocks until a task is available for the given profile, marks it as
     * running, persists it and returns it.
     *
     * The identifier is moved from the `waiting` list to the `running` list
     * with BRPOPLPUSH. When the wait times out, or when anything in the
     * attempt throws an \Exception, the method pauses briefly and tries
     * again; it only ever returns with a task.
     *
     * The retry is a loop rather than a recursive call so that a worker
     * sitting on an idle queue does not add a stack frame per attempt.
     *
     * @param Profile $profile Profile whose waiting list is consumed
     * @return Task The fetched task, with its status set to running
     */
    public function fetch(Profile $profile): Task
    {
        $waitingList = $this->getList($profile, new Status(Status::WAITING));
        $runningList = $this->getList($profile, new Status(Status::RUNNING));

        while (true) {
            try {
                $identifier = $this->predis->brpoplpush(
                    $waitingList,
                    $runningList,
                    300 // 5 minutes
                );

                // A null reply means the blocking pop timed out on an empty list
                if (null !== $identifier) {
                    $task = $this->find($identifier);
                    $task->updateStatus(new Status(Status::RUNNING));
                    $this->predis->set($this->getKey($task), serialize($task));

                    return $task;
                }
            } catch (\Exception $e) {
                // Deliberate best effort: a failed attempt is retried below
            }

            // Try to free memory before the next attempt
            unset($identifier, $task);

            // Sleep a little to avoid high CPU consuming infinite loops
            sleep(3);
        }
    }

    /**
     * Loads a task from its identifier.
     *
     * NOTE(review): the stored payload is passed to `unserialize()`; this is
     * only safe as long as nothing untrusted can write to the Redis instance.
     *
     * @param string $identifier Identifier of the task to load
     * @return Task
     * @throws \RuntimeException When nothing usable is stored for the identifier
     */
    public function find(string $identifier): Task
    {
        $serializedTask = $this->predis->get($this->getKey($identifier));

        if (!$serializedTask) {
            throw new \RuntimeException(sprintf('Task %s does not exists', $identifier));
        }

        return unserialize($serializedTask);
    }

    /**
     * Moves a task to a new status: removes it from the list of its current
     * status, pushes it onto the list of the new one, then persists the task.
     *
     * Nothing happens when the task already has the requested status.
     * `finished` has no list, so a finished task is neither removed from nor
     * pushed onto one.
     *
     * @param Task   $task   Task to update
     * @param Status $status Status to apply
     * @throws \RuntimeException When the requested status is `running`
     */
    public function updateStatus(Task $task, Status $status)
    {
        $currentStatus = (string) $task->getStatus();
        $newStatus = (string) $status;

        if ($newStatus === $currentStatus) {
            return;
        }

        // A task that is currently running may still be updated: the worker
        // needs this to mark tasks as finished, and it also allows marking
        // tasks as successful after a crash.

        if (Status::RUNNING === $newStatus) {
            throw new \RuntimeException('A task can not be marked as "running"');
        }

        // Remove task from its current list (there is no `finished` list)
        if (Status::FINISHED !== $currentStatus) {
            $this->predis->lrem($this->getTaskList($task), 0, (string) $task);
        }

        // Insert the task to its new list (except for `finished` tasks)
        if (Status::FINISHED !== $newStatus) {
            $this->predis->lpush($this->getList($task->getProfile(), $status), (string) $task);
        }

        // Update task
        $task->updateStatus($status);
        $this->predis->set($this->getKey($task), serialize($task));
    }

    /**
     * Lists the stored tasks, optionally filtered, in ascending order of the
     * requested criterion.
     *
     * @param Profile|null $profile Only keep tasks of this profile
     * @param Status|null  $status  Only keep tasks having this status
     * @param array        $tags    Only keep tasks sharing at least one of these tags
     * @param string       $orderBy One of `date`, `profile` or `status`
     * @return Task[]
     * @throws \Exception When `$orderBy` is not a supported criterion
     */
    public function search(Profile $profile = null, Status $status = null, array $tags = [], string $orderBy = 'date'): array
    {
        if (!in_array($orderBy, ['date', 'profile', 'status'], true)) {
            throw new \Exception(sprintf('Impossible to order by "%s"', $orderBy));
        }

        // List all tasks, keeping next to each one the values it can be sorted on
        $rows = [];
        foreach (new Keyspace($this->predis, $this->getKey()) as $key) {
            $payload = $this->predis->get($key);

            if (null === $payload) {
                // The key was removed between the scan and the read
                continue;
            }

            /** @var Task $task */
            $task = unserialize($payload);

            if ($profile && (string) $profile !== (string) $task->getProfile()) {
                continue;
            }

            if ($status && (string) $status !== (string) $task->getStatus()) {
                continue;
            }

            // When tags are requested, the task must have at least one of them
            if (!empty($tags) && empty(array_intersect($tags, $task->getTags()))) {
                continue;
            }

            $rows[] = [
                'date' => $task->getCreatedAt(),
                'profile' => (string) $task->getProfile(),
                // Position of the status in the status list, so that tasks
                // sort in that order
                'status' => array_search((string) $task->getStatus(), Status::listStatus()),
                'task' => $task,
            ];
        }

        // Order tasks; `<=>` yields 0 for equal values, including two
        // distinct objects that compare as equal
        usort($rows, static function (array $a, array $b) use ($orderBy) {
            return $a[$orderBy] <=> $b[$orderBy];
        });

        // Only the tasks themselves are returned
        return array_column($rows, 'task');
    }

    /**
     * Removes a task from the list of its current status and deletes its key.
     *
     * @param string $identifier Identifier of the task to delete
     * @throws \RuntimeException When the task cannot be found
     */
    public function delete(string $identifier)
    {
        $task = $this->find($identifier);

        $this->predis->lrem($this->getTaskList($task), 0, (string) $task);
        $this->predis->del($this->getKey($task));
    }

    /**
     * Deletes every key matching `jobqueue.*`.
     *
     * Keys are discovered incrementally with SCAN (through the Keyspace
     * iterator) instead of a single blocking KEYS call, and the deletions are
     * sent together in one pipeline once the scan is over.
     */
    public function flush()
    {
        $pipe = $this->predis->pipeline();

        foreach (new Keyspace($this->predis, 'jobqueue.*') as $key) {
            $pipe->del($key);
        }

        $pipe->execute();
    }

    /**
     * Rebuilds the queue from the stored tasks: reads them all, flushes the
     * queue keys, then adds every task again. Tasks that are not finished are
     * reset to the `waiting` status first.
     */
    public function restore()
    {
        $tasks = $this->search();

        $this->flush();

        foreach ($tasks as $task) {
            // Reset non finished tasks to `waiting` status
            if (Status::FINISHED !== (string) $task->getStatus()) {
                $task->updateStatus(new Status(Status::WAITING));
            }

            $this->add($task);
        }
    }

    /**
     * Builds the name of the list holding the tasks of a profile in a status.
     * A missing profile or status is replaced by the `*` wildcard.
     *
     * @param Profile|null $profile
     * @param Status|null  $status
     * @return string
     */
    protected function getList(Profile $profile = null, Status $status = null): string
    {
        return sprintf('jobqueue.%s.%s', $profile ?: '*', $status ?: '*');
    }

    /**
     * Builds the name of the list matching the profile and current status of
     * a task.
     *
     * @param Task $task
     * @return string
     */
    protected function getTaskList(Task $task): string
    {
        return $this->getList($task->getProfile(), $task->getStatus());
    }

    /**
     * Builds the key under which a task is stored.
     *
     * @param Task|string|null $identifier A task, a task identifier, or nothing
     *                                     (any empty value) to get the pattern
     *                                     matching all task keys
     * @return string
     */
    protected function getKey($identifier = null): string
    {
        if ($identifier instanceof Task) {
            $identifier = (string) $identifier->getIdentifier();
        }

        return sprintf('jobqueue.tasks.%s', $identifier ?: '*');
    }
}