Completed
Branch master (07905b)
by yuuki
01:56
created

CouchbaseQueue::pushToDatabase()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 15
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
cc 2
eloc 10
nc 2
nop 4
dl 0
loc 15
ccs 0
cts 4
cp 0
crap 6
rs 9.4285
c 0
b 0
f 0
1
<?php
2
3
/**
4
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
5
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
6
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
7
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
8
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
9
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
10
 * THE SOFTWARE.
11
 */
12
13
namespace Ytake\LaravelCouchbase\Queue;
14
15
use Carbon\Carbon;
16
use Illuminate\Database\Query\Builder;
17
use Illuminate\Queue\DatabaseQueue;
18
use Illuminate\Queue\Jobs\DatabaseJob;
19
use Ytake\LaravelCouchbase\Database\CouchbaseConnection;
20
21
/**
22
 * Class CouchbaseQueue
23
 *
24
 * @author Yuuki Takezawa<[email protected]>
25
 */
26
class CouchbaseQueue extends DatabaseQueue
27
{
28
    /**
29
     * The couchbase bucket that holds the jobs.
30
     *
31
     * @var string
32
     */
33
    protected $table;
34
35
    /** @var CouchbaseConnection */
36
    protected $database;
37
38
    /**
39
     * {@inheritdoc}
40
     */
41 1
    public function pop($queue = null)
42
    {
43 1
        $queue = $this->getQueue($queue);
44
45 1
        if (!is_null($this->expire)) {
46 1
            $this->releaseJobsThatHaveBeenReservedTooLong($queue);
47
        }
48
49
        if ($job = $this->getNextAvailableJob($queue)) {
50
            $this->markJobAsReserved($job->id);
51
            $bucket = $this->table;
52
            return new DatabaseJob(
53
                $this->container, $this, $job->$bucket, $queue
54
            );
55
        }
56
57
        return null;
58
    }
59
60
    /**
61
     * {@inheritdoc}
62
     */
63 1
    protected function releaseJobsThatHaveBeenReservedTooLong($queue)
64
    {
65 1
        $bucket = $this->table;
66 1
        $expired = Carbon::now()->subSeconds($this->expire)->getTimestamp();
67
68 1
        $first = $this->database->table($this->table)
69 1
            ->where('queue', $this->getQueue($queue))
70 1
            ->where('reserved', 1)
71 1
            ->where('reserved_at', '<=', $expired)
72 1
            ->first(['*', 'meta().id']);
73 1
        $attempts = 1;
74 1
        $identifier = null;
75
        if ($first) {
76
            $attempts = (isset($first->$bucket->attempts)) ? $first->$bucket->attempts : 1;
77 1
            $identifier = $first->id;
78 1
        }
79 1
        if (is_null($identifier)) {
80 1
            $identifier = $this->uniqueKey([
81 1
                'queue'    => $this->getQueue($queue),
82 1
                'attempts' => $attempts,
83
                'id'       => $this->incrementKey(),
84
            ]);
85
        }
86
        $this->database->table($this->table)
87
            ->where('queue', $this->getQueue($queue))
88
            ->where('reserved', 1)
89
            ->where('reserved_at', '<=', $expired)
90
            ->key($identifier)
91
            ->update([
92
                'reserved'    => 0,
93
                'reserved_at' => null,
94
                'attempts'    => $attempts,
95
            ]);
96
    }
97
98
    /**
99
     * {@inheritdoc}
100
     */
101
    protected function getNextAvailableJob($queue)
102
    {
103
        $job = $this->database->table($this->table)
104
            ->where('queue', $this->getQueue($queue))
105
            ->where('reserved', 0)
106
            ->where('available_at', '<=', $this->getTime())
107
            ->orderBy('id', 'asc')
108
            ->first(['*', 'meta().id']);
109
110
        return $job ? (object)$job : null;
111
    }
112
113
    /**
114
     * {@inheritdoc}
115
     */
116
    protected function markJobAsReserved($id)
117
    {
118
        $bucket = $this->table;
119
        /** @var \CouchbaseBucket $openBucket */
120
        $openBucket = $this->database->openBucket($bucket);
121
        // lock bucket
122
        $meta = $openBucket->getAndLock($id, 10);
123
        $meta->value->reserved = 1;
124
        $meta->value->reserved_at = $this->getTime();
125
        $openBucket->replace($id, $meta->value, ['cas' => $meta->cas]);
126
127
        return $meta->value;
128
    }
129
130
    /**
131
     * {@inheritdoc}
132
     */
133
    public function bulk($jobs, $data = '', $queue = null)
134
    {
135
        $queue = $this->getQueue($queue);
136
137
        $availableAt = $this->getAvailableAt(0);
138
139
        $records = array_map(function ($job) use ($queue, $data, $availableAt) {
140
            return $this->buildDatabaseRecord(
141
                $queue, $this->createPayload($job, $data), $availableAt
142
            );
143
        }, (array)$jobs);
144
        foreach ($records as $record) {
145
            $increment = $this->incrementKey();
146
            $record['id'] = $increment;
147
            $this->database->table($this->table)
148
                ->key($this->uniqueKey($record))->insert($record);
149
        }
150
    }
151
152
    /**
153
     * {@inheritdoc}
154
     */
155
    public function deleteReserved($queue, $id)
156
    {
157
        $this->database->table($this->table)->where('id', $id)->delete();
158
    }
159
160
    /**
161
     * {@inheritdoc}
162
     */
163
    protected function pushToDatabase($delay, $queue, $payload, $attempts = 0)
164
    {
165
        $attributes = $this->buildDatabaseRecord(
166
            $this->getQueue($queue), $payload, $this->getAvailableAt($delay), $attempts
167
        );
168
        $increment = $this->incrementKey();
169
        $attributes['id'] = $increment;
170
        $result = $this->database->table($this->table)
171
            ->key($this->uniqueKey($attributes))->insert($attributes);
172
        if ($result) {
173
            return $increment;
174
        }
175
176
        return false;
177
    }
178
179
    /**
180
     * generate increment key
181
     *
182
     * @param int $initial
183
     *
184
     * @return int
185
     */
186
    protected function incrementKey($initial = 1)
187
    {
188
        $result = $this->database->openBucket($this->table)
189
            ->counter($this->identifier(), $initial, ['initial' => abs($initial)]);
190
191
        return $result->value;
192
    }
193
194
    /**
195
     * @param array $attributes
196
     *
197
     * @return string
198
     */
199
    protected function uniqueKey(array $attributes)
200
    {
201
        $array = array_only($attributes, ['queue', 'attempts', 'id']);
202
203
        return implode(':', $array);
204
    }
205
206
    /**
207
     * @return string
208
     */
209
    protected function identifier()
210
    {
211
        return __CLASS__ . ':sequence';
212
    }
213
}
214