Connections::setBusying()   A
last analyzed

Complexity

Conditions 4
Paths 4

Size

Total Lines 11
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 5
nc 4
nop 2
dl 0
loc 11
rs 10
c 0
b 0
f 0
1
<?php
2
/**
3
 * Connections
4
 * User: moyo
5
 * Date: 09/08/2017
6
 * Time: 6:00 PM
7
 */
8
9
namespace Carno\Pool;
10
11
use function Carno\Coroutine\go;
12
use function Carno\Coroutine\race;
13
use function Carno\Coroutine\timeout;
14
use Carno\Pool\Chips\CCPolicy;
15
use Carno\Pool\Exception\SelectWaitOverflowException;
16
use Carno\Pool\Exception\SelectWaitTimeoutException;
17
use Carno\Pool\Features\IdleRecycling;
18
use Carno\Promise\Promise;
19
use Carno\Promise\Promised;
20
use SplQueue;
21
use SplStack;
22
use Throwable;
23
24
/**
 * Tracks every connection owned by a pool and moves them between the
 * "idling" stack and the "busying" map.
 *
 * Callers borrow connections with getIdled() and return them with putIdled();
 * resizing() opens new connections through the connector or closes surplus ones.
 */
class Connections
{
    // ccDecision(), called from getIdled(), is not defined in this class — presumably supplied by this trait
    use CCPolicy;

    /**
     * Owning pool; its closed() promise is resolved once exiting and no connection remains
     * @var Pool
     */
    private $pool = null;

    /**
     * Pool options (this class reads initial, maxIdle, getWaitQMax and getWaitTimeout)
     * @var Options
     */
    private $options = null;

    /**
     * Source of new connections, also provides the identifier used in logs and exceptions
     * @var Connector
     */
    private $connector = null;

    /**
     * Optional idle-recycling feature, set via setIRecycling()
     * @var IdleRecycling
     */
    private $recycling = null;

    /**
     * Counter used to build connection ids ("c-<n>")
     * @var int
     */
    private $connIDA = 0;

    /**
     * Idle connections; entries are added and taken at the bottom (unshift/shift)
     * @var SplStack
     */
    private $staIdling = null;

    /**
     * Busy connections indexed by connection id
     * @var Poolable[]
     */
    private $staBusying = [];

    /**
     * Number of connection creations still in flight (0 means no expansion is running)
     * @var int
     */
    private $rsExpanding = 0;

    /**
     * Deferred promises of callers waiting in getIdled() for an idle connection
     * @var SplQueue
     */
    private $getWaitQ = null;

    /**
     * Deferred promises handed out by getLived() while no connection existed
     * @var Promised[]
     */
    private $liveGetQ = [];

    /**
     * Set by exit(); connections returned afterwards are destroyed instead of reused
     * @var bool
     */
    private $exiting = false;

    /**
     * Connections constructor.
     *
     * Stores the collaborators, creates the empty idle stack / wait queue and
     * immediately starts expanding to the configured initial size.
     *
     * @param Pool $pool
     * @param Options $options
     * @param Connector $connector
     */
    public function __construct(
        Pool $pool,
        Options $options,
        Connector $connector
    ) {
        $this->pool = $pool;
        $this->options = $options;
        $this->connector = $connector;

        $this->staIdling = new SplStack;
        $this->getWaitQ = new SplQueue;

        $this->resizing($this->options->initial, 'initialize');
    }

    /**
     * Drops the references to pool, options and wait queue, then cleans up the connector.
     */
    public function cleanup() : void
    {
        $this->pool = null;
        $this->options = null;

        $this->getWaitQ = null;

        $this->connector->cleanup();
    }

    /**
     * Attaches the idle-recycling feature consulted by setBusying() and resizing().
     *
     * @param IdleRecycling $recycling
     */
    public function setIRecycling(IdleRecycling $recycling) : void
    {
        $this->recycling = $recycling;
    }

    /**
     * @return int number of connections currently marked busy
     */
    public function cBusyCount() : int
    {
        return count($this->staBusying);
    }

    /**
     * @return int number of connections currently idle
     */
    public function cIdleCount() : int
    {
        return $this->staIdling->count();
    }

    /**
     * @return int number of queued getIdled() waiters (0 once cleanup() dropped the queue)
     */
    public function cWaitCount() : int
    {
        return $this->getWaitQ ? $this->getWaitQ->count() : 0;
    }

    /**
     * Returns any existing connection without changing its state, or a promise
     * that putIdled() resolves with the next connection handed back.
     *
     * @deprecated
     * @return Promised|Poolable
     */
    public function getLived()
    {
        if (!$this->staIdling->isEmpty()) {
            // the stack's iterator is never rewound in this class, so current() would yield null;
            // bottom() peeks the entry that setBusying() would shift next
            return $this->staIdling->bottom();
        }

        if (!empty($this->staBusying)) {
            // reset() yields the first entry regardless of the array's internal pointer
            return reset($this->staBusying);
        }

        return $this->liveGetQ[] = Promise::deferred();
    }

    /**
     * Exposes the idle stack itself (not a copy).
     *
     * @return SplStack|Poolable[]
     */
    public function getIdles() : SplStack
    {
        return $this->staIdling;
    }

    /**
     * Borrows an idle connection and marks it busy.
     *
     * When no connection is idle: with $wait the caller is queued and gets a
     * promise racing against the getWaitTimeout timer; without $wait null is returned.
     *
     * @param bool $wait queue up when nothing is idle
     * @param bool $work forwarded to setBusying(); when false the recycling feature is not notified
     * @return Promised|Poolable|null
     * @throws SelectWaitOverflowException when the wait queue already exceeds getWaitQMax
     */
    public function getIdled(bool $wait = true, bool $work = true)
    {
        if (!$this->staIdling->isEmpty()) {
            return $this->setBusying(null, $work);
        }

        if (!$wait) {
            return null;
        }

        // NOTE(review): the check uses ">" and runs before enqueueing, so the queue can grow
        // to getWaitQMax + 1 entries — confirm whether that is intended
        if ($this->getWaitQ->count() > $this->options->getWaitQMax) {
            throw new SelectWaitOverflowException($this->connector->identify());
        }

        $this->getWaitQ->enqueue($waiting = Promise::deferred());

        $this->ccDecision(
            $this->options,
            $this->connector->identify(),
            $this->cIdleCount(),
            $this->cBusyCount(),
            $this->cWaitCount()
        );

        return race(
            $waiting,
            timeout(
                $this->options->getWaitTimeout,
                SelectWaitTimeoutException::class,
                $this->connector->identify()
            )
        );
    }

    /**
     * Hands a connection back to the pool.
     *
     * Pending getLived() promises are resolved first; then the first still-pending
     * getIdled() waiter receives the connection directly (it stays busy). Otherwise the
     * connection becomes idle and, if the pool is exiting, is destroyed right away.
     *
     * @param Poolable $conn
     */
    public function putIdled(Poolable $conn) : void
    {
        /**
         * @var Promised $wait
         */

        // checking in liveGetQ
        while (!empty($this->liveGetQ) && null !== $wait = array_pop($this->liveGetQ)) {
            $wait->pended() && $wait->resolve($conn);
        }

        // checking in getWaitQ (the queue is null after cleanup())
        while ($this->getWaitQ && !$this->getWaitQ->isEmpty()) {
            $wait = $this->getWaitQ->dequeue();
            // waiters that are no longer pending are simply dropped
            if ($wait->pended()) {
                $wait->resolve($this->setBusying($conn));
                return;
            }
        }

        // finally set idle
        $this->setIdling($conn);

        // check exiting
        $this->exiting && $conn->destroy();
    }

    /**
     * Forgets a connection: a busy one is removed from the map and closed,
     * otherwise it is searched for (and closed) in the idle stack.
     * Afterwards the overall connection state is re-checked.
     *
     * @param Poolable $conn
     */
    public function released(Poolable $conn) : void
    {
        $cid = $conn->cid();

        if (isset($this->staBusying[$cid])) {
            // found in "busying" map
            unset($this->staBusying[$cid]);
            $this->setClosing($conn);
        } else {
            // searching in "idling" stack
            $this->removeIdling($conn);
        }

        // finally checking conn state
        $this->checking();
    }

    /**
     * Marks the pool as exiting and shrinks it to zero connections.
     *
     * @param bool $forced close idle connections even if the recycling feature does not report them idle
     * @param string $reason used for logging only
     */
    public function exit(bool $forced = true, string $reason = 'exiting') : void
    {
        $this->exiting = true;
        $this->resizing(0, $reason, $forced);
        $this->checking();
    }

    /**
     * conn sta checking
     *
     * While exiting: resolves the pool's closed() promise once no connection is left.
     * Otherwise: when every connection is gone, expands again to at least one connection.
     */
    private function checking() : void
    {
        $cleared = 0 === $this->cIdleCount() + $this->cBusyCount();

        if ($this->exiting) {
            if ($this->pool && ($closed = $this->pool->closed())->pended() && $cleared) {
                $closed->resolve();
            }
            return;
        }

        if ($cleared) {
            $this->resizing(
                max(
                    1,
                    $this->options->initial,
                    min($this->getWaitQ->count(), $this->options->maxIdle)
                ),
                'minimum-scaling'
            );
        }
    }

    /**
     * Grows or shrinks the pool towards $target connections (busy + idle).
     *
     * Expanding starts one coroutine per missing connection and is skipped while a
     * previous expansion is still in flight. Shrinking closes idle connections first
     * and schedules busy ones to be closed on release.
     *
     * @param int $target wanted total number of connections
     * @param string $reason used for logging only
     * @param bool $forced when shrinking, close idle connections regardless of the recycling feature
     * @return int number of connection creations started (positive), minus the number of
     *             connections closed or scheduled for closing (negative), 0 when nothing was done
     */
    public function resizing(int $target, string $reason = 'none', bool $forced = false) : int
    {
        $busySize = $this->cBusyCount();
        $idleSize = $this->cIdleCount();

        if ($busySize + $idleSize < $target && !$this->rsExpanding) {
            logger('pool')->debug(
                'Expanding',
                [
                    'id' => $this->connector->identify(),
                    'idle' => $idleSize,
                    'busy' => $busySize,
                    'target' => $target,
                    'reason' => $reason,
                ]
            );

            $expandSize = $target - $busySize - $idleSize;

            // must be set before the coroutines start: each one decrements it when it finishes
            $this->rsExpanding = $expandSize;

            for ($started = 0; $started < $expandSize; $started ++) {
                go(function () {
                    try {
                        /**
                         * @var Poolable $poolable
                         */
                        $poolable = yield $this->connector->created();
                        yield $poolable->connect();
                        $poolable->cid(sprintf('c-%d', ++ $this->connIDA));
                        $this->putIdled($poolable);
                    } catch (Throwable $e) {
                        logger('pool')->warning(
                            'Connecting failed',
                            [
                                'id' => $this->connector->identify(),
                                'error' => sprintf('#%d->%s::%s', $e->getCode(), get_class($e), $e->getMessage()),
                            ]
                        );
                    } finally {
                        $this->rsExpanding --;
                    }
                });
            }

            // report the real amount instead of the exhausted loop counter
            return $expandSize;
        } elseif ($busySize + $idleSize > $target) {
            logger('pool')->debug(
                'Shrinking',
                [
                    'id' => $this->connector->identify(),
                    'idle' => $idleSize,
                    'busy' => $busySize,
                    'target' => $target,
                    'reason' => $reason,
                ]
            );

            $shrinkSize = ($busySize + $idleSize) - $target;
            $shrunk = 0;

            while ($shrinkSize -- > 0) {
                if ($this->staIdling->count() > 0) {
                    $removed = $this->removeIdling(null, function ($_, Poolable $found) use ($forced) {
                        return ($this->recycling && $this->recycling->idling($found->cid())) || $forced;
                    });

                    if ($removed) {
                        // removed ... checking next
                        $shrunk ++;
                        continue;
                    }

                    // give up shrinking if all idles conn not accord
                    logger('pool')->debug('Idles conn not according', ['id' => $this->connector->identify()]);
                    break;
                }

                // no idle connection left: take a busy one and close it once it gets released
                ($closer = Promise::deferred())->then(function (Poolable $conn) {
                    $this->setClosing($conn);
                });

                if ($got = array_shift($this->staBusying)) {
                    $got->schedule(Poolable::RELEASED, $closer);
                    $shrunk ++;
                }
            }

            return - $shrunk;
        }

        return 0;
    }

    /**
     * Registers a connection as busy.
     *
     * @param Poolable|null $conn connection to mark, or null to take the next idle one
     *                            (callers must ensure the idle stack is not empty in that case)
     * @param bool $work when true and a recycling feature is attached, it is told the connection is busy
     * @return Poolable the connection now stored in the busy map
     */
    private function setBusying(?Poolable $conn = null, bool $work = true) : Poolable
    {
        if (null === $conn) {
            $conn = $this->staIdling->shift();
        }

        $cid = $conn->cid();

        if ($this->recycling && $work) {
            $this->recycling->busying($cid);
        }

        return $this->staBusying[$cid] = $conn;
    }

    /**
     * Moves a connection into the idle stack, removing it from the busy map if present.
     *
     * @param Poolable $conn
     */
    private function setIdling(Poolable $conn) : void
    {
        unset($this->staBusying[$conn->cid()]);

        $this->staIdling->unshift($conn);
    }

    /**
     * Closes a connection unless its closed() promise is already settled.
     *
     * @param Poolable $conn
     */
    private function setClosing(Poolable $conn) : void
    {
        $conn->closed()->pended() && $conn->close();
    }

    /**
     * Removes the first matching connection from the idle stack.
     *
     * Entries are popped from the top and, when they do not match, re-inserted at the
     * bottom, so a full unsuccessful scan leaves the stack in its original order.
     *
     * @param Poolable|null $any passed as first argument to the matcher; required when no matcher is given
     * @param callable|null $matcher fn($any, Poolable $candidate): bool, defaults to comparing connection ids
     * @param bool $close close the removed connection
     * @return bool true when a connection was removed
     */
    private function removeIdling(?Poolable $any = null, ?callable $matcher = null, bool $close = true) : bool
    {
        if ($this->staIdling->isEmpty()) {
            return false;
        }

        if (null === $matcher) {
            $matcher = static function (Poolable $present, Poolable $found) {
                return $present->cid() === $found->cid();
            };
        }

        // the amount is fixed up-front so every entry is visited at most once
        $connections = $this->staIdling->count();

        for ($searched = 0; $searched < $connections; $searched ++) {
            $conn = $this->staIdling->pop();

            if ($matcher($any, $conn)) {
                $close && $this->setClosing($conn);
                return true;
            }

            $this->staIdling->unshift($conn);
        }

        return false;
    }
}
457