Passed
Pull Request — main (#42)
by Luke
09:39
created

BatchQueue   A

Complexity

Total Complexity 16

Size/Duplication

Total Lines 147
Duplicated Lines 0 %

Test Coverage

Coverage 87.23%

Importance

Changes 5
Bugs 2 Features 0
Metric Value
wmc 16
eloc 48
c 5
b 2
f 0
dl 0
loc 147
ccs 41
cts 47
cp 0.8723
rs 10

11 Methods

Rating   Name   Duplication   Size   Complexity  
A pushRaw() 0 3 1
A release() 0 9 2
A marshalJob() 0 12 1
A getBatchDisplayName() 0 7 4
A getJobById() 0 17 2
A __construct() 0 11 1
A bulk() 0 4 1
A later() 0 3 1
A pushToBatch() 0 14 1
A push() 0 4 1
A pop() 0 4 1
1
<?php
2
3
/**
4
 * Laravel Queue for AWS Batch.
5
 *
6
 * @author    Luke Waite <[email protected]>
7
 * @copyright 2017 Luke Waite
8
 * @license   http://www.opensource.org/licenses/mit-license.php MIT
9
 *
10
 * @link      https://github.com/lukewaite/laravel-queue-aws-batch
11
 */
12
13
namespace LukeWaite\LaravelQueueAwsBatch\Queues;
14
15
use Aws\Batch\BatchClient;
16
use Illuminate\Database\Connection;
17
use Illuminate\Queue\DatabaseQueue;
18
use Illuminate\Queue\Jobs\DatabaseJobRecord;
19
use LukeWaite\LaravelQueueAwsBatch\Exceptions\JobNotFoundException;
20
use LukeWaite\LaravelQueueAwsBatch\Exceptions\UnsupportedException;
21
use LukeWaite\LaravelQueueAwsBatch\Jobs\BatchJob;
22
23
/**
 * Database-backed queue whose jobs are executed through AWS Batch.
 *
 * Every pushed job is first stored in the queue table (via the parent
 * DatabaseQueue) and then submitted to AWS Batch with the new row's id as the
 * `jobId` parameter. Jobs are fetched by id with getJobById() rather than
 * popped by a regular worker; pop(), bulk() and later() are unsupported.
 */
class BatchQueue extends DatabaseQueue
{
    /**
     * The AWS Batch client.
     *
     * @var BatchClient
     */
    protected $batch;

    /**
     * Value sent as `jobDefinition` with every submitJob() call.
     *
     * @var mixed
     */
    protected $jobDefinition;

    /**
     * Create a new AWS Batch queue instance.
     *
     * The database-related arguments are forwarded unchanged to the parent
     * DatabaseQueue constructor.
     *
     * @param Connection  $database
     * @param mixed       $table
     * @param mixed       $default
     * @param mixed       $expire
     * @param mixed       $jobDefinition Value sent as `jobDefinition` on submitJob().
     * @param BatchClient $batch
     */
    public function __construct(
        Connection $database,
        $table,
        $default,
        $expire,
        $jobDefinition,
        BatchClient $batch
    ) {
        $this->jobDefinition = $jobDefinition;
        $this->batch = $batch;
        parent::__construct($database, $table, $default, $expire);
    }

    /**
     * Build a payload for the job, store it and submit it to AWS Batch.
     *
     * @param mixed       $job
     * @param mixed       $data
     * @param string|null $queue
     *
     * @return int The id returned by pushToBatch().
     */
    public function push($job, $data = '', $queue = null)
    {
        $payload = $this->createPayload($job, $data);

        return $this->pushToBatch($queue, $payload, $this->getBatchDisplayName($job));
    }

    /**
     * Store a pre-built payload and submit it to AWS Batch under the fixed
     * job name 'raw-job'.
     *
     * @param string      $payload
     * @param string|null $queue
     * @param array       $options Accepted for signature compatibility; not used.
     *
     * @return int The id returned by pushToBatch().
     */
    public function pushRaw($payload, $queue = null, array $options = [])
    {
        return $this->pushToBatch($queue, $payload, 'raw-job');
    }

    /**
     * Get the display name for the given job.
     *
     * Objects use their own displayName() when they define one, otherwise the
     * class name with namespace separators replaced by underscores. Strings
     * use the part before the first '@'. Anything else yields null.
     *
     * NOTE(review): the displayName() and string branches are passed through
     * unmodified (backslashes included) - confirm AWS Batch accepts such
     * values as a jobName.
     *
     * @param  mixed  $job
     * @return string|null
     */
    protected function getBatchDisplayName($job)
    {
        if (is_object($job)) {
            return method_exists($job, 'displayName')
                ? $job->displayName() : str_replace('\\', '_', (string)get_class($job));
        }

        return is_string($job) ? explode('@', $job)[0] : null;
    }

    /**
     * Push a raw payload to the database, then submit a matching job to AWS Batch.
     *
     * The id of the stored row is handed to the Batch job as its `jobId`
     * parameter.
     *
     * @param string|null $queue
     * @param string      $payload
     * @param string      $jobName
     *
     * @return int
     */
    protected function pushToBatch($queue, $payload, $jobName)
    {
        $jobId = $this->pushToDatabase($queue, $payload);

        $this->batch->submitJob([
            'jobDefinition' => $this->jobDefinition,
            'jobName'       => $jobName,
            'jobQueue'      => $this->getQueue($queue),
            'parameters'    => [
                'jobId' => $jobId,
            ]
        ]);

        return $jobId;
    }

    /**
     * Fetch a single job row by id, lock it, and return it as a reserved BatchJob.
     *
     * A transaction is opened here and committed inside marshalJob() once the
     * job has been marked as reserved. When the row does not exist the
     * transaction is rolled back before the exception is thrown, so it is not
     * left open on the connection.
     *
     * @param mixed $id
     *
     * @return BatchJob
     * @throws JobNotFoundException When no row with the given id exists.
     */
    public function getJobById($id)
    {
        $this->database->beginTransaction();

        $job = $this->database->table($this->table)
            ->lockForUpdate()
            ->where('id', $id)
            ->first();

        if (!isset($job)) {
            // Close the transaction opened above; nothing was changed.
            $this->database->rollBack();

            throw new JobNotFoundException('Could not find the job');
        }

        $job = new DatabaseJobRecord($job);

        return $this->marshalJob($job->queue, $job);
    }

    /**
     * Mark the job as reserved, commit the open transaction and wrap the
     * record in a BatchJob.
     *
     * @param string            $queue
     * @param DatabaseJobRecord $job
     *
     * @return BatchJob
     */
    protected function marshalJob($queue, $job)
    {
        $job = $this->markJobAsReserved($job);

        $this->database->commit();

        // Static analysis reports that BatchJob::__construct() declares its
        // $job parameter as stdClass while a DatabaseJobRecord is passed here.
        return new BatchJob(
            $this->container,
            $this,
            $job,
            $this->connectionName,
            $queue
        );
    }

    /**
     * Release the job, without deleting first from the Queue
     *
     * The row's `attempts` column is set from the job and `reserved_at` is
     * cleared. `$queue` is not used.
     *
     * @param string $queue
     * @param \StdClass $job
     * @param int $delay
     *
     * @return int
     * @throws UnsupportedException When a non-zero delay is requested.
     */
    public function release($queue, $job, $delay)
    {
        // Loose comparison on purpose: null and '0' also count as "no delay".
        if ($delay != 0) {
            throw new UnsupportedException('The BatchJob does not support releasing back onto the queue with a delay');
        }

        return $this->database->table($this->table)->where('id', $job->id)->update([
            'attempts'    => $job->attempts,
            'reserved_at' => null
        ]);
    }

    /**
     * Not supported: always throws.
     *
     * @param string|null $queue
     *
     * @throws UnsupportedException Always.
     */
    public function pop($queue = null)
    {
        throw new UnsupportedException('The BatchQueue does not support running via a regular worker. ' .
            'Instead, you should use the queue:batch-work command with a job id.');
    }

    /**
     * Not supported: always throws.
     *
     * @param mixed       $jobs
     * @param mixed       $data
     * @param string|null $queue
     *
     * @throws UnsupportedException Always.
     */
    public function bulk($jobs, $data = '', $queue = null)
    {
        // This could be implemented, but it's not in first pass.
        throw new UnsupportedException('The BatchQueue does not currently support the bulk() operation.');
    }

    /**
     * Not supported: always throws.
     *
     * @param mixed       $delay
     * @param mixed       $job
     * @param mixed       $data
     * @param string|null $queue
     *
     * @throws UnsupportedException Always.
     */
    public function later($delay, $job, $data = '', $queue = null)
    {
        throw new UnsupportedException('The BatchQueue does not support the later() operation.');
    }
}
172