AbstractQueue::fetch()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 15
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
dl 0
loc 15
ccs 0
cts 13
cp 0
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 9
nc 2
nop 0
crap 6
1
<?php
2
3
/**
4
 * @author    Flipbox Factory
5
 * @copyright Copyright (c) 2017, Flipbox Digital
6
 * @link      https://github.com/flipbox/queue/releases/latest
7
 * @license   https://github.com/flipbox/queue/blob/master/LICENSE
8
 */
9
10
namespace flipbox\queue\queues;
11
12
use flipbox\queue\events\Queue as QueueEvent;
13
use flipbox\queue\events\QueueValue as QueueValueEvent;
14
use flipbox\queue\helpers\JobHelper;
15
use flipbox\queue\jobs\JobInterface;
16
use Craft;
17
use yii\base\Component;
18
use yii\base\Exception;
19
use yii\helpers\ArrayHelper;
20
use yii\helpers\Json;
21
22
/**
 * Base class for queue implementations.
 *
 * Wraps the driver-specific operations (`postJob`, `fetchJob`, `deleteJob`
 * and `releaseJob`) with before/after events and provides the shared job
 * execution flow in `run()`.
 *
 * @author Flipbox Factory <[email protected]>
 * @since 1.0.0
 */
abstract class AbstractQueue extends Component implements QueueInterface
{
    /**
     * Event triggered before a job is posted to the queue.
     */
    const EVENT_BEFORE_POST = 'beforePost';

    /**
     * Event triggered after a job has been posted to the queue.
     */
    const EVENT_AFTER_POST = 'afterPost';

    /**
     * Event triggered before a job is fetched from the queue.
     */
    const EVENT_BEFORE_FETCH = 'beforeFetch';

    /**
     * Event triggered after a job has been fetched from the queue.
     */
    const EVENT_AFTER_FETCH = 'afterFetch';

    /**
     * Event triggered before a job is deleted from the queue.
     */
    const EVENT_BEFORE_DELETE = 'beforeDelete';

    /**
     * Event triggered after a job has been deleted from the queue.
     */
    const EVENT_AFTER_DELETE = 'afterDelete';

    /**
     * Event triggered before a job is released back to the queue.
     */
    const EVENT_BEFORE_RELEASE = 'beforeRelease';

    /**
     * Event triggered after a job has been released back to the queue.
     */
    const EVENT_AFTER_RELEASE = 'afterRelease';

    /**
     * Event triggered before a job is executed.
     */
    const EVENT_BEFORE_RUN = 'beforeRun';

    /**
     * Event triggered after a job has been executed.
     */
    const EVENT_AFTER_RUN = 'afterRun';

    /**
     * Whether to release the job automatically on execution failure, i.e.
     * when the job's `run` method returns false or throws an exception.
     *
     * @var bool
     */
    public $releaseOnFailure = true;

    /**
     * Post a new job to the queue. Override this for the queue implementation.
     *
     * @param JobInterface $job
     * @param array $options
     * @return bool
     */
    abstract protected function postJob(JobInterface $job, array $options = []): bool;

    /**
     * @inheritdoc
     *
     * Triggers EVENT_BEFORE_POST, hands the job and its options to
     * `postJob()`, and triggers EVENT_AFTER_POST when the post succeeded.
     * Returns false when the before-event is flagged invalid or when
     * `postJob()` reports failure.
     */
    public function post(JobInterface $job, array $options = []): bool
    {
        $beforeEvent = new QueueEvent(['job' => $job]);
        $this->trigger(self::EVENT_BEFORE_POST, $beforeEvent);

        // Abort when the before-event has been flagged as invalid.
        if (!$beforeEvent->isValid) {
            return false;
        }

        // Forward the caller-supplied options; they were previously dropped
        // here, so implementations always received an empty array.
        if (!$this->postJob($job, $options)) {
            return false;
        }

        $this->trigger(
            self::EVENT_AFTER_POST,
            new QueueEvent(['job' => $job])
        );

        return true;
    }

    /**
     * @inheritdoc
     *
     * Triggers EVENT_BEFORE_FETCH, asks `fetchJob()` for the next job and,
     * when one was returned, triggers EVENT_AFTER_FETCH with it. Returns
     * false when `fetchJob()` yields a falsy value.
     */
    public function fetch()
    {
        $this->trigger(self::EVENT_BEFORE_FETCH);

        $job = $this->fetchJob();
        if (!$job) {
            return false;
        }

        $this->trigger(
            self::EVENT_AFTER_FETCH,
            new QueueEvent(['job' => $job])
        );

        return $job;
    }

    /**
     * Return the next job from the queue. Override this for the queue
     * implementation.
     *
     * @return JobInterface|bool
     */
    abstract protected function fetchJob();

    /**
     * @inheritdoc
     *
     * Triggers EVENT_BEFORE_RUN, executes the job and triggers
     * EVENT_AFTER_RUN with the job's return value. A job that returns
     * exactly `false` is released (when `$releaseOnFailure` is set) and
     * false is returned; otherwise the job is deleted and true is returned.
     *
     * @throws Exception when the job throws; the job is released first when
     * `$releaseOnFailure` is set, and the original exception is attached as
     * the previous exception.
     */
    public function run(JobInterface $job): bool
    {
        $beforeEvent = new QueueEvent(['job' => $job]);
        $this->trigger(self::EVENT_BEFORE_RUN, $beforeEvent);

        // Abort when the before-event has been flagged as invalid.
        if (!$beforeEvent->isValid) {
            return false;
        }

        try {
            $value = $job->run();
        } catch (\Exception $e) {
            $class = get_class($job);

            Craft::error(
                "Fatal Error: Error running job '{$class}'. Id: {$job->getId()} Message: {$e->getMessage()}",
                'queue'
            );

            if ($this->releaseOnFailure) {
                $this->release($job);
            }

            // Chain the original exception so its type and trace stay
            // available to callers via getPrevious().
            throw new Exception(
                "Error running job '{$class}'. " .
                "Message: {$e->getMessage()}. " .
                "File: {$e->getFile()}[{$e->getLine()}]. Stack Trace: {$e->getTraceAsString()}",
                500,
                $e
            );
        }

        $this->trigger(
            self::EVENT_AFTER_RUN,
            new QueueValueEvent(['job' => $job, 'value' => $value])
        );

        // Only a strict `false` counts as failure; any other return value
        // is treated as success.
        if ($value === false) {
            if ($this->releaseOnFailure) {
                $this->release($job);
            }

            return false;
        }

        $this->delete($job);

        return true;
    }

    /**
     * @inheritdoc
     *
     * Triggers EVENT_BEFORE_DELETE, delegates to `deleteJob()`, and triggers
     * EVENT_AFTER_DELETE when the deletion succeeded. Returns false when the
     * before-event is flagged invalid or when `deleteJob()` reports failure.
     */
    public function delete(JobInterface $job): bool
    {
        $beforeEvent = new QueueEvent(['job' => $job]);
        $this->trigger(self::EVENT_BEFORE_DELETE, $beforeEvent);

        if (!$beforeEvent->isValid) {
            return false;
        }

        if (!$this->deleteJob($job)) {
            return false;
        }

        $this->trigger(
            self::EVENT_AFTER_DELETE,
            new QueueEvent(['job' => $job])
        );

        return true;
    }

    /**
     * Delete the job. Override this for the queue implementation.
     *
     * @param JobInterface $job
     * @return bool
     */
    abstract protected function deleteJob(JobInterface $job): bool;

    /**
     * @inheritdoc
     *
     * Triggers EVENT_BEFORE_RELEASE, delegates to `releaseJob()`, and
     * triggers EVENT_AFTER_RELEASE when the release succeeded. Returns false
     * when the before-event is flagged invalid or when `releaseJob()`
     * reports failure.
     */
    public function release(JobInterface $job): bool
    {
        $beforeEvent = new QueueEvent(['job' => $job]);
        $this->trigger(self::EVENT_BEFORE_RELEASE, $beforeEvent);

        if (!$beforeEvent->isValid) {
            return false;
        }

        if (!$this->releaseJob($job)) {
            return false;
        }

        $this->trigger(
            self::EVENT_AFTER_RELEASE,
            new QueueEvent(['job' => $job])
        );

        return true;
    }

    /**
     * Release the job. Override this for the queue implementation.
     *
     * @param JobInterface $job
     * @return bool
     */
    abstract protected function releaseJob(JobInterface $job): bool;

    /**
     * Rebuild a job from its serialized queue message.
     *
     * The message is JSON-decoded and the result is passed to
     * `JobHelper::create()`.
     *
     * @param string $message JSON string, as produced by `serialize()`.
     * @return JobInterface
     * @throws Exception
     */
    protected function deserialize($message): JobInterface
    {
        $jobArray = Json::decode($message);

        return JobHelper::create($jobArray);
    }

    /**
     * Serialize a job for storage in the queue by JSON-encoding the
     * configuration returned from the job's `toConfig()` method.
     *
     * @param JobInterface $job The job to serialize.
     * @return string JSON string.
     */
    protected function serialize(JobInterface $job)
    {
        return Json::encode($job->toConfig());
    }
}
289