Completed
Push — master ( 4b8e25...c2d0d8 )
by Boris
03:11 queued 01:33
created

Client::readVersionAndType()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 6
ccs 0
cts 4
cp 0
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 4
nc 1
nop 0
crap 2
1
<?php
2
namespace Ekho\Logstash\Lumberjack;
3
4
/**
 * Lumberjack client: writes encoded frames to a socket and throttles itself
 * by waiting for acknowledgement ("A") frames once too many frames are
 * outstanding.
 *
 * Sequence numbers run from 1 to SEQUENCE_MAX and then start over at 1.
 */
class Client
{
    /** Largest sequence number before the counter restarts (2^32 - 1). */
    const SEQUENCE_MAX = 4294967295;

    /** @var int Sequence number handed to the most recently encoded frame. */
    private $sequence = 0;

    /** @var int Sequence number carried by the last ACK frame that was read. */
    private $lastAck = 0;

    /** @var int Number of unacknowledged frames tolerated before blocking on an ACK. */
    private $windowSize;

    /** @var SocketInterface */
    private $socket;

    /** @var EncoderInterface */
    private $encoder;

    /**
     * Stores the collaborators and immediately announces the window size
     * on the socket (see setWindowSize()).
     *
     * @param SocketInterface $socket
     * @param EncoderInterface $encoder
     * @param int $windowSize
     */
    public function __construct(SocketInterface $socket, EncoderInterface $encoder, $windowSize = 5000)
    {
        $this->socket = $socket;
        $this->encoder = $encoder;
        $this->setWindowSize($windowSize);
    }

    /**
     * Encodes $hash as a compressed frame with the next sequence number and
     * writes it to the socket. If the window is already full, ACK frames are
     * read first until there is room again.
     *
     * @param array $hash
     * @return int whatever the socket's write() returns (presumably a byte count)
     * @throws \RuntimeException when an unexpected or incomplete ACK frame is read
     */
    public function write(array $hash)
    {
        // The sequence number is consumed before the window check, exactly as
        // unackedSequenceSize() expects (it excludes the frame about to be sent).
        $frame = $this->encoder->toCompressedFrame($hash, $this->nextSequence());
        if ($this->unackedSequenceSize() >= $this->windowSize) {
            $this->ack();
        }

        return $this->socket->write($frame);
    }

    /**
     * Remembers the window size and sends a window frame: the characters
     * "1" and "W" followed by the size as a 32-bit big-endian unsigned integer.
     *
     * @param int $windowSize
     */
    private function setWindowSize($windowSize)
    {
        $this->windowSize = $windowSize;
        $buffer = pack('AAN', "1", "W", $windowSize);
        $this->socket->write($buffer);
    }

    /**
     * Advances the sequence counter and returns the new value, restarting
     * at 1 once SEQUENCE_MAX has been used.
     *
     * @return int
     */
    private function nextSequence()
    {
        if ($this->sequence + 1 > self::SEQUENCE_MAX) {
            $this->sequence = 0;
        }

        return ++$this->sequence;
    }

    /**
     * Reads ACK frames from the socket until the number of unacknowledged
     * frames drops below the window size.
     *
     * Implemented as a loop rather than recursion so that a long run of
     * ACKs cannot grow the call stack.
     *
     * @throws \RuntimeException when the frame type is not "A" or the ACK payload is incomplete
     */
    private function ack()
    {
        do {
            list(, $type) = $this->readVersionAndType();
            if ($type !== 'A') {
                throw new \RuntimeException(sprintf("Whoa we shouldn't get this frame: %s", var_export($type, true)));
            }
            $this->lastAck = $this->readLastAck();
        } while ($this->unackedSequenceSize() >= $this->windowSize);
    }

    /**
     * Number of frames already sent but not yet acknowledged. The frame that
     * currently owns $this->sequence is not counted.
     *
     * @return int
     */
    private function unackedSequenceSize()
    {
        $pending = $this->sequence - ($this->lastAck + 1);
        if ($pending < 0) {
            // The sequence counter restarted at 1 while $lastAck still refers to
            // the previous cycle; sequence numbers cycle through SEQUENCE_MAX values.
            $pending += self::SEQUENCE_MAX;
        }

        return $pending;
    }

    /**
     * Reads the two one-byte header fields of an incoming frame.
     *
     * @return array array($version, $type), each as returned by the socket's read(1)
     */
    private function readVersionAndType()
    {
        $version = $this->socket->read(1);
        $type = $this->socket->read(1);

        return array($version, $type);
    }

    /**
     * Reads the 4-byte payload of an ACK frame and decodes it as a 32-bit
     * big-endian unsigned integer.
     *
     * @return int
     * @throws \RuntimeException when the socket does not deliver exactly 4 bytes
     */
    private function readLastAck()
    {
        $payload = $this->socket->read(4);
        if (!is_string($payload) || strlen($payload) !== 4) {
            // unpack() would only emit a warning and return false here, which
            // would then surface as an unrelated error in reset().
            throw new \RuntimeException(sprintf(
                'Incomplete ACK frame: expected 4 bytes of sequence number, got %s',
                is_string($payload) ? strlen($payload) . ' byte(s)' : gettype($payload)
            ));
        }

        $unpacked = unpack('N', $payload);

        return reset($unpacked);
    }
}
112