MessageQueue::convert()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 14
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 14
c 0
b 0
f 0
ccs 8
cts 8
cp 1
rs 10
cc 1
nc 1
nop 0
crap 1
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copyright (c) 2017-2019 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use IteratorIterator;
16
use MongoDB\Database;
17
use MongoDB\Driver\Exception\ConnectionException;
18
use MongoDB\Driver\Exception\RuntimeException;
19
use MongoDB\Driver\Exception\ServerException;
20
use MongoDB\Operation\Find;
21
use Psr\Log\LoggerInterface;
22
23
class MessageQueue
{
    /**
     * Server error code that create() ignores: the collection already exists.
     */
    protected const ERROR_COLLECTION_EXISTS = 48;

    /**
     * Server error code on which getCursor() converts the collection into a
     * capped collection before retrying the tailable query.
     */
    protected const ERROR_NOT_CAPPED = 2;

    /**
     * Database.
     *
     * @var Database
     */
    protected $db;

    /**
     * Queue name (name of the backing collection).
     *
     * @var string
     */
    protected $name;

    /**
     * Queue size in bytes.
     *
     * @var int
     */
    protected $size = 1000000;

    /**
     * Logger.
     *
     * @var LoggerInterface
     */
    protected $logger;

    /**
     * Message queue.
     *
     * @param Database        $db     database holding the queue collection
     * @param string          $name   name of the queue collection
     * @param int             $size   size in bytes used when the collection is created or converted as capped
     * @param LoggerInterface $logger logger receiving queue lifecycle and error messages
     */
    public function __construct(Database $db, string $name, int $size, LoggerInterface $logger)
    {
        $this->db = $db;
        $this->name = $name;
        $this->size = $size;
        $this->logger = $logger;
    }

    /**
     * Create queue and insert a dummy object to start cursor
     * Dummy object is required, otherwise we would get a dead cursor.
     *
     * If the collection already exists nothing is created and no dummy
     * object is inserted.
     *
     * @throws RuntimeException any driver error other than "collection already exists"
     *
     * @return self
     */
    public function create(): self
    {
        $this->logger->info('create new queue ['.$this->name.']', [
            'category' => get_class($this),
        ]);

        try {
            $this->db->createCollection(
                $this->name,
                [
                    'capped' => true,
                    'size' => $this->size,
                ]
            );

            $this->db->{$this->name}->insertOne(['class' => 'dummy']);
        } catch (RuntimeException $e) {
            // An already existing collection is fine, everything else is a real error.
            if (self::ERROR_COLLECTION_EXISTS !== $e->getCode()) {
                throw $e;
            }
        }

        return $this;
    }

    /**
     * Retrieve next message.
     *
     * Advances the given cursor. If the driver fails while doing so the
     * error is logged and $callback is invoked (without arguments) so the
     * caller can react, e.g. by re-initializing its cursor.
     *
     * @param IteratorIterator $cursor   cursor as returned by getCursor()
     * @param callable         $callback invoked when advancing the cursor failed
     */
    public function next(IteratorIterator $cursor, callable $callback): void
    {
        try {
            $cursor->next();
        } catch (RuntimeException $e) {
            // ServerException is a subclass of the driver RuntimeException,
            // so catching RuntimeException covers both.
            $this->logger->error('message queue cursor for ['.$this->name.'] failed to retrieve next message', [
                'category' => get_class($this),
                'exception' => $e,
            ]);

            $callback();
        }
    }

    /**
     * Get cursor.
     *
     * Opens a tailable-await cursor on the queue collection and rewinds it.
     * If the server rejects the query with error code 2 the collection is
     * converted into a capped collection and the query is retried. Any other
     * connection/server error is rethrown, every other driver runtime error
     * triggers a retry.
     *
     * @param array $query filter passed to find()
     *
     * @throws ConnectionException|ServerException if the query fails with a code other than 2
     *
     * @return IteratorIterator
     */
    public function getCursor(array $query = []): IteratorIterator
    {
        $options = [
            'typeMap' => Scheduler::TYPE_MAP,
            'cursorType' => Find::TAILABLE_AWAIT,
            'noCursorTimeout' => true,
        ];

        // Retry iteratively instead of recursively: a persistent failure must
        // not grow the call stack with every attempt.
        while (true) {
            try {
                $cursor = $this->db->{$this->name}->find($query, $options);

                break;
            } catch (ConnectionException | ServerException $e) {
                if (self::ERROR_NOT_CAPPED !== $e->getCode()) {
                    throw $e;
                }

                $this->convert();
            } catch (RuntimeException $e) {
                $this->logger->warning('failed to open message queue cursor for ['.$this->name.'], retry', [
                    'category' => get_class($this),
                    'exception' => $e,
                ]);
            }
        }

        $iterator = new IteratorIterator($cursor);
        $iterator->rewind();

        return $iterator;
    }

    /**
     * Convert the existing queue collection into a capped collection and
     * insert a dummy object to start cursor
     * Dummy object is required, otherwise we would get a dead cursor.
     *
     * @return self
     */
    public function convert(): self
    {
        $this->logger->info('convert existing queue collection ['.$this->name.'] into a capped collection', [
            'category' => get_class($this),
        ]);

        $this->db->command([
            'convertToCapped' => $this->name,
            'size' => $this->size,
        ]);

        $this->db->{$this->name}->insertOne(['class' => 'dummy']);

        return $this;
    }
}
164