CouchbaseQueue::marshalJob()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 4
nc 1
nop 2
dl 0
loc 8
ccs 4
cts 4
cp 1
crap 1
rs 9.4285
c 0
b 0
f 0
1
<?php
2
declare(strict_types=1);
3
4
/**
5
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
6
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
7
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
8
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
9
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
10
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
11
 * THE SOFTWARE.
12
 */
13
14
namespace Ytake\LaravelCouchbase\Queue;
15
16
use Illuminate\Queue\DatabaseQueue;
17
use Illuminate\Database\Query\Builder;
18
use Illuminate\Queue\Jobs\DatabaseJob;
19
use Illuminate\Queue\Jobs\DatabaseJobRecord;
20
use Ytake\LaravelCouchbase\Database\CouchbaseConnection;
21
22
/**
23
 * Class CouchbaseQueue
24
 *
25
 * @author Yuuki Takezawa<[email protected]>
26
 */
27
class CouchbaseQueue extends DatabaseQueue
28
{
29
    /**
30
     * The couchbase bucket that holds the jobs.
31
     *
32
     * @var string
33
     */
34
    protected $table;
35
36
    /** @var CouchbaseConnection */
37
    protected $database;
38
39
    /**
40
     * {@inheritdoc}
41
     */
42 2
    public function pop($queue = null)
    {
        // Resolve the queue name once and reuse it for the lookup and the job.
        $queueName = $this->getQueue($queue);
        $record = $this->getNextAvailableJob($queueName);

        // Nothing to process on this queue.
        if (!$record) {
            return null;
        }

        // Reserve the record and wrap it in a job instance.
        return $this->marshalJob($queueName, $record);
    }
51
52
    /**
53
     * {@inheritdoc}
54
     */
55 2
    protected function marshalJob($queue, $job)
    {
        // Reserve the record first; the job wraps whatever markJobAsReserved() returns.
        $reserved = $this->markJobAsReserved($job);

        return new DatabaseJob($this->container, $this, $reserved, $this->connectionName, $queue);
    }
63
64
    /**
65
     * {@inheritdoc}
66
     */
67 2
    protected function getNextAvailableJob($queue)
    {
        // Both constraints are applied inside one nested where group.
        $constraints = function (Builder $query) {
            $this->isAvailable($query);
            $this->isReservedButExpired($query);
        };

        // Lowest id first; meta().id is selected alongside the document fields.
        $row = $this->database->table($this->table)
            ->where('queue', $this->getQueue($queue))
            ->where($constraints)
            ->orderBy('id', 'asc')
            ->first(['*', 'meta().id']);

        if (!$row) {
            return null;
        }

        return new DatabaseJobRecord((object) $row);
    }
80
81
    /**
82
     * {@inheritdoc}
83
     */
84 2
    protected function markJobAsReserved($job)
    {
        // The bucket name is also the property under which the record exposes
        // the stored document (see $job->{$bucketName} below).
        $bucketName = $this->table;
        /** @var \Couchbase\Bucket $handle */
        $handle = $this->database->openBucket($bucketName);

        // Read the document under a lock (lock time argument: 10).
        $locked = $handle->getAndLock($job->id, 10);
        $document = $locked->value;
        $document->attempts = $job->{$bucketName}->attempts + 1;
        $document->reserved_at = $job->touch();

        // Write the updated document back using the CAS from the locked read.
        $handle->replace($job->id, $document, ['cas' => $locked->cas]);

        return $document;
    }
97
98
    /**
99
     * {@inheritdoc}
100
     */
101 2
    public function bulk($jobs, $data = '', $queue = null)
    {
        // The (array) cast lets a scalar $jobs be handled like a one-element list.
        $batch = (array) $jobs;

        // Each entry is pushed individually with the same data and queue.
        foreach ($batch as $entry) {
            $this->push($entry, $data, $queue);
        }
    }
107
108
    /**
109
     * {@inheritdoc}
110
     */
111 2
    public function deleteReserved($queue, $id)
    {
        // $queue is not used here: the record is matched on its id field only.
        $query = $this->database->table($this->table)->where('id', $id);
        $query->delete();
    }
115
116
    /**
117
     * {@inheritdoc}
118
     */
119 2
    protected function pushToDatabase($queue, $payload, $delay = 0, $attempts = 0)
120
    {
121 2
        $attributes = $this->buildDatabaseRecord(
122 2
            $this->getQueue($queue), $payload, $this->availableAt($delay), $attempts
123
        );
124 2
        $increment = $this->incrementKey();
125 2
        $attributes['id'] = $increment;
126 2
        $result = $this->database->table($this->table)
127 2
            ->key($this->uniqueKey($attributes))->insert($attributes);
128 2
        if ($result) {
129 2
            return $increment;
130
        }
131
132
        return false;
0 ignored issues
show
Bug Best Practice introduced by
The value returned by `return false;` (type `false`) is incompatible with the return type `integer` declared by the parent method `Illuminate\Queue\DatabaseQueue::pushToDatabase`.

If you return a value from a function or method, it should be a sub-type of the type that is given by the parent type, e.g. an interface or abstract method. This is more formally defined by the Liskov substitution principle, and guarantees that classes that depend on the parent type can use any instance of a child type interchangeably. This principle also belongs to the SOLID principles for object-oriented design.

Let’s take a look at an example:

// Simple value object that holds an author's name.
class Author {
    private $name;

    public function __construct($name) {
        $this->name = $name;
    }

    // Returns the name that was passed to the constructor.
    public function getName() {
        return $this->name;
    }
}

// Base type of the example: getAuthor() yields the author as a plain string.
abstract class Post {
    public function getAuthor() {
        return 'Johannes';
    }
}

// Sub-type that overrides getAuthor() to return an Author object instead of
// the string returned by the base class.
class BlogPost extends Post {
    public function getAuthor() {
        return new Author('Johannes');
    }
}

// Second Post sub-type; its body is elided in this example.
class ForumPost extends Post { /* ... */ }

// Echoes the upper-cased result of $post->getAuthor(); strtoupper() is applied
// to that result directly.
function my_function(Post $post) {
    echo strtoupper($post->getAuthor());
}

Our function my_function expects a Post object and outputs the author of the post. The base class Post returns a simple string, and outputting a simple string works just fine. However, the child class BlogPost, which is a sub-type of Post, returns an object instead and therefore violates the Liskov substitution principle. If a BlogPost were passed to my_function, PHP would not complain at the call site, but it would ultimately fail when executing the strtoupper call in the function body.

Loading history...
133
    }
134
135
    /**
136
     * generate increment key
137
     *
138
     * @param int $initial
139
     *
140
     * @return int
141
     */
142 2
    protected function incrementKey($initial = 1)
    {
        $bucket = $this->database->openBucket($this->table);

        // $initial is passed as the counter delta; its absolute value is
        // passed as the 'initial' option.
        $options = ['initial' => abs($initial)];
        $counter = $bucket->counter($this->identifier(), $initial, $options);

        return $counter->value;
    }
149
150
    /**
151
     * @param array $attributes
152
     *
153
     * @return string
154
     */
155 2
    protected function uniqueKey(array $attributes): string
    {
        // Keep only the key-forming fields, in the order they appear in
        // $attributes. array_intersect_key() with flipped keys is the same
        // operation the array_only() helper performs, without relying on the
        // global helper that Laravel 6+ no longer ships in the core framework.
        $parts = array_intersect_key(
            $attributes,
            array_flip(['queue', 'attempts', 'id'])
        );

        // Join the retained values with ':' to form the document key.
        return implode(':', $parts);
    }
161
162
    /**
163
     * @return string
164
     */
165 2
    protected function identifier(): string
    {
        // Counter key: this class's fully-qualified name plus a fixed suffix.
        return sprintf('%s:sequence', __CLASS__);
    }
169
}
170