BatchQueue::__construct()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 11
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 11
ccs 4
cts 4
cp 1
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 6
crap 1
1
<?php
2
/**
3
 * Laravel Queue for AWS Batch.
4
 *
5
 * @author    Luke Waite <[email protected]>
6
 * @copyright 2017 Luke Waite
7
 * @license   http://www.opensource.org/licenses/mit-license.php MIT
8
 *
9
 * @link      https://github.com/dnxlabs/laravel-queue-aws-batch
10
 */
11
12
namespace DNXLabs\LaravelQueueAwsBatch\Queues;
13
14
use Aws\Batch\BatchClient;
15
use Illuminate\Database\Connection;
16
use Illuminate\Queue\DatabaseQueue;
17
use Illuminate\Queue\Jobs\DatabaseJobRecord;
18
use Illuminate\Support\Str;
19
use DNXLabs\LaravelQueueAwsBatch\Exceptions\JobNotFoundException;
20
use DNXLabs\LaravelQueueAwsBatch\Exceptions\UnsupportedException;
21
use DNXLabs\LaravelQueueAwsBatch\Jobs\BatchJob;
22
23
class BatchQueue extends DatabaseQueue
24
{
25
    /**
26
     * The AWS Batch client.
27
     *
28
     * @var BatchClient
29
     */
30
    protected $batch;
31
32
    /**
     * The job definition sent as 'jobDefinition' when a job is submitted to AWS Batch.
     *
     * NOTE(review): presumably a job definition name or ARN string - confirm against the queue configuration.
     *
     * @var mixed
     */
    protected $jobDefinition;
33 5
34
    /**
     * Create a new AWS Batch backed queue.
     *
     * The first four arguments are forwarded unchanged to the parent
     * DatabaseQueue constructor; the last two are kept on this instance.
     *
     * @param Connection  $database      Database connection handed to the parent queue.
     * @param mixed       $table         Forwarded to the parent constructor.
     * @param mixed       $default       Forwarded to the parent constructor.
     * @param mixed       $expire        Forwarded to the parent constructor.
     * @param mixed       $jobDefinition Sent as 'jobDefinition' on every AWS Batch submission.
     * @param BatchClient $batch         Client used to submit jobs to AWS Batch.
     */
    public function __construct(
        Connection $database,
        $table,
        $default,
        $expire,
        $jobDefinition,
        BatchClient $batch
    ) {
        parent::__construct($database, $table, $default, $expire);

        $this->batch = $batch;
        $this->jobDefinition = $jobDefinition;
    }
46 2
47
    /**
     * Push a new job onto the queue and submit it to AWS Batch.
     *
     * @param  mixed       $job
     * @param  mixed       $data
     * @param  string|null $queue
     *
     * @return mixed Whatever pushToBatch() returns for the stored job.
     */
    public function push($job, $data = '', $queue = null)
    {
        $resolvedQueue = $this->getQueue($queue);
        $payload = $this->createPayload($job, $resolvedQueue, $data);
        $displayName = $this->getBatchDisplayName($job);

        return $this->pushToBatch($resolvedQueue, $payload, $displayName);
    }
53
54
    /**
     * Push a raw payload onto the queue and submit it to AWS Batch.
     *
     * The submission always uses the fixed job name 'raw-job'.
     * $options is accepted but not used by this method.
     *
     * @param  string      $payload
     * @param  string|null $queue
     * @param  array       $options
     *
     * @return mixed Whatever pushToBatch() returns for the stored job.
     */
    public function pushRaw($payload, $queue = null, array $options = [])
    {
        $jobName = 'raw-job';

        return $this->pushToBatch($queue, $payload, $jobName);
    }
58
59
    /**
     * Get the AWS Batch job name for the given job.
     *
     * Objects use their displayName() method when they have one, otherwise
     * their class name with namespace separators replaced by underscores.
     * String jobs ("Class@method") use the part before the "@", with the same
     * separator replacement. Any other job type yields a null name, which is
     * passed to Str::limit() exactly as before.
     *
     * @param  mixed  $job
     * @return string
     */
    protected function getBatchDisplayName($job)
    {
        if (is_object($job)) {
            $jobName = method_exists($job, 'displayName')
                ? $job->displayName()
                : str_replace('\\', '_', (string) get_class($job));
        } elseif (is_string($job)) {
            // Treat the class part of a "Class@method" handler like the
            // class-name branch above: no namespace separators in the name.
            $jobName = str_replace('\\', '_', explode('@', $job)[0]);
        } else {
            $jobName = null;
        }

        // Limit requested by AWS Batch SubmitJob API. The empty suffix matters:
        // the default '...' would be appended after truncation and push the
        // name past 128 characters.
        return Str::limit($jobName, 128, '');
    }
76
77
    /**
     * Push a raw payload to the database, then submit it to AWS Batch.
     *
     * The submission carries the database job id and, when the payload holds
     * a serialized command with a "connection" property, that connection as
     * 'connectionId'. Payloads without such a command are submitted with a
     * null 'connectionId' instead of failing.
     *
     * @param string|null $queue
     * @param string      $payload
     * @param string      $jobName
     *
     * @return int
     */
    protected function pushToBatch($queue, $payload, $jobName)
    {
        // Inspect the payload before writing anything, so a payload that
        // cannot be read never leaves behind a database record that was not
        // submitted to AWS Batch.
        $connectionId = null;
        $decoded = json_decode($payload);

        if (isset($decoded->data->command) && is_string($decoded->data->command)) {
            // The command is the serialized job produced when the payload was
            // created; it is not expected to come from untrusted input.
            $command = unserialize($decoded->data->command);

            if (is_object($command) && isset($command->connection)) {
                $connectionId = $command->connection;
            }
        }

        $jobId = $this->pushToDatabase($queue, $payload);

        $this->batch->submitJob([
            'jobDefinition' => $this->jobDefinition,
            'jobName'       => $jobName,
            'jobQueue'      => $this->getQueue($queue),
            'parameters'    => [
                'jobId'        => $jobId,
                'connectionId' => $connectionId
            ]
        ]);

        return $jobId;
    }
105
106
    /**
     * Look up a job by its id, locking its row, and return it marshalled as a job instance.
     *
     * A transaction is opened for the lookup. On success it is committed by
     * marshalJob(); when no row matches it is rolled back here before the
     * exception is thrown, so the connection is not left inside a transaction.
     *
     * @param mixed $id Value matched against the "id" column of the jobs table.
     *
     * @return BatchJob
     * @throws JobNotFoundException When no job with the given id exists.
     */
    public function getJobById($id)
    {
        $this->database->beginTransaction();

        $record = $this->database->table($this->table)
            ->lockForUpdate()
            ->where('id', $id)
            ->first();

        if (!isset($record)) {
            // marshalJob() is the only place that commits, and it is never
            // reached on this path - close the transaction explicitly.
            $this->database->rollBack();

            throw new JobNotFoundException('Could not find the job');
        }

        $job = new DatabaseJobRecord($record);

        return $this->marshalJob($job->queue, $job);
    }
124
125
    /**
     * Mark the job record as reserved, commit the open transaction and wrap the record in a BatchJob.
     *
     * @param mixed $queue Passed through to the BatchJob as its queue.
     * @param mixed $job   Job record handed to markJobAsReserved().
     *
     * @return BatchJob
     */
    protected function marshalJob($queue, $job)
    {
        $reserved = $this->markJobAsReserved($job);

        $this->database->commit();

        return new BatchJob($this->container, $this, $reserved, $this->connectionName, $queue);
    }
139
140
    /**
     * Put a job back on the queue without deleting its record first.
     *
     * Only an immediate release is possible: the job's row keeps its current
     * attempt count and has its reservation cleared.
     *
     * @param string    $queue
     * @param \StdClass $job
     * @param int       $delay
     *
     * @return int
     * @throws UnsupportedException When a non-zero delay is requested.
     */
    public function release($queue, $job, $delay)
    {
        if ($delay != 0) {
            throw new UnsupportedException('The BatchJob does not support releasing back onto the queue with a delay');
        }

        $jobRow = $this->database->table($this->table)->where('id', $job->id);

        return $jobRow->update([
            'attempts'    => $job->attempts,
            'reserved_at' => null
        ]);
    }
161
162
    /**
     * Popping jobs is not supported by this queue; this method always throws.
     *
     * @param string|null $queue
     *
     * @throws UnsupportedException Always.
     */
    public function pop($queue = null)
    {
        $message = 'The BatchQueue does not support running via a regular worker. ' .
            'Instead, you should use the queue:work-batch command with a job id.';

        throw new UnsupportedException($message);
    }
167
168
    /**
     * Bulk pushing is not implemented for this queue; this method always throws.
     *
     * @param mixed       $jobs
     * @param mixed       $data
     * @param string|null $queue
     *
     * @throws UnsupportedException Always.
     */
    public function bulk($jobs, $data = '', $queue = null)
    {
        // Left out of the first pass; it could be implemented later.
        $message = 'The BatchQueue does not currently support the bulk() operation.';

        throw new UnsupportedException($message);
    }
173
174
    /**
     * Delayed pushing is not supported by this queue; this method always throws.
     *
     * @param mixed       $delay
     * @param mixed       $job
     * @param mixed       $data
     * @param string|null $queue
     *
     * @throws UnsupportedException Always.
     */
    public function later($delay, $job, $data = '', $queue = null)
    {
        $message = 'The BatchQueue does not support the later() operation.';

        throw new UnsupportedException($message);
    }
178
}
179