1 | <?php |
||||
2 | /** |
||||
3 | * Commands kit |
||||
4 | * User: moyo |
||||
5 | * Date: 07/08/2017 |
||||
6 | * Time: 6:18 PM |
||||
7 | */ |
||||
8 | |||||
9 | namespace Carno\Coroutine; |
||||
10 | |||||
11 | use Carno\Coroutine\Exception\TimeoutException; |
||||
12 | use Carno\Coroutine\Job\Creator; |
||||
13 | use Carno\Promise\Promise; |
||||
14 | use Carno\Promise\Promised; |
||||
15 | use Carno\Timer\Timer; |
||||
16 | use Closure; |
||||
17 | use Throwable; |
||||
18 | |||||
19 | /** |
||||
20 | * @param $ms |
||||
21 | * @param string $ec |
||||
22 | * @param string $em |
||||
23 | * @return Promised |
||||
24 | */ |
||||
25 | function timeout( |
||||
26 | int $ms, |
||||
27 | string $ec = TimeoutException::class, |
||||
28 | string $em = '' |
||||
29 | ) : Promised { |
||||
30 | $racer = Promise::deferred(); |
||||
31 | if ($ms > 0) { |
||||
32 | $timer = Timer::after($ms, static function () use ($racer, $ec, $em) { |
||||
33 | $racer->throw(new $ec($em)); |
||||
34 | }); |
||||
35 | $racer->catch(static function () use ($timer) { |
||||
36 | Timer::clear($timer); |
||||
37 | }); |
||||
38 | } |
||||
39 | return $racer; |
||||
40 | } |
||||
41 | |||||
42 | /** |
||||
43 | * sleep [and do something] (optional) |
||||
44 | * @param int $ms |
||||
45 | * @param Closure $do |
||||
46 | * @return Promised |
||||
47 | */ |
||||
48 | function msleep( |
||||
49 | int $ms, |
||||
50 | Closure $do = null |
||||
51 | ) : Promised { |
||||
52 | return new Promise(static function (Promised $promised) use ($ms, $do) { |
||||
53 | $tick = Timer::after($ms, static function () use ($promised, $do) { |
||||
54 | $promised->resolve($do ? $do() : null); |
||||
55 | }); |
||||
56 | $promised->catch(static function () use ($tick) { |
||||
57 | Timer::clear($tick); |
||||
58 | }); |
||||
59 | }); |
||||
60 | } |
||||
61 | |||||
62 | /** |
||||
63 | * "GO" new program without results but throws exception if error |
||||
64 | * @param mixed $program |
||||
65 | * @param Context $ctx |
||||
66 | */ |
||||
67 | function go($program, Context $ctx = null) : void |
||||
68 | { |
||||
69 | async($program, $ctx)->fusion(); |
||||
70 | } |
||||
71 | |||||
72 | /** |
||||
73 | * similar with "GO" but returns "CLOSURE" |
||||
74 | * @param mixed $program |
||||
75 | * @param Context $ctx |
||||
76 | * @return Closure |
||||
77 | */ |
||||
78 | function co($program, Context $ctx = null) : Closure |
||||
79 | { |
||||
80 | return static function (...$args) use ($program, $ctx) { |
||||
81 | return async($program, $ctx, ...$args)->fusion(); |
||||
82 | }; |
||||
83 | } |
||||
84 | |||||
85 | /** |
||||
86 | * similar with "GO" but returns "PROMISED" and no "THROWS" |
||||
87 | * @param mixed $program |
||||
88 | * @param Context $ctx |
||||
89 | * @param mixed $args |
||||
90 | * @return Promised |
||||
91 | */ |
||||
92 | function async($program, Context $ctx = null, ...$args) : Promised |
||||
93 | { |
||||
94 | return Creator::promised($program, $args, $ctx); |
||||
95 | } |
||||
96 | |||||
97 | /** |
||||
98 | * wait and wake programme |
||||
99 | * @param Closure $dial |
||||
100 | * @param Closure $awake |
||||
101 | * @param int $timeout |
||||
102 | * @param string $error |
||||
103 | * @param string $message |
||||
104 | * @return Promised |
||||
105 | */ |
||||
106 | function await( |
||||
107 | Closure $dial, |
||||
108 | Closure $awake, |
||||
109 | int $timeout = 60000, |
||||
110 | string $error = TimeoutException::class, |
||||
111 | string $message = '' |
||||
112 | ) : Promised { |
||||
113 | return race( |
||||
114 | new Promise(static function (Promised $await) use ($dial, $awake) { |
||||
115 | $dial(static function (...$args) use ($await, $awake) { |
||||
116 | try { |
||||
117 | $await->pended() && $out = $awake(...$args); |
||||
118 | $await->pended() && $await->resolve($out ?? null); |
||||
119 | } catch (Throwable $e) { |
||||
120 | $await->pended() && $await->throw($e); |
||||
121 | } |
||||
122 | }); |
||||
123 | }), |
||||
124 | timeout($timeout, $error, $message) |
||||
125 | ); |
||||
126 | } |
||||
127 | |||||
128 | /** |
||||
129 | * @return Syscall |
||||
130 | */ |
||||
131 | function ctx() : Syscall |
||||
132 | { |
||||
133 | return new Syscall(static function (Job $job) { |
||||
134 | return $job->ctx(); |
||||
135 | }); |
||||
136 | } |
||||
137 | |||||
138 | /** |
||||
139 | * @param Closure $program |
||||
140 | * @return Syscall |
||||
141 | */ |
||||
142 | function defer(Closure $program) : Syscall |
||||
143 | { |
||||
144 | return new Syscall(static function (Job $job) use ($program) { |
||||
145 | $job->roll()->then(static function ($stage) use ($job, $program) { |
||||
146 | co($program, $job->ctx())($stage); |
||||
147 | }); |
||||
148 | }); |
||||
149 | } |
||||
150 | |||||
151 | /** |
||||
152 | * @param mixed ...$programs |
||||
153 | * @return Promised |
||||
154 | */ |
||||
155 | function race(...$programs) : Promised |
||||
156 | { |
||||
157 | return Promise::race(...Creator::promises(...Creator::context($programs))); |
||||
0 ignored issues
–
show
Bug
introduced
by
![]() |
|||||
158 | } |
||||
159 | |||||
160 | /** |
||||
161 | * @param mixed ...$programs |
||||
162 | * @return Promised |
||||
163 | */ |
||||
164 | function all(...$programs) : Promised |
||||
165 | { |
||||
166 | return Promise::all(...Creator::promises(...Creator::context($programs))); |
||||
0 ignored issues
–
show
It seems like
Carno\Coroutine\Job\Creator::context($programs) can also be of type Carno\Coroutine\Context and null ; however, parameter $programs of Carno\Coroutine\Job\Creator::promises() does only seem to accept array , maybe add an additional type check?
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
![]() |
|||||
167 | } |
||||
168 |