Channel::__construct()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 1
eloc 4
c 1
b 0
f 1
nc 1
nop 1
dl 0
loc 6
rs 10
1
<?php
2
/**
3
 * Channel piping
4
 * User: moyo
5
 * Date: 15/09/2017
6
 * Time: 2:24 PM
7
 */
8
9
namespace Carno\Channel;
10
11
use Carno\Channel\Exception\ChannelClosingException;
12
use Carno\Coroutine\Context;
13
use Carno\Promise\Promise;
14
use Carno\Promise\Promised;
15
use SplQueue;
16
17
/**
 * Promise-based channel between senders and receivers.
 *
 * Values are kept in a FIFO buffer. A sender whose value pushes the buffer
 * beyond the configured capacity gets a pending promise that is resolved
 * once a receiver takes an item; a receiver that finds the buffer empty gets
 * a pending promise that is resolved by the next send.
 */
class Channel implements Chan
{
    /**
     * Number of values that can be buffered before senders start blocking.
     *
     * @var int
     */
    private $cap = 1;

    /**
     * Buffered items, each stored as a [data, context] pair.
     *
     * @var SplQueue
     */
    private $qData = null;

    /**
     * Pending promises of senders that are blocked on a full buffer.
     *
     * @var SplQueue
     */
    private $qSend = null;

    /**
     * Pending promises of receivers that are waiting for a value.
     *
     * @var SplQueue
     */
    private $qRecv = null;

    /**
     * Set once close() has been called; send() and recv() refuse to work afterwards.
     *
     * @var bool
     */
    private $closing = false;

    /**
     * Lazily created promise that close() resolves.
     *
     * @var Promised
     */
    private $closed = null;

    /**
     * Channel constructor.
     *
     * @param int $cap Buffer capacity. The value is not validated here; with a
     *                 capacity of zero or less every send blocks unless a
     *                 receiver is already waiting.
     */
    public function __construct(int $cap = 1)
    {
        $this->cap = $cap;
        $this->qData = new SplQueue();
        $this->qSend = new SplQueue();
        $this->qRecv = new SplQueue();
    }

    /**
     * Get the capacity this channel was created with.
     *
     * @return int
     */
    public function cap() : int
    {
        return $this->cap;
    }

    /**
     * Put a value into the channel.
     *
     * @param mixed $data Value to deliver.
     * @param Context|null $ctx Optional context handed over together with the value.
     * @return Promised Already resolved when the value was handed to a waiting
     *                  receiver or fitted into the buffer; otherwise a pending
     *                  promise that recv() resolves later, or that close() rejects.
     * @throws ChannelClosingException When the channel has been closed.
     */
    public function send($data = null, ?Context $ctx = null) : Promised
    {
        if ($this->closing) {
            throw new ChannelClosingException();
        }

        // Serve a parked receiver first, independent of the capacity check.
        // Receivers only park while the buffer is empty, so for cap >= 1 this is
        // the same path as before; for cap <= 0 it prevents the value from being
        // buffered (and the sender blocked) while a receiver is still waiting.
        if (!$this->qRecv->isEmpty()) {
            /** @var Promised $receiver */
            $receiver = $this->qRecv->dequeue();
            $receiver->resolve($data, $ctx);

            return Promise::resolved();
        }

        // Decide before enqueueing: the sender only blocks when the buffer was
        // already at capacity.
        $hasRoom = $this->qData->count() < $this->cap;

        $this->qData->enqueue([$data, $ctx]);

        if ($hasRoom) {
            return Promise::resolved();
        }

        $blocked = Promise::deferred();
        $this->qSend->enqueue($blocked);

        return $blocked;
    }

    /**
     * Take the next value from the channel.
     *
     * @return Promised Resolved with the buffered (data, context) pair when one
     *                  is available; otherwise a pending promise that the next
     *                  send() resolves, or that close() rejects.
     * @throws ChannelClosingException When the channel has been closed, even if
     *                                 values are still buffered.
     */
    public function recv() : Promised
    {
        if ($this->closing) {
            throw new ChannelClosingException();
        }

        if ($this->qData->isEmpty()) {
            $waiting = Promise::deferred();
            $this->qRecv->enqueue($waiting);

            return $waiting;
        }

        // Claim our item BEFORE waking a blocked sender: if the sender's
        // continuation runs synchronously and touches this channel again, it must
        // not be able to take (or drain the queue ahead of) the item owed to this
        // call.
        $item = $this->qData->dequeue();

        if (!$this->qSend->isEmpty()) {
            /** @var Promised $sender */
            $sender = $this->qSend->dequeue();
            $sender->resolve();
        }

        return Promise::resolved(...$item);
    }

    /**
     * Close the channel.
     *
     * Rejects every blocked sender and every waiting receiver with a
     * ChannelClosingException and resolves the closed() promise. Calling it
     * again is a no-op. Items still sitting in the buffer are left untouched.
     *
     * @return void
     */
    public function close() : void
    {
        if ($this->closing) {
            return;
        }

        $this->closing = true;

        /**
         * @var Promised $sender
         * @var Promised $receiver
         */

        while (!$this->qSend->isEmpty()) {
            $sender = $this->qSend->dequeue();
            $sender->throw(new ChannelClosingException());
        }

        while (!$this->qRecv->isEmpty()) {
            $receiver = $this->qRecv->dequeue();
            $receiver->throw(new ChannelClosingException());
        }

        $this->closed()->resolve();
    }

    /**
     * Promise that close() resolves.
     *
     * Created on first access and reused afterwards.
     *
     * @return Promised
     */
    public function closed() : Promised
    {
        return $this->closed ?? ($this->closed = Promise::deferred());
    }

    /**
     * Queue sizes formatted as "<buffered items>|<blocked senders>|<waiting receivers>".
     *
     * @return string
     */
    public function __toString() : string
    {
        return sprintf(
            '%d|%d|%d',
            $this->qData->count(),
            $this->qSend->count(),
            $this->qRecv->count()
        );
    }
}
173