Completed
Push — master ( 702453...1286b5 )
by James
01:47
created

Channel   A

Complexity

Total Complexity 12

Size/Duplication

Total Lines 133
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 1

Importance

Changes 0
Metric Value
wmc 12
lcom 1
cbo 1
dl 0
loc 133
rs 10
c 0
b 0
f 0

6 Methods

Rating  Name           Duplication  Size  Complexity
A       create()       0            4     1
A       read()         0            8     2
A       write()        0            11    3
A       close()        0            15    3
A       isOpen()       0            4     1
A       __construct()  0            16    2
<?php

declare (strict_types = 1); // @codeCoverageIgnore

namespace Recoil\Channel;

use Generator;
use Recoil\Channel\Exception\ChannelClosedException;
use Recoil\Kernel\Strand;
use Recoil\Recoil;
use SplQueue;

/**
 * An unbuffered, bidirectional data channel.
 *
 * Unbuffered data channels will block any call to write() until the value has
 * been read.
 */
final class Channel implements ReadableChannel, WritableChannel
{
    public static function create() : self
    {
        return new self();
    }

    /**
     * Read a value from this channel.
     *
     * @recoil-coroutine
     *
     * Execution of the current strand is suspended until a value becomes
     * available.
     *
     * @return mixed                  The value read from the channel.

Ignored issue (Documentation, introduced by James Harris): Consider making the return type a bit more specific; maybe use Generator.

This check looks for the generic type array as a return type and suggests a more specific type. This type is inferred from the actual code.

     * @throws ChannelClosedException The channel has been closed.
     */
    public function read() : Generator
    {
        if (!$this->isOpen) {
            throw ChannelClosedException::channelClosed();
        }

        return yield Recoil::suspend($this->suspendReader);
    }

    /**
     * Write a value to this channel.
     *
     * @recoil-coroutine
     *
     * Execution of the current strand is suspended until the value is read
     * from the channel.
     *
     * @param mixed $value The value to write to the channel.
     *
     * @throws ChannelClosedException The channel has been closed.
     */
    public function write($value) : Generator
    {
        if (!$this->isOpen) {
            throw ChannelClosedException::channelClosed();
        } elseif (++$this->size > 0) {
            $reader = yield Recoil::suspend($this->suspendWriter);
            $reader->send($value);
        } else {
            $this->strands->dequeue()->send($value);
        }
    }

    /**
     * Close this channel.
     *
     * @recoil-coroutine
     */
    public function close() : Generator
    {
        if ($this->isOpen) {
            $this->isOpen = false;
            $this->size = 0;
            $exception = ChannelClosedException::channelClosed();

            foreach ($this->strands as $strand) {
                $strand->throw($exception);
            }
        }

        return;
        yield; // @codeCoverageIgnore

Ignored issue (Unused Code, introduced by James Harris): yield; does not seem to be reachable.

This check looks for unreachable code. It uses sophisticated control flow analysis techniques to find statements which will never be executed.

Unreachable code is most often the result of return, die or exit statements that have been added for debug purposes.

function fx() {
    try {
        doSomething();
        return true;
    }
    catch (\Exception $e) {
        return false;
    }

    return false;
}

In the above example, the last return false will never be executed, because a return statement has already been met in every possible execution path.
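
For context on why an unreachable yield can still be deliberate: in PHP, any function whose body contains a yield is compiled as a generator function, whether or not that yield can ever run. The sketch below illustrates this with a hypothetical function closeLike() (not part of Recoil) that mirrors the return; yield; shape flagged above. It assumes PHP 7.0 or later.

```php
<?php

declare(strict_types=1);

// Hypothetical function, for illustration only.
// The unreachable yield makes this a generator function, so calling it
// returns a Generator and runs none of the body until it is advanced.
function closeLike(ArrayObject $log): Generator
{
    $log[] = 'closed';

    return;
    yield; // never executed, but its presence makes this a generator
}

$log = new ArrayObject();
$coroutine = closeLike($log);

// Calling the function only creates the generator object.
$isGenerator = $coroutine instanceof Generator;
$ranEagerly = count($log) > 0;

// Advancing the generator runs the body up to the return statement.
$coroutine->current();
$ranAfterAdvance = $log->getArrayCopy() === ['closed'];
$finished = !$coroutine->valid();
```

Under these assumptions, the side effect happens only once the generator is advanced, which is what allows a kernel to schedule such a method like any other coroutine even though it never suspends.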

    }

    /**
     * Check if this channel is open.
     */
    public function isOpen() : bool
    {
        return $this->isOpen;
    }

    /**
     * Please note that this code is not part of the public API. It may be
     * changed or removed at any time without notice.
     *
     * @access private
     *
     * This constructor is public because the `Exception` class does not allow
     * subclasses to have private or protected constructors.
     */
    public function __construct()
    {
        $this->strands = new SplQueue();

        $this->suspendReader = function ($strand) {

Issue (Documentation Bug, introduced by James Harris): It seems like function ($strand) { ...>send($strand); } } of type object<Closure> is incompatible with the declared type object<Recoil\Channel\Closure> of property $suspendReader.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property.

            if (--$this->size < 0) {
                $this->strands->enqueue($strand);
            } else {
                $this->strands->dequeue()->send($strand);
            }
        };

        $this->suspendWriter = function ($strand) {

Issue (Documentation Bug, introduced by James Harris): It seems like function ($strand) { ...ds->enqueue($strand); } of type object<Closure> is incompatible with the declared type object<Recoil\Channel\Closure> of property $suspendWriter.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property.

            $this->strands->enqueue($strand);
        };
    }

    /**
     * @var SplQueue<Strand> The queue of strands waiting to read or write.
     */
    private $strands;

    /**
     * @var Closure Callback used when suspending a reading strand.
     */
    private $suspendReader;

    /**
     * @var Closure Callback used when suspending a writing strand.
     */
    private $suspendWriter;

    /**
     * @var int The "size" of the channel. This number will be positive when
     *          $this->strands contains writers, and negative when it contains
     *          readers.
     */
    private $size = 0;

    /**
     * @var bool True if the channel is open.
     */
    private $isOpen = true;
}
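
The signed $size counter in the listing above is easier to follow in isolation. The following is a minimal, kernel-free sketch of the same bookkeeping, assuming PHP 7.1 or later. The class RendezvousModel and its callback parameters are hypothetical stand-ins: plain callables take the place of suspended strands, and none of this is Recoil's API.

```php
<?php

declare(strict_types=1);

// Hypothetical model of the channel's bookkeeping, for illustration only.
final class RendezvousModel
{
    /** @var SplQueue Waiting readers (size < 0) or waiting writers (size > 0). */
    private $waiting;

    /** @var int Positive when writers are queued, negative when readers are. */
    private $size = 0;

    public function __construct()
    {
        $this->waiting = new SplQueue();
    }

    /** Register a reader; $onValue is called once a value is available. */
    public function read(callable $onValue): void
    {
        if (--$this->size < 0) {
            // No writer is waiting, so the reader has to wait.
            $this->waiting->enqueue($onValue);
        } else {
            // A writer is waiting: take its value, then release it.
            [$value, $onRead] = $this->waiting->dequeue();
            $onValue($value);
            $onRead();
        }
    }

    /** Register a writer; $onRead is called once the value has been read. */
    public function write($value, callable $onRead): void
    {
        if (++$this->size > 0) {
            // No reader is waiting, so the writer has to wait.
            $this->waiting->enqueue([$value, $onRead]);
        } else {
            // A reader is waiting: hand the value over immediately.
            ($this->waiting->dequeue())($value);
            $onRead();
        }
    }

    public function size(): int
    {
        return $this->size;
    }
}

$log = [];
$model = new RendezvousModel();

// A writer arrives first and has to wait, so the size becomes positive.
$model->write('a', function () use (&$log) { $log[] = 'a was read'; });
$sizeAfterWrite = $model->size();

// A reader arrives and is paired with the waiting writer.
$model->read(function ($value) use (&$log) { $log[] = "got $value"; });
$sizeAfterRead = $model->size();

// A second reader finds nobody waiting, so the size becomes negative.
$model->read(function ($value) use (&$log) { $log[] = "got $value"; });
$sizeWithWaitingReader = $model->size();
```

Because the counter's sign records which kind of party is waiting, a single queue is enough in this model: it never holds readers and writers at the same time.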