Passed
Branch dev (b82c9a)
by Raffael
05:11
created

MessageQueue::create()   A

Complexity

Conditions 3
Paths 5

Size

Total Lines 24
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 12
CRAP Score 3

Importance

Changes 0
Metric Value
eloc 12
dl 0
loc 24
c 0
b 0
f 0
ccs 12
cts 12
cp 1
rs 9.8666
cc 3
nc 5
nop 0
crap 3
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copryright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use IteratorIterator;
16
use MongoDB\Database;
17
use MongoDB\Driver\Exception\ConnectionException;
18
use MongoDB\Driver\Exception\RuntimeException;
19
use MongoDB\Driver\Exception\ServerException;
0 ignored issues
show
Bug introduced by
The type MongoDB\Driver\Exception\ServerException was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
20
use MongoDB\Operation\Find;
21
use Psr\Log\LoggerInterface;
22
23
class MessageQueue
24
{
25
    protected $db;
26
    protected $name;
27
    protected $size = 100000;
28
    protected $logger;
29
30
    /**
31
     * Message queue.
32
     *
33
     * @var Database
34
     */
35 58
    public function __construct(Database $db, string $name, int $size, LoggerInterface $logger)
36
    {
37 58
        $this->db = $db;
38 58
        $this->name = $name;
39 58
        $this->size = $size;
40 58
        $this->logger = $logger;
41 58
    }
42
43
    /**
44
     * Create queue and insert a dummy object to start cursor
45
     * Dummy object is required, otherwise we would get a dead cursor.
46
     */
47 4
    public function create(): self
48
    {
49 4
        $this->logger->info('create new queue ['.$this->name.']', [
50 4
            'category' => get_class($this),
51
        ]);
52
53
        try {
54 4
            $this->db->createCollection(
55 4
                $this->name,
56
                [
57 4
                    'capped' => true,
58 4
                    'size' => $this->size,
59
                ]
60
            );
61
62 2
            $this->db->{$this->name}->insertOne(['class' => 'dummy']);
63 3
        } catch (RuntimeException $e) {
64
            //ignore if code 48 (collection does already exist)
65 3
            if (48 !== $e->getCode()) {
66 2
                throw $e;
67
            }
68
        }
69
70 2
        return $this;
71
    }
72
73
    /**
74
     * Retrieve next message.
75
     */
76 5
    public function next(IteratorIterator $cursor, callable $callback): void
77
    {
78
        try {
79 5
            $cursor->next();
80 1
        } catch (RuntimeException $e) {
81 1
            $this->logger->error('message queue cursor for ['.$this->name.'] failed to retrieve next message', [
82 1
                'category' => get_class($this),
83 1
                'exception' => $e,
84
            ]);
85
86 1
            $callback();
87
        }
88 5
    }
89
90
    /**
91
     * Get cursor.
92
     */
93 6
    public function getCursor(array $query = []): IteratorIterator
94
    {
95
        $options = [
96 6
            'typeMap' => Scheduler::TYPE_MAP,
97 6
            'cursorType' => Find::TAILABLE,
98
            'noCursorTimeout' => true,
99
        ];
100
101
        try {
102 6
            $cursor = $this->db->{$this->name}->find($query, $options);
103
        } catch (ConnectionException | ServerException $e) {
104
            if (2 === $e->getCode()) {
105
                $this->convert();
106
107
                return $this->getCursor($query);
108
            }
109
110
            throw $e;
111
        } catch (RuntimeException $e) {
112
            return $this->getCursor($query);
113
        }
114
115 6
        $iterator = new IteratorIterator($cursor);
116 6
        $iterator->rewind();
117
118 6
        return $iterator;
119
    }
120
121
    /**
122
     * Create queue and insert a dummy object to start cursor
123
     * Dummy object is required, otherwise we would get a dead cursor.
124
     *
125
     * @return AbstractQueue
0 ignored issues
show
Bug introduced by
The type TaskScheduler\AbstractQueue was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
126
     */
127 1
    public function convert(): self
128
    {
129 1
        $this->logger->info('convert existing queue collection ['.$this->name.'] into a capped collection', [
130 1
            'category' => get_class($this),
131
        ]);
132
133 1
        $this->db->command([
134 1
            'convertToCapped' => $this->name,
135 1
            'size' => $this->size,
136
        ]);
137
138 1
        $this->db->{$this->name}->insertOne(['class' => 'dummy']);
139
140 1
        return $this;
0 ignored issues
show
Bug Best Practice introduced by
The expression return $this returns the type TaskScheduler\MessageQueue which is incompatible with the documented return type TaskScheduler\AbstractQueue.
Loading history...
141
    }
142
}
143