Job::__construct()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 19
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 1
eloc 9
c 1
b 0
f 1
nc 1
nop 3
dl 0
loc 19
rs 9.9666
1
<?php
2
/**
3
 * Job core
4
 * User: moyo
5
 * Date: 01/08/2017
6
 * Time: 2:54 PM
7
 */
8
9
namespace Carno\Coroutine;
10
11
use Carno\Coroutine\Exception\InterruptException;
12
use Carno\Coroutine\Exception\RejectedException;
13
use Carno\Coroutine\Job\IDGen;
14
use Carno\Coroutine\Job\Priority;
15
use Carno\Coroutine\Job\Signal;
16
use Carno\Promise\Promise;
17
use Carno\Promise\Promised;
18
use Generator;
19
use SplStack;
20
use Throwable;
21
22
class Job
23
{
24
    /**
25
     * @var int
26
     */
27
    private $p = 0;
28
29
    /**
30
     * @var int
31
     */
32
    private $id = 0;
33
34
    /**
35
     * @var Generator
36
     */
37
    private $co = null;
38
39
    /**
40
     * @var Context
41
     */
42
    private $ctx = null;
43
44
    /**
45
     * @var SplStack
46
     */
47
    private $chain = null;
48
49
    /**
50
     * ROLL means current co finished (e.g. function returned)
51
     * @var Promised[]
52
     */
53
    private $rollers = null;
54
55
    /**
56
     * END means all co in job are finished
57
     * @var Promised
58
     */
59
    private $ender = null;
60
61
    /**
62
     * @var bool
63
     */
64
    private $started = false;
65
66
    /**
67
     * @var mixed
68
     */
69
    private $stage = null;
70
71
    /**
72
     * @var Promised
73
     */
74
    private $sleep = null;
75
76
    /**
77
     * @var int
78
     */
79
    private $signal = null;
80
81
    /**
82
     * @var mixed
83
     */
84
    private $result = null;
85
86
    /**
87
     * Job constructor.
88
     * @param Generator $co
89
     * @param Context $ctx
90
     * @param int $priority
91
     */
92
    public function __construct(
93
        Generator $co,
94
        Context $ctx = null,
95
        int $priority = Priority::MEDIUM
96
    ) {
97
        Stats::created();
98
99
        $this->p = $priority;
100
        $this->id = IDGen::next();
101
        $this->co = $co;
102
        $this->ctx = $ctx ?? new Context();
103
104
        $this->chain = new SplStack();
105
106
        ($this->ender = Promise::deferred())->catch(function (...$args) {
107
            $this->killed(...$args);
0 ignored issues
show
Bug introduced by
$args is expanded, but the parameter $e of Carno\Coroutine\Job::killed() does not expect variable arguments. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

107
            $this->killed(/** @scrutinizer ignore-type */ ...$args);
Loading history...
108
        });
109
110
        $this->run();
111
    }
112
113
    /**
114
     */
115
    public function __destruct()
116
    {
117
        Stats::finished();
118
    }
119
120
    /**
121
     * @return int
122
     */
123
    public function id() : int
124
    {
125
        return $this->id;
126
    }
127
128
    /**
129
     * @return Context
130
     */
131
    public function ctx() : Context
132
    {
133
        return $this->ctx;
134
    }
135
136
    /**
137
     * @return int
138
     */
139
    public function priority() : int
140
    {
141
        return $this->p;
142
    }
143
144
    /**
145
     * @return int
146
     */
147
    public function exec() : int
148
    {
149
        // check if signal is presented
150
        $this->signal && $this->stage = $this->result;
151
152
        // co interaction
153
        try {
154
            if ($this->started) {
155
                if ($this->stage instanceof Throwable) {
156
                    $this->stage = $this->co->throw($this->stage);
0 ignored issues
show
Bug introduced by
The method throw() does not exist on Generator. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

156
                    /** @scrutinizer ignore-call */ 
157
                    $this->stage = $this->co->throw($this->stage);

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
157
                } else {
158
                    $this->stage = $this->co->send($this->stage);
159
                }
160
            } else {
161
                $this->started = true;
162
                $this->stage = $this->co->current();
163
            }
164
        } catch (Throwable $e) {
165
            $this->stage = $e;
166
        }
167
168
        // state switcher
169
        if ($this->stage instanceof Generator) {
170
            // jump next co
171
            $this->chain->push($this->co);
172
            $this->co = $this->stage;
173
            $this->started = false;
174
            // job switch
175
            return Signal::ROLL;
176
        }
177
178
        // valid checker
179
        if ($this->co->valid()) {
180
            // sleeping ?
181
            if ($this->stage instanceof Promised) {
182
                // wait promise
183
                return $this->sleep($this->stage);
184
            } elseif ($this->stage instanceof Syscall) {
185
                // exec syscall
186
                $this->stage = $this->stage->exec($this);
187
            }
188
            // keep running
189
            return Signal::KEEP;
190
        }
191
192
        // co rolling for some features such as "defer"
193
        $this->rolling();
194
195
        // check if co/chain is finished
196
        if ($this->chain->isEmpty()) {
197
            // result detector
198
            $this->result = $this->stage instanceof Throwable ? $this->stage : $this->co->getReturn();
199
            // job done
200
            return $this->signal ?? Signal::FIN;
201
        }
202
203
        // trying to get returned value if no exception happens
204
        $this->stage instanceof Throwable || $this->stage = $this->co->getReturn();
205
206
        // jump prev co
207
        $this->co = $this->chain->pop();
208
209
        // sleeping ?
210
        if ($this->stage instanceof Promised) {
211
            // wait promise
212
            return $this->sleep($this->stage);
213
        }
214
215
        // job switch
216
        return Signal::ROLL;
217
    }
218
219
    /**
220
     * @param Promised $await
221
     * @return int
222
     */
223
    private function sleep(Promised $await) : int
224
    {
225
        $this->sleep = $await;
226
227
        $await->then(function ($r = null) {
228
            $this->wakeup($r);
229
        }, function (Throwable $e = null) {
230
            $this->wakeup($e ?? new RejectedException());
231
        });
232
233
        return Signal::SLEEP;
234
    }
235
236
    /**
237
     * @param $result
238
     */
239
    public function wakeup($result) : void
240
    {
241
        $this->sleep = null;
242
        $this->stage = $result;
243
        $this->run();
244
    }
245
246
    /**
247
     * @return mixed
248
     */
249
    public function stage()
250
    {
251
        return $this->stage;
252
    }
253
254
    /**
255
     * @return mixed
256
     */
257
    public function result()
258
    {
259
        return $this->result;
260
    }
261
262
    /**
263
     * @return bool
264
     */
265
    public function failure() : bool
266
    {
267
        return $this->result instanceof Throwable;
268
    }
269
270
    /**
271
     * @param Throwable $e
272
     */
273
    public function killed(Throwable $e = null) : void
274
    {
275
        $this->signal = Signal::KILL;
276
        $this->result = $e = $e ?? new InterruptException();
277
        $this->sleep instanceof Promised ? $this->sleep->throw($e) : $this->run();
0 ignored issues
show
introduced by
$this->sleep is always a sub-type of Carno\Promise\Promised.
Loading history...
278
    }
279
280
    /**
281
     * @return void
282
     */
283
    public function rolling() : void
284
    {
285
        if (isset($this->rollers[$idx = $this->chain->count()])) {
286
            $this->rollers[$idx]->resolve($this->stage());
287
            unset($this->rollers[$idx]);
288
        }
289
    }
290
291
    /**
292
     * @return Promised
293
     */
294
    public function roll() : Promised
295
    {
296
        return $this->rollers[$idx = $this->chain->count()] ?? $this->rollers[$idx] = Promise::deferred();
297
    }
298
299
    /**
300
     * @return Promised
301
     */
302
    public function end() : Promised
303
    {
304
        return $this->ender;
305
    }
306
307
    /**
308
     * @return void
309
     */
310
    private function run() : void
311
    {
312
        Worker::process($this);
313
    }
314
}
315