1
|
|
|
<?php |
2
|
|
|
declare(strict_types=1); |
3
|
|
|
|
4
|
|
|
/** |
5
|
|
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
6
|
|
|
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
7
|
|
|
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
8
|
|
|
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
9
|
|
|
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
10
|
|
|
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
11
|
|
|
* THE SOFTWARE. |
12
|
|
|
*/ |
13
|
|
|
|
14
|
|
|
namespace Ytake\LaravelCouchbase\Queue; |
15
|
|
|
|
16
|
|
|
use Illuminate\Queue\DatabaseQueue; |
17
|
|
|
use Illuminate\Database\Query\Builder; |
18
|
|
|
use Illuminate\Queue\Jobs\DatabaseJob; |
19
|
|
|
use Illuminate\Queue\Jobs\DatabaseJobRecord; |
20
|
|
|
use Ytake\LaravelCouchbase\Database\CouchbaseConnection; |
21
|
|
|
|
22
|
|
|
/** |
23
|
|
|
* Class CouchbaseQueue |
24
|
|
|
* |
25
|
|
|
* @author Yuuki Takezawa<[email protected]> |
26
|
|
|
*/ |
27
|
|
|
class CouchbaseQueue extends DatabaseQueue |
28
|
|
|
{ |
29
|
|
|
/** |
30
|
|
|
* The couchbase bucket that holds the jobs. |
31
|
|
|
* |
32
|
|
|
* @var string |
33
|
|
|
*/ |
34
|
|
|
protected $table; |
35
|
|
|
|
36
|
|
|
/** @var CouchbaseConnection */ |
37
|
|
|
protected $database; |
38
|
|
|
|
39
|
|
|
/** |
40
|
|
|
* {@inheritdoc} |
41
|
|
|
*/ |
42
|
2 |
|
public function pop($queue = null) |
43
|
|
|
{ |
44
|
2 |
|
$queue = $this->getQueue($queue); |
45
|
2 |
|
if ($job = $this->getNextAvailableJob($queue)) { |
46
|
2 |
|
return $this->marshalJob($queue, $job); |
47
|
|
|
} |
48
|
|
|
|
49
|
2 |
|
return null; |
50
|
|
|
} |
51
|
|
|
|
52
|
|
|
/** |
53
|
|
|
* {@inheritdoc} |
54
|
|
|
*/ |
55
|
2 |
|
protected function marshalJob($queue, $job) |
56
|
|
|
{ |
57
|
2 |
|
$job = $this->markJobAsReserved($job); |
58
|
|
|
|
59
|
2 |
|
return new DatabaseJob( |
60
|
2 |
|
$this->container, $this, $job, $this->connectionName, $queue |
61
|
|
|
); |
62
|
|
|
} |
63
|
|
|
|
64
|
|
|
/** |
65
|
|
|
* {@inheritdoc} |
66
|
|
|
*/ |
67
|
2 |
|
protected function getNextAvailableJob($queue) |
68
|
|
|
{ |
69
|
2 |
|
$job = $this->database->table($this->table) |
70
|
2 |
|
->where('queue', $this->getQueue($queue)) |
71
|
2 |
|
->where(function (Builder $query) { |
72
|
2 |
|
$this->isAvailable($query); |
73
|
2 |
|
$this->isReservedButExpired($query); |
74
|
2 |
|
}) |
75
|
2 |
|
->orderBy('id', 'asc') |
76
|
2 |
|
->first(['*', 'meta().id']); |
77
|
|
|
|
78
|
2 |
|
return $job ? new DatabaseJobRecord((object)$job) : null; |
79
|
|
|
} |
80
|
|
|
|
81
|
|
|
/** |
82
|
|
|
* {@inheritdoc} |
83
|
|
|
*/ |
84
|
2 |
|
protected function markJobAsReserved($job) |
85
|
|
|
{ |
86
|
2 |
|
$bucket = $this->table; |
87
|
|
|
/** @var \Couchbase\Bucket $openBucket */ |
88
|
2 |
|
$openBucket = $this->database->openBucket($bucket); |
89
|
|
|
// lock bucket |
90
|
2 |
|
$meta = $openBucket->getAndLock($job->id, 10); |
91
|
2 |
|
$meta->value->attempts = $job->$bucket->attempts + 1; |
92
|
2 |
|
$meta->value->reserved_at = $job->touch(); |
93
|
2 |
|
$openBucket->replace($job->id, $meta->value, ['cas' => $meta->cas]); |
94
|
|
|
|
95
|
2 |
|
return $meta->value; |
96
|
|
|
} |
97
|
|
|
|
98
|
|
|
/** |
99
|
|
|
* {@inheritdoc} |
100
|
|
|
*/ |
101
|
2 |
|
public function bulk($jobs, $data = '', $queue = null) |
102
|
|
|
{ |
103
|
2 |
|
foreach ((array)$jobs as $job) { |
104
|
2 |
|
$this->push($job, $data, $queue); |
105
|
|
|
} |
106
|
2 |
|
} |
107
|
|
|
|
108
|
|
|
/** |
109
|
|
|
* {@inheritdoc} |
110
|
|
|
*/ |
111
|
2 |
|
public function deleteReserved($queue, $id) |
112
|
|
|
{ |
113
|
2 |
|
$this->database->table($this->table)->where('id', $id)->delete(); |
114
|
2 |
|
} |
115
|
|
|
|
116
|
|
|
/** |
117
|
|
|
* {@inheritdoc} |
118
|
|
|
*/ |
119
|
2 |
|
protected function pushToDatabase($queue, $payload, $delay = 0, $attempts = 0) |
120
|
|
|
{ |
121
|
2 |
|
$attributes = $this->buildDatabaseRecord( |
122
|
2 |
|
$this->getQueue($queue), $payload, $this->availableAt($delay), $attempts |
123
|
|
|
); |
124
|
2 |
|
$increment = $this->incrementKey(); |
125
|
2 |
|
$attributes['id'] = $increment; |
126
|
2 |
|
$result = $this->database->table($this->table) |
127
|
2 |
|
->key($this->uniqueKey($attributes))->insert($attributes); |
128
|
2 |
|
if ($result) { |
129
|
2 |
|
return $increment; |
130
|
|
|
} |
131
|
|
|
|
132
|
|
|
return false; |
|
|
|
|
133
|
|
|
} |
134
|
|
|
|
135
|
|
|
/** |
136
|
|
|
* generate increment key |
137
|
|
|
* |
138
|
|
|
* @param int $initial |
139
|
|
|
* |
140
|
|
|
* @return int |
141
|
|
|
*/ |
142
|
2 |
|
protected function incrementKey($initial = 1) |
143
|
|
|
{ |
144
|
2 |
|
$result = $this->database->openBucket($this->table) |
145
|
2 |
|
->counter($this->identifier(), $initial, ['initial' => abs($initial)]); |
146
|
|
|
|
147
|
2 |
|
return $result->value; |
148
|
|
|
} |
149
|
|
|
|
150
|
|
|
/** |
151
|
|
|
* @param array $attributes |
152
|
|
|
* |
153
|
|
|
* @return string |
154
|
|
|
*/ |
155
|
2 |
|
protected function uniqueKey(array $attributes): string |
156
|
|
|
{ |
157
|
2 |
|
$array = array_only($attributes, ['queue', 'attempts', 'id']); |
158
|
|
|
|
159
|
2 |
|
return implode(':', $array); |
160
|
|
|
} |
161
|
|
|
|
162
|
|
|
/** |
163
|
|
|
* @return string |
164
|
|
|
*/ |
165
|
2 |
|
protected function identifier(): string |
166
|
|
|
{ |
167
|
2 |
|
return __CLASS__ . ':sequence'; |
168
|
|
|
} |
169
|
|
|
} |
170
|
|
|
|
If you return a value from a function or method, it should be a sub-type of the type that is given by the parent type f.e. an interface, or abstract method. This is more formally defined by the Lizkov substitution principle, and guarantees that classes that depend on the parent type can use any instance of a child type interchangably. This principle also belongs to the SOLID principles for object oriented design.
Let’s take a look at an example:
Our function
my_function
expects aPost
object, and outputs the author of the post. The base classPost
returns a simple string and outputting a simple string will work just fine. However, the child classBlogPost
which is a sub-type ofPost
instead decided to return anobject
, and is therefore violating the SOLID principles. If aBlogPost
were passed tomy_function
, PHP would not complain, but ultimately fail when executing thestrtoupper
call in its body.