Completed
Push — master ( 07905b...790544 )
by yuuki
10s
created

releaseJobsThatHaveBeenReservedTooLong()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 20
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 3.9148

Importance

Changes 0
Metric Value
cc 3
eloc 15
nc 3
nop 1
dl 0
loc 20
ccs 8
cts 15
cp 0.5333
crap 3.9148
rs 9.4285
c 0
b 0
f 0
1
<?php
2
3
/**
4
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
5
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
6
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
7
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
8
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
9
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
10
 * THE SOFTWARE.
11
 */
12
13
namespace Ytake\LaravelCouchbase\Queue;
14
15
use Carbon\Carbon;
16
use Illuminate\Queue\DatabaseQueue;
17
use Illuminate\Queue\Jobs\DatabaseJob;
18
use Ytake\LaravelCouchbase\Query\Builder;
19
use Ytake\LaravelCouchbase\Database\CouchbaseConnection;
20
21
/**
22
 * Class CouchbaseQueue
23
 *
24
 * @author Yuuki Takezawa<[email protected]>
25
 */
26
class CouchbaseQueue extends DatabaseQueue
27
{
28
    /**
29
     * The couchbase bucket that holds the jobs.
30
     *
31
     * @var string
32
     */
33
    protected $table;
34
35
    /** @var CouchbaseConnection */
36
    protected $database;
37
38
    /**
39
     * {@inheritdoc}
40
     */
41 1
    public function pop($queue = null)
42
    {
43 1
        $queue = $this->getQueue($queue);
44
45 1
        if (!is_null($this->expire)) {
46 1
            $this->releaseJobsThatHaveBeenReservedTooLong($queue);
47
        }
48
49 1
        if ($job = $this->getNextAvailableJob($queue)) {
50 1
            $this->markJobAsReserved($job->id);
51 1
            $bucket = $this->table;
52 1
            return new DatabaseJob(
53 1
                $this->container, $this, $job->$bucket, $queue
54
            );
55
        }
56
57 1
        return null;
58
    }
59
60
    /**
61
     * {@inheritdoc}
62
     */
63 1
    protected function releaseJobsThatHaveBeenReservedTooLong($queue)
64
    {
65 1
        $bucket = $this->table;
66 1
        $expired = Carbon::now()->subSeconds($this->expire)->getTimestamp();
67
68 1
        $first = $this->buildQueueQuery($queue, $expired)
69 1
            ->first(['*', 'meta().id']);
70 1
        $identifier = null;
0 ignored issues
show
Unused Code introduced by
$identifier is not used, you could remove the assignment.

This check looks for variable assignements that are either overwritten by other assignments or where the variable is not used subsequently.

$myVar = 'Value';
$higher = false;

if (rand(1, 6) > 3) {
    $higher = true;
} else {
    $higher = false;
}

Both the $myVar assignment in line 1 and the $higher assignment in line 2 are dead. The first because $myVar is never used and the second because $higher is always overwritten for every possible time line.

Loading history...
71 1
        if ($first) {
72
            $attempts = (isset($first->$bucket->attempts)) ? $first->$bucket->attempts : 1;
73
            $identifier = $first->id;
74
            $this->buildQueueQuery($queue, $expired)
75
                ->key($identifier)
76
                ->update([
77
                    'reserved'    => 0,
78
                    'reserved_at' => null,
79
                    'attempts'    => $attempts,
80
                ]);
81
        }
82 1
    }
83
84
    /**
85
     * @param $queue
86
     * @param $expired
87
     *
88
     * @return Builder
89
     */
90 1
    protected function buildQueueQuery($queue, $expired)
91
    {
92 1
        return $this->database->table($this->table)
93 1
            ->where('queue', $this->getQueue($queue))
94 1
            ->where('reserved', 1)
95 1
            ->where('reserved_at', '<=', $expired);
96
    }
97
98
    /**
99
     * {@inheritdoc}
100
     */
101 1
    protected function getNextAvailableJob($queue)
102
    {
103 1
        $job = $this->database->table($this->table)
104 1
            ->where('queue', $this->getQueue($queue))
105 1
            ->where('reserved', 0)
106 1
            ->where('available_at', '<=', $this->getTime())
107 1
            ->orderBy('id', 'asc')
108 1
            ->first(['*', 'meta().id']);
109
110 1
        return $job ? (object)$job : null;
111
    }
112
113
    /**
114
     * {@inheritdoc}
115
     */
116 1
    protected function markJobAsReserved($id)
117
    {
118 1
        $bucket = $this->table;
119
        /** @var \CouchbaseBucket $openBucket */
120 1
        $openBucket = $this->database->openBucket($bucket);
121
        // lock bucket
122 1
        $meta = $openBucket->getAndLock($id, 10);
123 1
        $meta->value->reserved = 1;
124 1
        $meta->value->reserved_at = $this->getTime();
125 1
        $openBucket->replace($id, $meta->value, ['cas' => $meta->cas]);
126
127 1
        return $meta->value;
128
    }
129
130
    /**
131
     * {@inheritdoc}
132
     */
133 1
    public function bulk($jobs, $data = '', $queue = null)
134
    {
135 1
        $queue = $this->getQueue($queue);
136
137 1
        $availableAt = $this->getAvailableAt(0);
138
139 1
        $records = array_map(function ($job) use ($queue, $data, $availableAt) {
140 1
            return $this->buildDatabaseRecord(
141 1
                $queue, $this->createPayload($job, $data), $availableAt
142
            );
143 1
        }, (array)$jobs);
144 1
        foreach ($records as $record) {
145 1
            $increment = $this->incrementKey();
146 1
            $record['id'] = $increment;
147 1
            $this->database->table($this->table)
148 1
                ->key($this->uniqueKey($record))->insert($record);
149
        }
150 1
    }
151
152
    /**
153
     * {@inheritdoc}
154
     */
155 1
    public function deleteReserved($queue, $id)
156
    {
157 1
        $this->database->table($this->table)->where('id', $id)->delete();
158 1
    }
159
160
    /**
161
     * {@inheritdoc}
162
     */
163
    protected function pushToDatabase($delay, $queue, $payload, $attempts = 0)
164
    {
165
        $attributes = $this->buildDatabaseRecord(
166
            $this->getQueue($queue), $payload, $this->getAvailableAt($delay), $attempts
167
        );
168
        $increment = $this->incrementKey();
169
        $attributes['id'] = $increment;
170
        $result = $this->database->table($this->table)
171
            ->key($this->uniqueKey($attributes))->insert($attributes);
172
        if ($result) {
173
            return $increment;
174
        }
175
176
        return false;
177
    }
178
179
    /**
180
     * generate increment key
181
     *
182
     * @param int $initial
183
     *
184
     * @return int
185
     */
186 1
    protected function incrementKey($initial = 1)
187
    {
188 1
        $result = $this->database->openBucket($this->table)
189 1
            ->counter($this->identifier(), $initial, ['initial' => abs($initial)]);
190
191 1
        return $result->value;
192
    }
193
194
    /**
195
     * @param array $attributes
196
     *
197
     * @return string
198
     */
199 1
    protected function uniqueKey(array $attributes)
200
    {
201 1
        $array = array_only($attributes, ['queue', 'attempts', 'id']);
202
203 1
        return implode(':', $array);
204
    }
205
206
    /**
207
     * @return string
208
     */
209 1
    protected function identifier()
210
    {
211 1
        return __CLASS__ . ':sequence';
212
    }
213
}
214