Completed
Push — develop ( f102ca...488b09 )
by Mathias
20:12 queued 11:00
created

MongoQueue::createEnvelope()   A

Complexity

Conditions 5
Paths 16

Size

Total Lines 20
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 15
CRAP Score 5

Importance

Changes 0
Metric Value
eloc 15
dl 0
loc 20
ccs 15
cts 15
cp 1
rs 9.4555
c 0
b 0
f 0
cc 5
nc 16
nop 2
crap 5
1
<?php
2
/**
3
 * YAWIK
4
 *
5
 * @filesource
6
 * @license MIT
7
 * @copyright  2013 - 2019 Cross Solution <http://cross-solution.de>
8
 */
9
  
10
/** */
11
namespace Core\Queue;
12
13
use SlmQueue\Job\JobInterface;
14
use SlmQueue\Job\JobPluginManager;
15
use SlmQueue\Queue\AbstractQueue;
16
17
/**
18
 * SlmQueue implementation for a queue backed by MongoDB
19
 *
20
 * Heavily inspired by https://github.com/juriansluiman/SlmQueueDoctrine
21
 * 
22
 * @author Mathias Gelhausen <[email protected]>
23
 */
24
class MongoQueue extends AbstractQueue
25
{
26
27
    /**#@+
28
     * Job status
29
     * @var int
30
     */
31
    const STATUS_PENDING = 1;
32
    const STATUS_RUNNING = 2;
33
    const STATUS_FAILED  = 3;
34
    /**#@-*/
35
36
    /**
37
     * Default priority
38
     * @var int
39
     */
40
    const DEFAULT_PRIORITY = 1024;
41
42
    /**
43
     * Used to synchronize time calculations
44
     *
45
     * @var \DateTime
46
     */
47
    private $now;
48
49
    /**
50
     * @var \MongoDB\Collection
51
     */
52
    private $mongoCollection;
53
54
    /**
55
     * Constructor
56
     *
57
     * @param \MongoDB\Collection $collection
58
     * @param string           $name
59
     * @param JobPluginManager $jobPluginManager
60
     */
61 13
    public function __construct(
62
        \MongoDB\Collection $collection,
63
        $name,
64
        JobPluginManager $jobPluginManager
65
    ) {
66 13
        $this->mongoCollection = $collection;
67
68 13
        parent::__construct($name, $jobPluginManager);
69 13
    }
70
71
72
    /**
73
     * Push a job to the queue.
74
     *
75
     * Valid options are:
76
     *      - priority: the lower the priority is, the sooner the job get popped from the queue (default to 1024)
77
     *
78
     * Note : see {@link parseOptionsToDateTime()} for schedule and delay options
79
     *
80
     * @param JobInterface $job
81
     * @param array $options
82
     */
83 2
    public function push(JobInterface $job, array $options = [])
84
    {
85 2
        $envelope = $this->createEnvelope($job, $options);
86
87 2
        $result = $this->mongoCollection->insertOne($envelope);
88
89 2
        $job->setId((string) $result->getInsertedId());
90 2
    }
91
92
    /**
93
     * Push a lazy loading job in the queue.
94
     *
95
     * Lazy job allows to load the actual job only when executed.
96
     *
97
     * You can specify the job to load in two ways:
98
     * - as string:
99
     *      The actual job will be pulled from the job manager or instantiated from a valid class name.
100
     *
101
     * - as array:
102
     *      If you need to pass options to the job, you can specify the
103
     *      job as an array:
104
     *      [ string:name, array:options ]
105
     *      The actual job will be pulled from the job manager using the 'build' command,
106
     *      passing the options along OR if no service for the job is defined, but a valid
107
     *      class name is given, this class will be instantiated and the options are passed
108
     *      as constructor arguments,
109
     *
110
     * @see push()
111
     * @see LazyJob
112
     *
113
     * @param string|array $service
114
     * @param mixed|null  $payload
115
     * @param array $options
116
     */
117 3
    public function pushLazy($service, $payload = null, array $options = [])
118
    {
119 3
        $manager = $this->getJobPluginManager();
120 3
        $serviceOptions = [];
121
122 3
        if (is_array($service)) {
123 1
            $serviceOptions = $service['options'] ?? $service[1] ?? [];
124 1
            $service = $service['name'] ?? $service[0] ?? null;
125
        }
126
127 3
        if (!$manager->has($service) && !class_exists($service)) {
128 1
            throw new \UnexpectedValueException(sprintf(
129 1
                'Service name "%s" is not a known job service or existent class',
130 1
                $service
131
            ));
132
        }
133
134
        $lazyOptions = [
135 2
            'name' => $service,
136 2
            'options' => $serviceOptions,
137 2
            'content' => $payload,
138
        ];
139
140 2
        $job = $this->getJobPluginManager()->build('lazy', $lazyOptions);
141
142 2
        $this->push($job, $options);
143 2
    }
144
145
    /**
146
     * Create a mongo document.
147
     *
148
     * @param JobInterface $job
149
     * @param array        $options
150
     *
151
     * @return array
152
     */
153 4
    private function createEnvelope(JobInterface $job, array $options = [])
154
    {
155 4
        $scheduled = $this->parseOptionsToDateTime($options);
156 4
        $tried     = isset($options['tried']) ? (int) $options['tried'] : null;
157 4
        $message   = isset($options['message']) ? $options['message'] : null;
158 4
        $trace     = isset($options['trace']) ? $options['trace'] : null;
159
160
        $envelope = [
161 4
            'queue'     => $this->getName(),
162 4
            'status'    => self::STATUS_PENDING,
163 4
            'tried'     => $tried,
164 4
            'message'   => $message,
165 4
            'trace'     => $trace,
166 4
            'created'   => $this->dateTimeToUTCDateTime($this->now),
167 4
            'data'      => $this->serializeJob($job),
168 4
            'scheduled' => $this->dateTimeToUTCDateTime($scheduled),
169 4
            'priority'  => isset($options['priority']) ? $options['priority'] : self::DEFAULT_PRIORITY,
170
        ];
171
172 4
        return $envelope;
173
    }
174
175
    /**
176
     * Reinsert the job in the queue.
177
     *
178
     * @see push()
179
     * @param JobInterface $job
180
     * @param array        $options
181
     */
182 1
    public function retry(JobInterface $job, array $options = [])
183
    {
184 1
        $tried = $job->getMetadata('mongoqueue.tries', 0) + 1;
0 ignored issues
show
Unused Code introduced by
The call to Zend\Stdlib\MessageInterface::getMetadata() has too many arguments starting with 0. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

184
        $tried = $job->/** @scrutinizer ignore-call */ getMetadata('mongoqueue.tries', 0) + 1;

This check compares calls to functions or methods with their respective definitions. If the call has more arguments than are defined, it raises an issue.

If a function is defined several times with a different number of parameters, the check may pick up the wrong definition and report false positives. One codebase where this has been known to happen is Wordpress. Please note the @ignore annotation hint above.

Loading history...
185 1
        $job->setMetaData('mongoqueue.tries', $tried);
186
187 1
        $options['tried'] = $tried;
188 1
        $envelope = $this->createEnvelope($job, $options);
189 1
        unset($envelope['created']);
190
191 1
        $this->mongoCollection->findOneAndUpdate(
192
            [
193 1
                '_id' => new \MongoDB\BSON\ObjectID($job->getId())
194
            ],
195
            [
196 1
                '$set' => $envelope
197
            ]
198
        );
199 1
    }
200
201
    /**
202
     * Pop a job from the queue.
203
     *
204
     * The status will be set to self::STATUS_RUNNING.
205
     *
206
     * @param array $options unused
207
     * @return null|JobInterface
208
     */
209 2
    public function pop(array $options = [])
210
    {
211 2
        $time      = microtime(true);
212 2
        $micro     = sprintf("%06d", ($time - floor($time)) * 1000000);
213 2
        $this->now = new \DateTime(
214 2
            date('Y-m-d H:i:s.' . $micro, $time),
215 2
            new \DateTimeZone(date_default_timezone_get())
216
        );
217 2
        $now = $this->dateTimeToUTCDateTime($this->now);
218
219 2
        $envelope = $this->mongoCollection->findOneAndUpdate(
220
            [
221 2
                'queue' => $this->getName(),
222 2
                'status' => self::STATUS_PENDING,
223 2
                'scheduled' => ['$lte' => $now],
224
            ],
225
            [
226
                '$set' => [
227 2
                    'status' => self::STATUS_RUNNING,
228 2
                    'executed' => $now,
229
                ],
230
            ],
231
            [
232 2
                'sort' => ['priority' => 1, 'scheduled' => 1],
233 2
                'returnDocument' => \MongoDB\Operation\FindOneAndUpdate::RETURN_DOCUMENT_AFTER
234
            ]
235
        );
236
237 2
        if (!$envelope) {
238 1
            return null;
239
        }
240
241 1
        return $this->unserializeJob($envelope['data'], ['__id__' => $envelope['_id']]);
242
    }
243
244
    /**
245
     * Fetch a list of jobs
246
     *
247
     * @param array $options
248
     *
249
     * @return array
250
     */
251 2
    public function listing(array $options = [])
252
    {
253 2
        $filter = [ 'queue' => $this->getName() ];
254 2
        if (isset($options['status'])) {
255 1
            $filter['status'] = $options['status'];
256
        }
257
258 2
        $opt = [ 'sort' => [ 'scheduled' => 1, 'priority' => 1] ];
259 2
        if (isset($options['limit'])) {
260 1
            $opt['limit'] = $options['limit'];
261
        }
262
263 2
        $cursor = $this->mongoCollection->find($filter, $opt);
264 2
        $jobs   = $cursor->toArray();
265
266 2
        foreach ($jobs as &$envelope) {
267 1
            $envelope['job'] = $this->unserializeJob($envelope['data'], ['__id__' => $envelope['_id']]);
268
        }
269
270 2
        return $jobs;
271
272
    }
273
274
    /**
275
     * Delete a job from the queue
276
     *
277
     * @param JobInterface $job
278
     * @param array        $options unused
279
     *
280
     * @return bool
281
     */
282 1
    public function delete(JobInterface $job, array $options = [])
283
    {
284 1
        $result = $this->mongoCollection->deleteOne(['_id' => $job->getId()]);
285
286 1
        return (bool) $result->getDeletedCount();
0 ignored issues
show
Bug Best Practice introduced by
The expression return (bool)$result->getDeletedCount() returns the type boolean which is incompatible with the return type mandated by SlmQueue\Queue\QueueInterface::delete() of void.

In the issue above, the returned value is violating the contract defined by the mentioned interface.

Let's take a look at an example:

interface HasName {
    /** @return string */
    public function getName();
}

class Name {
    public $name;
}

class User implements HasName {
    /** @return string|Name */
    public function getName() {
        return new Name('foo'); // This is a violation of the ``HasName`` interface
                                // which only allows a string value to be returned.
    }
}
Loading history...
287
    }
288
289
    /**
290
     * Mark a job as permanent failed.
291
     *
292
     * The status will be set to self::STATUS_FAILED
293
     * @param JobInterface $job
294
     * @param array        $options unused
295
     */
296 1
    public function fail(JobInterface $job, array $options = [])
297
    {
298 1
        $envelope = $this->createEnvelope($job, $options);
299 1
        unset($envelope['created']);
300 1
        unset($envelope['scheduled']);
301 1
        $envelope['status'] = self::STATUS_FAILED;
302
303 1
        $this->mongoCollection->findOneAndUpdate(
304
            [
305 1
                '_id' => new \MongoDB\BSON\ObjectId($job->getId())
306
            ],
307
            [
308 1
                '$set' => $envelope
309
            ]
310
        );
311 1
    }
312
313
    /**
314
     * Parses options to a datetime object
315
     *
316
     * valid options keys:
317
     *
318
     * scheduled: the time when the job will be scheduled to run next
319
     * - numeric string or integer - interpreted as a timestamp
320
     * - string parserable by the DateTime object
321
     * - DateTime instance
322
     * delay: the delay before a job become available to be popped (defaults to 0 - no delay -)
323
     * - numeric string or integer - interpreted as seconds
324
     * - string parserable (ISO 8601 duration) by DateTimeInterval::__construct
325
     * - string parserable (relative parts) by DateTimeInterval::createFromDateString
326
     * - DateTimeInterval instance
327
     *
328
     * @see http://en.wikipedia.org/wiki/Iso8601#Durations
329
     * @see http://www.php.net/manual/en/datetime.formats.relative.php
330
     *
331
     * @codeCoverageIgnore
332
     * @param $options array
333
     * @return \DateTime
334
     */
335
    protected function parseOptionsToDateTime($options)
336
    {
337
        $time      = microtime(true);
338
        $micro     = sprintf("%06d", ($time - floor($time)) * 1000000);
339
        $this->now = new \DateTime(date('Y-m-d H:i:s.' . $micro, $time), new \DateTimeZone(date_default_timezone_get()));
340
        $scheduled = clone ($this->now);
341
342
        if (isset($options['scheduled'])) {
343
            switch (true) {
344
                case is_numeric($options['scheduled']):
345
                    $scheduled = new \DateTime(
346
                        sprintf("@%d", (int) $options['scheduled']),
347
                        new \DateTimeZone(date_default_timezone_get())
348
                    );
349
                    break;
350
                case is_string($options['scheduled']):
351
                    $scheduled = new \DateTime($options['scheduled'], new \DateTimeZone(date_default_timezone_get()));
352
                    break;
353
                case $options['scheduled'] instanceof \DateTime:
354
                    $scheduled = $options['scheduled'];
355
                    break;
356
            }
357
        }
358
359
        if (isset($options['delay'])) {
360
            switch (true) {
361
                case is_numeric($options['delay']):
362
                    $delay = new \DateInterval(sprintf("PT%dS", abs((int) $options['delay'])));
363
                    $delay->invert = ($options['delay'] < 0) ? 1 : 0;
364
                    break;
365
                case is_string($options['delay']):
366
                    try {
367
                        // first try ISO 8601 duration specification
368
                        $delay = new \DateInterval($options['delay']);
369
                    } catch (\Exception $e) {
370
                        // then try normal date parser
371
                        $delay = \DateInterval::createFromDateString($options['delay']);
372
                    }
373
                    break;
374
                case $options['delay'] instanceof \DateInterval:
375
                    $delay = $options['delay'];
376
                    break;
377
                default:
378
                    $delay = null;
379
            }
380
381
            if ($delay instanceof \DateInterval) {
382
                $scheduled->add($delay);
383
            }
384
        }
385
386
        return $scheduled;
387
    }
388
389
    /**
390
     * Converst a \DateTime object to its UTCDateTime representation.
391
     *
392
     * @param \DateTime $date
393
     *
394
     * @return \MongoDB\BSON\UTCDateTime
395
     */
396 6
    protected function dateTimeToUTCDateTime(\DateTime $date)
397
    {
398 6
        return new \MongoDB\BSON\UTCDateTime($date->getTimestamp() * 1000);
399
    }
400
401
}
402