Completed
Push — master ( 790544...a0889a )
by yuuki
9s
created

CouchbaseQueue::buildQueueQuery()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 7
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 5
nc 1
nop 2
dl 0
loc 7
ccs 5
cts 5
cp 1
crap 1
rs 9.4285
c 0
b 0
f 0
1
<?php
2
3
/**
4
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
5
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
6
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
7
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
8
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
9
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
10
 * THE SOFTWARE.
11
 */
12
13
namespace Ytake\LaravelCouchbase\Queue;
14
15
use Carbon\Carbon;
16
use Illuminate\Queue\DatabaseQueue;
17
use Illuminate\Queue\Jobs\DatabaseJob;
18
use Ytake\LaravelCouchbase\Query\Builder;
19
use Ytake\LaravelCouchbase\Database\CouchbaseConnection;
20
21
/**
22
 * Class CouchbaseQueue
23
 *
24
 * @author Yuuki Takezawa<[email protected]>
25
 */
26
class CouchbaseQueue extends DatabaseQueue
27
{
28
    /**
29
     * The couchbase bucket that holds the jobs.
30
     *
31
     * @var string
32
     */
33
    protected $table;
34
35
    /** @var CouchbaseConnection */
36
    protected $database;
37
38
    /**
39
     * {@inheritdoc}
40
     */
41 1
    public function pop($queue = null)
42
    {
43 1
        $queue = $this->getQueue($queue);
44
45 1
        if (!is_null($this->expire)) {
46 1
            $this->releaseJobsThatHaveBeenReservedTooLong($queue);
47
        }
48
49 1
        if ($job = $this->getNextAvailableJob($queue)) {
50 1
            $this->markJobAsReserved($job->id);
51 1
            $bucket = $this->table;
52 1
            return new DatabaseJob(
53 1
                $this->container, $this, $job->$bucket, $queue
54
            );
55
        }
56
57 1
        return null;
58
    }
59
60
    /**
61
     * {@inheritdoc}
62
     */
63 1
    protected function releaseJobsThatHaveBeenReservedTooLong($queue)
64
    {
65 1
        $bucket = $this->table;
66 1
        $expired = Carbon::now()->subSeconds($this->expire)->getTimestamp();
67
68 1
        $first = $this->buildQueueQuery($queue, $expired)
69 1
            ->first(['*', 'meta().id']);
70 1
        if ($first) {
71
            $attempts = (isset($first->$bucket->attempts)) ? $first->$bucket->attempts : 1;
72
            $identifier = $first->id;
73
            $this->buildQueueQuery($queue, $expired)
74
                ->key($identifier)
75
                ->update([
76
                    'reserved'    => 0,
77
                    'reserved_at' => null,
78
                    'attempts'    => $attempts,
79
                ]);
80
        }
81 1
    }
82
83
    /**
84
     * @param $queue
85
     * @param $expired
86
     *
87
     * @return Builder
88
     */
89 1
    protected function buildQueueQuery($queue, $expired)
90
    {
91 1
        return $this->database->table($this->table)
92 1
            ->where('queue', $this->getQueue($queue))
93 1
            ->where('reserved', 1)
94 1
            ->where('reserved_at', '<=', $expired);
95
    }
96
97
    /**
98
     * {@inheritdoc}
99
     */
100 1
    protected function getNextAvailableJob($queue)
101
    {
102 1
        $job = $this->database->table($this->table)
103 1
            ->where('queue', $this->getQueue($queue))
104 1
            ->where('reserved', 0)
105 1
            ->where('available_at', '<=', $this->getTime())
106 1
            ->orderBy('id', 'asc')
107 1
            ->first(['*', 'meta().id']);
108
109 1
        return $job ? (object)$job : null;
110
    }
111
112
    /**
113
     * {@inheritdoc}
114
     */
115 1
    protected function markJobAsReserved($id)
116
    {
117 1
        $bucket = $this->table;
118
        /** @var \CouchbaseBucket $openBucket */
119 1
        $openBucket = $this->database->openBucket($bucket);
120
        // lock bucket
121 1
        $meta = $openBucket->getAndLock($id, 10);
122 1
        $meta->value->reserved = 1;
123 1
        $meta->value->reserved_at = $this->getTime();
124 1
        $openBucket->replace($id, $meta->value, ['cas' => $meta->cas]);
125
126 1
        return $meta->value;
127
    }
128
129
    /**
130
     * {@inheritdoc}
131
     */
132 1
    public function bulk($jobs, $data = '', $queue = null)
133
    {
134 1
        $queue = $this->getQueue($queue);
135
136 1
        $availableAt = $this->getAvailableAt(0);
137
138 1
        $records = array_map(function ($job) use ($queue, $data, $availableAt) {
139 1
            return $this->buildDatabaseRecord(
140 1
                $queue, $this->createPayload($job, $data), $availableAt
141
            );
142 1
        }, (array)$jobs);
143 1
        foreach ($records as $record) {
144 1
            $increment = $this->incrementKey();
145 1
            $record['id'] = $increment;
146 1
            $this->database->table($this->table)
147 1
                ->key($this->uniqueKey($record))->insert($record);
148
        }
149 1
    }
150
151
    /**
152
     * {@inheritdoc}
153
     */
154 1
    public function deleteReserved($queue, $id)
155
    {
156 1
        $this->database->table($this->table)->where('id', $id)->delete();
157 1
    }
158
159
    /**
160
     * {@inheritdoc}
161
     */
162
    protected function pushToDatabase($delay, $queue, $payload, $attempts = 0)
163
    {
164
        $attributes = $this->buildDatabaseRecord(
165
            $this->getQueue($queue), $payload, $this->getAvailableAt($delay), $attempts
166
        );
167
        $increment = $this->incrementKey();
168
        $attributes['id'] = $increment;
169
        $result = $this->database->table($this->table)
170
            ->key($this->uniqueKey($attributes))->insert($attributes);
171
        if ($result) {
172
            return $increment;
173
        }
174
175
        return false;
176
    }
177
178
    /**
179
     * generate increment key
180
     *
181
     * @param int $initial
182
     *
183
     * @return int
184
     */
185 1
    protected function incrementKey($initial = 1)
186
    {
187 1
        $result = $this->database->openBucket($this->table)
188 1
            ->counter($this->identifier(), $initial, ['initial' => abs($initial)]);
189
190 1
        return $result->value;
191
    }
192
193
    /**
194
     * @param array $attributes
195
     *
196
     * @return string
197
     */
198 1
    protected function uniqueKey(array $attributes)
199
    {
200 1
        $array = array_only($attributes, ['queue', 'attempts', 'id']);
201
202 1
        return implode(':', $array);
203
    }
204
205
    /**
206
     * @return string
207
     */
208 1
    protected function identifier()
209
    {
210 1
        return __CLASS__ . ':sequence';
211
    }
212
}
213