Passed
Push — master ( 1d1c44...e0e6c6 )
by Roman
02:44
created

JobMonitor::events()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 7
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 7
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 5
nc 1
nop 0
1
<?php
2
/**
3
 * @link https://github.com/zhuravljov/yii2-queue-monitor
4
 * @copyright Copyright (c) 2017 Roman Zhuravlev
5
 * @license http://opensource.org/licenses/BSD-3-Clause
6
 */
7
8
namespace zhuravljov\yii\queue\monitor;
9
10
use Yii;
11
use yii\base\Behavior;
12
use yii\base\InvalidConfigException;
13
use yii\queue\ErrorEvent;
14
use yii\queue\ExecEvent;
15
use yii\queue\JobEvent;
16
use yii\queue\PushEvent;
17
use yii\queue\Queue;
18
use zhuravljov\yii\queue\monitor\records\ExecRecord;
19
use zhuravljov\yii\queue\monitor\records\PushRecord;
20
use zhuravljov\yii\queue\monitor\records\WorkerRecord;
21
22
/**
23
 * Queue Job Monitor
24
 *
25
 * @author Roman Zhuravlev <[email protected]>
26
 */
27
class JobMonitor extends Behavior
28
{
29
    /**
     * @var Queue the queue this behavior is attached to.
     * @inheritdoc
     */
    public $owner;
    /**
     * @var Env monitor environment received through the constructor; the
     * handlers use its `db` connection.
     */
    protected $env;
38
39
    /**
     * Creates the behavior and remembers the monitor environment.
     *
     * @param Env $env monitor environment stored on the behavior.
     * @param array $config configuration handed over to the parent constructor.
     */
    public function __construct(Env $env, $config = [])
    {
        // The environment is assigned first so that it is already available
        // while the parent constructor runs.
        $this->env = $env;

        parent::__construct($config);
    }
48
49
    /**
     * Declares which queue events are handled by which method of this behavior.
     *
     * @inheritdoc
     */
    public function events()
    {
        $handlers = [];
        $handlers[Queue::EVENT_AFTER_PUSH] = 'afterPush';
        $handlers[Queue::EVENT_BEFORE_EXEC] = 'beforeExec';
        $handlers[Queue::EVENT_AFTER_EXEC] = 'afterExec';
        $handlers[Queue::EVENT_AFTER_ERROR] = 'afterError';

        return $handlers;
    }
61
62
    /**
     * Stores a push record describing a job that has just been pushed.
     *
     * The record keeps the sender component id, the job uid, the job itself,
     * its ttr and delay, and the current time as the push time.
     *
     * @param PushEvent $event
     */
    public function afterPush(PushEvent $event)
    {
        $record = new PushRecord();

        $record->sender_name = $this->getSenderName($event);
        $record->job_uid = $event->id;
        $record->setJob($event->job);
        $record->ttr = $event->ttr;
        $record->delay = $event->delay;
        $record->pushed_at = time();

        $record->save(false);
    }
76
77
    /**
78
     * @param ExecEvent $event
79
     */
80
    public function beforeExec(ExecEvent $event)
81
    {
82
        $push = $this->getPushRecord($event);
83
        if (!$push) {
84
            return;
85
        }
86
        if ($push->isStopped()) {
87
            // Rejects job execution if the job has been stopped
88
            $event->handled = true;
89
            return;
90
        }
91
        $this->env->db->transaction(function () use ($event, $push) {
92
            $worker = $this->getWorkerRecord($event);
0 ignored issues
show
Bug introduced by
Are you sure the assignment to $worker is correct? The call $this->getWorkerRecord($event), targeting zhuravljov\yii\queue\mon...itor::getWorkerRecord(), seems to always return null.

This check looks for function or method calls that always return null and whose return value is assigned to a variable.

class A
{
    function getObject()
    {
        return null;
    }

}

$a = new A();
$object = $a->getObject();

The method getObject() can return nothing but null, so it makes no sense to assign that value to a variable.

The reason is most likely that a function or method is incomplete or has been reduced for debug purposes.

Loading history...
93
94
            $exec = new ExecRecord();
95
            $exec->push_id = $push->id;
96
            if ($worker) {
97
                $exec->worker_id = $worker->id;
98
            }
99
            $exec->attempt = $event->attempt;
100
            $exec->reserved_at = time();
101
            $exec->save(false);
102
103
            $push->first_exec_id = $push->first_exec_id ?: $exec->id;
104
            $push->last_exec_id = $exec->id;
105
            $push->save(false);
106
107
            if ($worker) {
108
                $worker->last_exec_id = $exec->id;
109
                $worker->save(false);
110
            }
111
        });
112
    }
113
114
    /**
     * Marks the latest execution of the job as finished without an error.
     *
     * Nothing is updated when no push record is found for the event, or when
     * the push record has no last execution id.
     *
     * @param ExecEvent $event
     */
    public function afterExec(ExecEvent $event)
    {
        $push = $this->getPushRecord($event);
        if (!$push || !$push->last_exec_id) {
            return;
        }

        $attributes = [
            'done_at' => time(),
            'error' => null,
            'retry' => false,
        ];
        ExecRecord::updateAll($attributes, ['id' => $push->last_exec_id]);
    }
133
134
    /**
     * Records a failed execution of the job.
     *
     * When the push record is stopped, the retry flag of the event is switched
     * off. The latest execution record, if any, then receives the finish time,
     * the error and the final retry flag.
     *
     * @param ErrorEvent $event
     */
    public function afterError(ErrorEvent $event)
    {
        $push = $this->getPushRecord($event);
        if (!$push) {
            return;
        }

        // A stopped job must not be retried.
        if ($push->isStopped()) {
            $event->retry = false;
        }

        if (!$push->last_exec_id) {
            return;
        }

        ExecRecord::updateAll(
            [
                'done_at' => time(),
                'error' => $event->error,
                'retry' => $event->retry,
            ],
            ['id' => $push->last_exec_id]
        );
    }
157
158
    /**
     * Finds the application component id under which the event sender is registered.
     *
     * @param JobEvent $event
     * @throws InvalidConfigException when the sender is not among the
     * components returned by `Yii::$app->getComponents(false)`.
     * @return string
     */
    protected function getSenderName($event)
    {
        // Strict search compares objects by identity, and yields the first matching key.
        $componentId = array_search($event->sender, Yii::$app->getComponents(false), true);
        if ($componentId !== false) {
            return $componentId;
        }

        throw new InvalidConfigException('Queue must be an application component.');
    }
172
173
    /**
     * Looks up the push record that belongs to the job of the given event.
     *
     * The query is executed through the `useMaster()` method of the
     * environment's db connection.
     *
     * @param JobEvent $event
     * @return PushRecord|null null when the event carries no job id; otherwise
     * whatever the query's `one()` call returns.
     */
    protected function getPushRecord(JobEvent $event)
    {
        if ($event->id === null) {
            return null;
        }

        $lookup = function () use ($event) {
            $query = PushRecord::find();

            return $query
                ->byJob($this->getSenderName($event), $event->id)
                ->one();
        };

        return $this->env->db->useMaster($lookup);
    }
189
190
    /**
191
     * @param ExecEvent $event
192
     * @return WorkerRecord|null
193
     */
194
    protected function getWorkerRecord(ExecEvent $event)
195
    {
196
        if ($event->sender->getWorkerPid() === null) {
0 ignored issues
show
introduced by
The condition $event->sender->getWorkerPid() === null can never be false.
Loading history...
Bug introduced by
Are you sure the usage of $event->sender->getWorkerPid() is correct? The call, targeting yii\queue\Queue::getWorkerPid(), seems to always return null.

This check looks for function or method calls that always return null and whose return value is used.

class A
{
    function getObject()
    {
        return null;
    }

}

$a = new A();
if ($a->getObject()) {

The method getObject() can return nothing but null, so it makes no sense to use the return value.

The reason is most likely that a function or method is incomplete or has been reduced for debug purposes.

Loading history...
197
            return null;
198
        }
199
        if (!$this->isWorkerMonitored()) {
200
            return null;
201
        }
202
203
        return $this->env->db->useMaster(function () use ($event) {
204
            return WorkerRecord::find()
205
                ->byPid($event->sender->getWorkerPid())
206
                ->active()
207
                ->one();
208
        });
209
    }
210
211
    /**
     * Checks whether a WorkerMonitor behavior is attached to the owner queue.
     *
     * @return bool whether workers are monitored.
     */
    private function isWorkerMonitored()
    {
        $monitored = false;
        foreach ($this->owner->getBehaviors() as $attached) {
            if ($attached instanceof WorkerMonitor) {
                $monitored = true;
                break;
            }
        }

        return $monitored;
    }
223
}
224