Worker::__construct()   A
last analyzed

Complexity

Conditions 3
Paths 1

Size

Total Lines 25
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 3
eloc 13
c 1
b 0
f 1
nc 1
nop 3
dl 0
loc 25
rs 9.8333
1
<?php
2
/**
3
 * Channel worker
4
 * User: moyo
5
 * Date: 27/03/2018
6
 * Time: 2:58 PM
7
 */
8
9
namespace Carno\Channel;
10
11
use function Carno\Coroutine\async;
12
use Carno\Coroutine\Context;
13
use Closure;
14
use Throwable;
15
16
/**
 * Channel worker: keeps receiving items from a Chan and starts the program
 * as a coroutine (via async()) for every item received.
 *
 * The number of slots in use (outstanding receives plus started programs) is
 * bounded by the channel's cap() value; see execute().
 */
class Worker
{
    /**
     * Channel that items are received from.
     *
     * @var Chan
     */
    private $chan = null;

    /**
     * User program started through async() for every received item.
     *
     * @var Closure
     */
    private $program = null;

    /**
     * Optional handler invoked with the Throwable a program settled with.
     *
     * @var Closure|null
     */
    private $failure = null;

    /**
     * Callback attached to every receive; starts the program for the item.
     *
     * @var Closure
     */
    private $processor = null;

    /**
     * Callback attached to every started program; releases its slot and
     * resumes the receive loop.
     *
     * @var Closure
     */
    private $done = null;

    /**
     * Second callback attached to every receive; marks the worker as closing.
     *
     * @var Closure
     */
    private $close = null;

    /**
     * Once true, execute() stops issuing new receives.
     *
     * @var bool
     */
    private $closing = false;

    /**
     * Number of slots currently in use (incremented before each receive,
     * decremented when a program settles).
     *
     * @var int
     */
    private $running = 0;

    /**
     * Worker constructor.
     *
     * Stores the collaborators, builds the internal callbacks and immediately
     * starts the receive loop.
     *
     * @param Chan $chan channel to consume
     * @param Closure $program program started for every received item
     * @param Closure|null $failure optional handler called with a Throwable
     *                              produced by a program
     */
    public function __construct(Chan $chan, Closure $program, ?Closure $failure = null)
    {
        $this->chan = $chan;
        $this->program = $program;
        $this->failure = $failure;

        // Starts the program for one received item, creating a fresh Context
        // when the receive did not supply one. $this->done is registered as
        // both callbacks of then(), so it runs however the program settles.
        $this->processor = function ($data, Context $ctx = null) {
            async($this->program, $ctx ?? new Context(), $data)->then($this->done, $this->done);
        };

        // $e is whatever value the program's promise settled with; it is only
        // forwarded to the failure handler when it is a Throwable.
        $this->done = function ($e = null) {
            // Release the slot taken in execute() for this item.
            $this->running --;

            if ($this->failure && $e instanceof Throwable) {
                ($this->failure)($e);
            }

            // A slot is free again: try to receive more items.
            $this->execute();
        };

        // NOTE(review): the slot counted in execute() for the receive that
        // ends up here is not released, so activated() may keep including it
        // after the channel closes - confirm this is intended.
        $this->close = function () {
            $this->closing = true;
        };

        $this->execute();
    }

    /**
     * Reports the current value of the slot counter.
     *
     * @return int
     */
    public function activated() : int
    {
        return $this->running;
    }

    /**
     * Receive loop: issues receives until the worker is closing, the slot
     * counter reaches the channel's cap(), or a receive reports pended().
     */
    private function execute() : void
    {
        for (;;) {
            if ($this->closing || $this->running >= $this->chan->cap()) {
                return;
            }

            // The slot is claimed before the receive is issued.
            $this->running ++;

            ($recv = $this->chan->recv())->then($this->processor, $this->close);

            // Presumably pended() means the receive has not settled yet; the
            // loop is resumed later from the done callback - TODO confirm.
            if ($recv->pended()) {
                break;
            }
        }
    }
}
118