Completed
Branch dev (11269e)
by Raffael
04:12
created

AbstractQueue::retrieveNextJob()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 12
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 3.6875

Importance

Changes 0
Metric Value
eloc 8
dl 0
loc 12
ccs 2
cts 8
cp 0.25
rs 10
c 0
b 0
f 0
cc 2
nc 2
nop 1
crap 3.6875
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copryright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use IteratorIterator;
16
use MongoDB\Database;
17
use MongoDB\Driver\Exception\ConnectionException;
18
use MongoDB\Driver\Exception\RuntimeException;
19
use MongoDB\Driver\Exception\ServerException;
0 ignored issues
show
Bug introduced by
The type MongoDB\Driver\Exception\ServerException was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
20
use MongoDB\Operation\Find;
21
use Psr\Container\ContainerInterface;
22
use Psr\Log\LoggerInterface;
23
24
class AbstractQueue
25
{
26
    /**
27
     * Scheduler.
28
     *
29
     * @param Scheduler
30
     */
31
    protected $scheduler;
32
33
    /**
34
     * Database.
35
     *
36
     * @var Database
37
     */
38
    protected $db;
39
40
    /**
41
     * LoggerInterface.
42
     *
43
     * @var LoggerInterface
44
     */
45
    protected $logger;
46
47
    /**
48
     * Collection name.
49
     *
50
     * @var string
51
     */
52
    protected $collection_name = 'queue';
53
54
    /**
55
     * Container.
56
     *
57
     * @var ContainerInterface
58
     */
59
    protected $container;
60
61
    /**
62
     * Process ID.
63
     *
64
     * @var string
65
     */
66
    protected $process;
67
68
    /**
69
     * Create queue and insert a dummy object to start cursor
70
     * Dummy object is required, otherwise we would get a dead cursor.
71
     *
72
     * @return Queue
73
     */
74 3
    protected function createQueue(): self
75
    {
76 3
        $this->logger->info('create new queue ['.$this->collection_name.']', [
77 3
            'category' => get_class($this),
78 3
            'pm' => $this->process,
79
        ]);
80
81
        try {
82 3
            $this->db->createCollection(
83 3
                $this->collection_name,
84
                [
85 3
                    'capped' => true,
86 3
                    'size' => $this->scheduler->getQueueSize(),
87
                ]
88
            );
89
90 2
            $this->db->{$this->collection_name}->insertOne(['class' => 'dummy']);
91 2
        } catch (RuntimeException $e) {
92 2
            if (48 !== $e->getCode()) {
93 1
                throw $e;
94
            }
95
        }
96
97 2
        return $this;
98
    }
99
100
    /**
101
     * Create queue and insert a dummy object to start cursor
102
     * Dummy object is required, otherwise we would get a dead cursor.
103
     *
104
     * @return Queue
105
     */
106 1
    protected function convertQueue(): self
107
    {
108 1
        $this->logger->info('convert existing queue collection ['.$this->collection_name.'] into a capped collection', [
109 1
            'category' => get_class($this),
110 1
            'pm' => $this->process,
111
        ]);
112
113 1
        $this->db->command([
114 1
            'convertToCapped' => $this->collection_name,
115 1
            'size' => $this->scheduler->getQueueSize(),
116
        ]);
117
118 1
        $this->db->{$this->collection_name}->insertOne(['class' => 'dummy']);
119
120 1
        return $this;
121
    }
122
123
    /**
124
     * Retrieve next job.
125
     *
126
     * @param IteratorIterator $cursor
127
     */
128 2
    protected function retrieveNextJob(IteratorIterator $cursor)
129
    {
130
        try {
131 2
            $cursor->next();
132
        } catch (RuntimeException $e) {
133
            $this->logger->error('job queue cursor failed to retrieve next job, restart queue listener', [
134
                'category' => get_class($this),
135
                'pm' => $this->process,
136
                'exception' => $e,
137
            ]);
138
139
            $this->main();
0 ignored issues
show
introduced by
The method main() does not exist on TaskScheduler\AbstractQueue. Maybe you want to declare this class abstract? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

139
            $this->/** @scrutinizer ignore-call */ 
140
                   main();
Loading history...
140
        }
141 2
    }
142
143
    /**
144
     * Get cursor.
145
     *
146
     * @param bool $tailable
147
     *
148
     * @return IteratorIterator
149
     */
150 6
    protected function getCursor(bool $tailable = true): IteratorIterator
151
    {
152 6
        $options = ['typeMap' => Scheduler::TYPE_MAP];
153
154 6
        if (true === $tailable) {
155 6
            $options['cursorType'] = Find::TAILABLE;
156 6
            $options['noCursorTimeout'] = true;
157
        }
158
159
        try {
160 6
            $cursor = $this->db->{$this->collection_name}->find([
161 6
                '$or' => [
162
                    ['status' => JobInterface::STATUS_WAITING],
163
                    ['status' => JobInterface::STATUS_POSTPONED],
164
                ],
165 6
            ], $options);
166
        } catch (ConnectionException | ServerException $e) {
167
            if (2 === $e->getCode()) {
168
                $this->convertQueue();
169
170
                return $this->getCursor($tailable);
171
            }
172
173
            throw $e;
174
        } catch (RuntimeException $e) {
175
            return $this->getCursor($tailable);
176
        }
177
178 6
        $iterator = new IteratorIterator($cursor);
179 6
        $iterator->rewind();
180
181 6
        return $iterator;
182
    }
183
}
184