Completed
Branch master (b61033)
by Alexandre
04:26 queued 01:33
created

DoctrineAdapter::forget()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 8
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 8
rs 9.4285
cc 1
eloc 5
nc 1
nop 1
1
<?php
2
3
/**
4
 * This file is part of HeriJobQueueBundle.
5
 *
6
 * This source file is subject to the MIT license that is bundled
7
 * with this source code in the file LICENSE.
8
 */
9
10
namespace Heri\Bundle\JobQueueBundle\Adapter;
11
12
use ZendQueue\Adapter\AbstractAdapter;
13
use ZendQueue\Message;
14
use ZendQueue\Queue;
15
use Heri\Bundle\JobQueueBundle\Exception\AdapterRuntimeException;
16
17
/**
18
 * Doctrine adapter.
19
 *
20
 * @see ZendQueue\Adapter\AbstractAdapter
21
 */
22
class DoctrineAdapter extends AbstractAdapter implements AdapterInterface
23
{
24
    /**
25
     * @var Doctrine\ORM\EntityManager.
26
     */
27
    public $em;
28
29
    /**
30
     * @var int.
31
     */
32
    public $priority = 0;
33
34
    /**
35
     * Does a queue already exist?
36
     *
37
     * Throws an exception if the adapter cannot determine if a queue exists.
38
     * use isSupported('isExists') to determine if an adapter can test for
39
     * queue existance.
40
     *
41
     * @param string $name
42
     *
43
     * @return bool
44
     *
45
     * @throws ZendQueue\Exception
46
     */
47
    public function isExists($name)
48
    {
49
        $repo = $this->em
50
            ->getRepository('Heri\Bundle\JobQueueBundle\Entity\Queue')
51
            ->findOneBy([
52
                'name' => $name,
53
            ]);
54
55
        return ($repo) ? true : false;
56
    }
57
58
    /**
59
     * Create a new queue.
60
     *
61
     * Visibility timeout is how long a message is left in the queue "invisible"
62
     * to other readers.  If the message is acknowleged (deleted) before the
63
     * timeout, then the message is deleted.  However, if the timeout expires
64
     * then the message will be made available to other queue readers.
65
     *
66
     * @param string $name    Queue name
67
     * @param int    $timeout Default visibility timeout
68
     *
69
     * @return bool
70
     *
71
     * @throws ZendQueue\Exception - database error
72
     */
73
    public function create($name, $timeout = null)
74
    {
75
        if ($this->isExists($name)) {
76
            return false;
77
        }
78
79
        $queue = new \Heri\Bundle\JobQueueBundle\Entity\Queue();
80
        $queue->setName($name);
81
        $newtimeout = (is_null($timeout)) ? self::CREATE_TIMEOUT_DEFAULT : (int) $timeout;
82
        $queue->setTimeout($newtimeout);
0 ignored issues
show
Documentation introduced by
$newtimeout is of type integer, but the function expects a object<Heri\Bundle\JobQu...Bundle\Entity\smallint>.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
83
84
        $this->em->persist($queue);
85
        $this->em->flush();
86
87
        return true;
88
    }
89
90
    /**
91
     * Delete a queue and all of it's messages.
92
     *
93
     * Returns false if the queue is not found, true if the queue exists
94
     *
95
     * @param string $name Queue name
96
     *
97
     * @return bool
98
     *
99
     * @throws ZendQueue\Exception
100
     */
101
    public function delete($name)
102
    {
103
        // Get primary key
104
        $id = $this->getQueueEntity($name);
105
106
        $queue = $this->em
107
            ->getRepository('Heri\Bundle\JobQueueBundle\Entity\Queue')
108
            ->find($id);
109
110
        $messages = $this->em
111
            ->getRepository('Heri\Bundle\JobQueueBundle\Entity\Message')
112
            ->findBy([
113
                'queue' => $queue,
114
            ]);
115
        foreach ($messages as $message) {
116
            $this->em->remove($message);
117
        }
118
119
        $this->em->remove($queue);
120
        $this->em->flush();
121
        $this->em->clear();
122
123
        return true;
124
    }
125
126
    /*
127
     * Get an array of all available queues
128
     *
129
     * Not all adapters support getQueues(), use isSupported('getQueues')
130
     * to determine if the adapter supports this feature.
131
     *
132
     * @return array
133
     */
134
    public function getQueues()
135
    {
136
        $list = [];
137
138
        $queues = $this->em
139
            ->getRepository('Heri\Bundle\JobQueueBundle\Entity\Queue')
140
            ->findAll();
141
        foreach ($queues as $queue) {
142
            $list[] = $queue->name;
143
        }
144
145
        return $list;
146
    }
147
148
    /**
149
     * Return the approximate number of messages in the queue.
150
     *
151
     * @param ZendQueue\Queue $queue
152
     *
153
     * @return int
154
     *
155
     * @throws ZendQueue\Exception
156
     */
157
    public function count(Queue $queue = null)
158
    {
159
        $qb = $this->em->createQueryBuilder();
160
        $qb
161
            ->select('count(m)')
162
            ->from('Heri\Bundle\JobQueueBundle\Entity\Message', 'm')
163
            ->leftJoin('m.queue', 'Queue')
164
        ;
165
166
        if ($queue instanceof Queue) {
167
            $qb
168
                ->where($qb->expr()->eq('Queue.name', ':name'))
169
                ->setParameter('name', $queue->getName())
170
            ;
171
        }
172
173
        $query = $qb->getQuery();
174
175
        return $query->getSingleScalarResult();
176
    }
177
178
    /**
179
     * Send a message to the queue.
180
     *
181
     * @param string          $message Message to send to the active queue
182
     * @param ZendQueue\Queue $queue
183
     *
184
     * @return ZendQueue\Message
185
     *
186
     * @throws ZendQueue\Exception
187
     */
188
    public function send($message, Queue $queue = null)
189
    {
190
        $body = '';
191
192
        if ($queue === null) {
193
            $queue = $this->_queue;
194
        }
195
196
        if (is_scalar($message)) {
197
            $body = (string) $message;
198
        }
199
200
        if (is_string($message)) {
201
            $body = trim($message);
202
        }
203
204
        if (!$this->isExists($queue->getName())) {
205
            throw new AdapterRuntimeException("Queue does not exist: {$queue->getName()}");
206
        }
207
208
        $entity = $this->createMessage($queue, $body);
209
210
        $options = [
211
            'queue' => $queue,
212
            'data' => $entity->toArray(),
213
        ];
214
215
        $classname = $queue->getMessageClass();
216
217
        return new $classname($options);
218
    }
219
220
    /**
221
     * Get messages in the queue.
222
     *
223
     * @param int             $maxMessages Maximum number of messages to return
224
     * @param int             $timeout     Visibility timeout for these messages
225
     * @param ZendQueue\Queue $queue
226
     *
227
     * @return ZendQueue\MessageIterator
228
     *
229
     * @throws ZendQueue\Exception Database error
230
     */
231
    public function receive($maxMessages = null, $timeout = null, Queue $queue = null)
232
    {
233
        $result = [];
234
235
        // Cache microtime
236
        $microtime = microtime(true);
237
238
        if (is_null($queue)) {
239
            $queue = $this->_queue;
240
        }
241
242
        if ($maxMessages > 0) {
243
            $messages = $this->getMessages(
244
                $maxMessages,
245
                $timeout,
246
                $queue,
0 ignored issues
show
Documentation introduced by
$queue is of type object<ZendQueue\Queue>, but the function expects a object<Heri\Bundle\JobQu...r\ZendQueue\Queue>|null.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
247
                $microtime
248
            );
249
250
            // Update working messages
251
            foreach ($messages as $message) {
252
                $key = md5(uniqid(rand(), true));
253
                $message->setHandle($key);
254
                $message->setTimeout($microtime);
255
256
                $result[] = $message->toArray();
257
            }
258
            $this->em->flush();
259
        }
260
261
        $options = [
262
            'queue' => $queue,
263
            'data' => $result,
264
            'messageClass' => $queue->getMessageClass(),
265
        ];
266
267
        $classname = $queue->getMessageSetClass();
268
269
        return new $classname($options);
270
    }
271
272
    /**
273
     * Delete a message from the queue.
274
     *
275
     * Returns true if the message is deleted, false if the deletion is
276
     * unsuccessful.
277
     *
278
     * @param ZendQueue\Message $message
279
     *
280
     * @return bool
281
     *
282
     * @throws ZendQueue\Exception - database error
283
     */
284
    public function deleteMessage(Message $message)
285
    {
286
        $repo = $this->em
287
            ->getRepository('Heri\Bundle\JobQueueBundle\Entity\Message')
288
            ->findOneBy([
289
                'handle' => $message->handle,
290
            ]);
291
292
        $this->em->remove($repo);
293
        $this->em->flush();
294
295
        return $this->em->clear();
296
    }
297
298
    /**
299
     * Return a list of queue capabilities functions.
300
     *
301
     * $array['function name'] = true or false
302
     * true is supported, false is not supported.
303
     *
304
     * @param string $name
0 ignored issues
show
Bug introduced by
There is no parameter named $name. Was it maybe removed?

This check looks for PHPDoc comments describing methods or function parameters that do not exist on the corresponding method or function.

Consider the following example. The parameter $italy is not defined by the method finale(...).

/**
 * @param array $germany
 * @param array $island
 * @param array $italy
 */
function finale($germany, $island) {
    return "2:1";
}

The most likely cause is that the parameter was removed, but the annotation was not.

Loading history...
305
     *
306
     * @return array
307
     */
308
    public function getCapabilities()
309
    {
310
        return [
311
            'create' => true,
312
            'delete' => true,
313
            'send' => true,
314
            'receive' => true,
315
            'deleteMessage' => true,
316
            'getQueues' => true,
317
            'count' => true,
318
            'isExists' => true,
319
        ];
320
    }
321
322
    /**
323
     * Retry failed messages.
324
     *
325
     * @param int $id
326
     */
327
    public function retry($id = null)
328
    {
329
        $sql = <<<EOL
330
            UPDATE Heri\Bundle\JobQueueBundle\Entity\Message m
331
            SET m.numRetries = 0
332
EOL;
333
334
        $query = $this->em->createQuery($sql);
335
        if (!is_null($id)) {
336
            $sql .= ' WHERE m.id = ?1';
337
338
            $query->setParameter(1, $id);
339
        }
340
341
        $query->execute();
342
    }
343
344
    /**
345
     * Delete a failed message.
346
     *
347
     * @param int $id
348
     */
349
    public function forget($id)
350
    {
351
        $sql = <<<EOL
352
        DELETE FROM Heri\Bundle\JobQueueBundle\Entity\Message m WHERE m.id = ?1
353
EOL;
354
355
        return $this->em->createQuery($sql)->setParameter(1, $id)->execute();
356
    }
357
358
    /**
359
     * {@inheritdoc}
360
     */
361
    public function setPriority($priority)
362
    {
363
        $this->priority = $priority;
364
    }
365
366
    /**
367
     * {@inheritdoc}
368
     */
369
    public function showMessages($queueName)
370
    {
371
        $results = [];
372
        if ($this->isExists($queueName)) {
373
            $qb = $this->em->createQueryBuilder();
374
            $qb
375
                ->select('m.id, m.body, m.created, m.ended, m.failed')
376
                ->from('Heri\Bundle\JobQueueBundle\Entity\Message', 'm')
377
                ->leftJoin('m.queue', 'Queue')
378
                ->where($qb->expr()->eq('Queue.name', ':name'))
379
                ->setParameter('name', $queueName)
380
            ;
381
382
            $query = $qb->getQuery();
383
            $results = $query->getResult(\Doctrine\ORM\Query::HYDRATE_ARRAY);
384
        }
385
386
        return $results;
387
    }
388
389
    /**
390
     * {@inheritdoc}
391
     */
392
    public function flush()
393
    {
394
        $sql = 'DELETE Heri\Bundle\JobQueueBundle\Entity\MessageLog';
395
396
        return $this->em->createQuery($sql);
397
    }
398
399
    /**
400
     * {@inheritdoc}
401
     */
402
    public function logException($message, $e)
403
    {
404
        $sql = <<<EOL
405
            UPDATE Heri\Bundle\JobQueueBundle\Entity\Message m
406
            SET
407
                m.ended = 0,
408
                m.failed = 1,
409
                m.numRetries = m.numRetries + ?1,
410
                m.priority = 0
411
            WHERE m.id = ?2
412
EOL;
413
414
        $this->em->createQuery($sql)
415
            ->setParameter(1, $message->failed ? 1 : 0)
416
            ->setParameter(2, $message->id)
417
            ->execute()
418
        ;
419
420
        $messageObject = $this->em
421
            ->getRepository('Heri\Bundle\JobQueueBundle\Entity\Message')
422
            ->find($message->id);
423
424
        $log = new \Heri\Bundle\JobQueueBundle\Entity\MessageLog();
425
        $log->setMessageId($messageObject);
426
        $log->setDateLog(new \DateTime('now'));
427
        $log->setLog($e->getMessage());
428
        $this->em->persist($log);
429
        $this->em->flush();
430
    }
431
432
    /**
433
     * Create a new message.
434
     *
435
     * @param ZendQueue\Queue $queue
436
     * @param string          $body
437
     */
438
    protected function createMessage(Queue $queue, $body)
439
    {
440
        // check if message exist
441
        $message = $this->em
442
            ->getRepository('Heri\Bundle\JobQueueBundle\Entity\Message')
443
            ->findOneBy([
444
                'md5' => md5($body),
445
            ]);
446
447
        if (!$message) {
448
            $message = new \Heri\Bundle\JobQueueBundle\Entity\Message();
449
            $message->setQueue($this->getQueueEntity($queue->getName()));
0 ignored issues
show
Documentation introduced by
$this->getQueueEntity($queue->getName()) is of type object<ZendQueue\Queue>, but the function expects a null|object<Heri\Bundle\...eueBundle\Entity\Queue>.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
450
            $message->setBody($body);
0 ignored issues
show
Documentation introduced by
$body is of type string, but the function expects a object<Heri\Bundle\JobQueueBundle\Entity\text>.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
451
            $message->setMd5(md5($body));
452
            $message->setPriority($this->priority);
0 ignored issues
show
Documentation introduced by
$this->priority is of type integer, but the function expects a object<Heri\Bundle\JobQu...Bundle\Entity\smallint>.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
453
            $message->setFailed(false);
454
            $message->setEnded(false);
455
456
            $this->em->persist($message);
457
            $this->em->flush();
458
            $this->em->clear();
459
        }
460
461
        return $message;
462
    }
463
464
    /**
465
     * Get messages of the queue.
466
     *
467
     * @param int             $maxMessages
468
     * @param int             $timeout
469
     * @param ZendQueue\Queue $queue
470
     * @param int             $microtime
471
     */
472
    protected function getMessages($maxMessages, $timeout, $queue = null, $microtime = null)
473
    {
474
        if ($maxMessages === null) {
475
            $maxMessages = 1;
476
        }
477
478
        if ($timeout === null) {
479
            $timeout = self::RECEIVE_TIMEOUT_DEFAULT;
480
        }
481
482
        $andWhere = '';
483
        if ($queue instanceof Queue) {
484
            $andWhere = 'AND (m.queue = :queue) AND (q.maxRetries IS NULL OR m.numRetries < q.maxRetries)';
485
        }
486
487
        // Search for all messages inside the timeout
488
        $sql = 'SELECT m '.
489
            'FROM Heri\Bundle\JobQueueBundle\Entity\Message m '.
490
            'LEFT JOIN m.queue q '.
491
            'WHERE (m.handle IS NULL OR m.handle = \'\' OR m.timeout + :timeout < :microtime) '.$andWhere.' '.
492
            'ORDER BY m.priority DESC';
493
494
        $query = $this->em->createQuery($sql);
495
496
        $query->setParameter('timeout', (int) $timeout);
497
        $query->setParameter('microtime', (int) $microtime);
498
499
        if ($queue instanceof Queue) {
500
            $query->setParameter('queue', $this->getQueueEntity($queue->getName()));
501
        }
502
        $query->setMaxResults($maxMessages);
503
504
        return $query->getResult();
505
    }
506
507
    /**
508
     * Get the queue entity.
509
     *
510
     * @param string $name
511
     *
512
     * @return Queue Entity
513
     *
514
     * @throws ZendQueue\Exception
515
     */
516
    protected function getQueueEntity($name)
517
    {
518
        $repo = $this->em
519
            ->getRepository('Heri\Bundle\JobQueueBundle\Entity\Queue')
520
            ->findOneBy([
521
                'name' => $name,
522
            ]);
523
524
        if (!$repo) {
525
            throw new AdapterRuntimeException(sprintf('Queue does not exist: %s', $name));
526
        }
527
528
        return $repo;
529
    }
530
}
531