Passed
Pull Request — master (#26)
by Harry
03:06 queued 01:17
created

StreamWriter   A

Complexity

Total Complexity 16

Size/Duplication

Total Lines 143
Duplicated Lines 0 %

Test Coverage

Coverage 98.21%

Importance

Changes 0
Metric Value
dl 0
loc 143
ccs 55
cts 56
cp 0.9821
rs 10
c 0
b 0
f 0
wmc 16

6 Methods

Rating   Name   Duplication   Size   Complexity  
A ensureConnection() 0 4 3
A __destruct() 0 6 2
A canConnect() 0 3 1
A __construct() 0 12 1
A write() 0 20 5
A connect() 0 25 4
1
<?php
2
/**
3
 * This file is part of graze/dog-statsd
4
 *
5
 * Copyright (c) 2017 Nature Delivered Ltd. <https://www.graze.com>
6
 *
7
 * For the full copyright and license information, please view the LICENSE
8
 * file that was distributed with this source code.
9
 *
10
 * @license https://github.com/graze/dog-statsd/blob/master/LICENSE.md
11
 * @link    https://github.com/graze/dog-statsd
12
 */
13
14
namespace Graze\DogStatsD\Stream;
15
16
use Graze\DogStatsD\Exception\ConnectionException;
17
18
/**
 * StreamWriter will attempt to write a message to a udp socket.
 *
 * If a connection attempt fails, further attempts are delayed using an exponential back-off
 * (see RETRY_INTERVAL) to prevent application blocking.
 */
class StreamWriter implements WriterInterface
{
    /**
     * Seconds to wait (as a base) for exponential back-off on connection
     *
     * minDelay = RETRY_INTERVAL * (2 ^ num_failed_attempts)
     *
     * e.g.
     * 0, 0.1 0.2 0.4 0.8 1.6 3.2 6.4 12.8 25.6 51.2 102.4 etc...
     */
    const RETRY_INTERVAL = 0.1;

    /**
     * Maximum length of a string to send in a single fwrite() call
     */
    const MAX_SEND_LENGTH = 1024;

    /** Raise an E_USER_WARNING when the connection fails */
    const ON_ERROR_ERROR     = 'error';
    /** Throw a ConnectionException when the connection fails */
    const ON_ERROR_EXCEPTION = 'exception';
    /** Silently carry on when the connection fails */
    const ON_ERROR_IGNORE    = 'ignore';

    /** @var resource|false|null Open stream, false after a failed connection attempt, null before the first */
    protected $socket;
    /** @var string */
    private $host;
    /** @var int */
    private $port;
    /** @var string One of the ON_ERROR_* constants */
    private $onError;
    /** @var float|null Timeout in seconds (fractions allowed) */
    private $timeout;
    /** @var string Name handed to ConnectionException to identify this writer */
    private $instance;
    /** @var int Number of consecutive failed connection attempts */
    private $numFails = 0;
    /** @var float Unix timestamp (with microseconds) before which no new connection attempt is made */
    private $waitTill = 0.0;

    /**
     * @param string     $instance
     * @param string     $host
     * @param int        $port
     * @param string     $onError What to do on connection error (one of the ON_ERROR_* constants)
     * @param float|null $timeout Timeout in seconds for connecting and for the stream
     */
    public function __construct(
        $instance = 'writer',
        $host = '127.0.0.1',
        $port = 8125,
        $onError = self::ON_ERROR_EXCEPTION,
        $timeout = null
    ) {
        $this->instance = $instance;
        $this->host = $host;
        $this->port = $port;
        $this->onError = $onError;
        $this->timeout = $timeout;
    }

    /**
     * Close the socket if it is still open.
     */
    public function __destruct()
    {
        // is_resource() is false for an already closed stream. A plain truthiness check is not enough: a closed
        // resource is still truthy and fclose() on it throws a TypeError on PHP 8, which `@` cannot suppress.
        if (is_resource($this->socket)) {
            // any remaining failure is not actionable while destructing, so the result is deliberately ignored
            /** @scrutinizer ignore-unhandled */ @fclose($this->socket);
        }
        $this->socket = null;
    }

    /**
     * Write a message to the socket, in chunks of at most MAX_SEND_LENGTH bytes.
     *
     * If a chunk can not be written, a single reconnect is attempted before giving up.
     *
     * @param string $message
     *
     * @return bool true if the whole message was written, false otherwise
     *
     * @throws ConnectionException if (re)connecting fails and the writer is configured with ON_ERROR_EXCEPTION
     */
    public function write($message)
    {
        $this->ensureConnection();
        if (!is_resource($this->socket)) {
            return false;
        }

        $totalLength = strlen($message);
        $retries = 1;
        $written = 0;
        while ($written < $totalLength) {
            $response = @fwrite($this->socket, substr($message, $written), static::MAX_SEND_LENGTH);

            // a zero-byte write of a non-empty chunk makes no progress; treating it as a failure stops the loop
            // from spinning forever (fwrite() reported errors as 0 rather than false before PHP 7.4)
            if ($response === false || $response === 0) {
                if ($retries-- <= 0) {
                    return false;
                }

                $this->socket = $this->connect();
                if (!is_resource($this->socket)) {
                    // the reconnect failed without throwing (ON_ERROR_ERROR / ON_ERROR_IGNORE)
                    return false;
                }
                continue;
            }

            $written += $response;
        }

        return true;
    }

    /**
     * Ensure that we are currently connected to the socket
     *
     * A new connection is only attempted when there is no usable socket and the back-off period has passed.
     *
     * @throws ConnectionException if connecting fails and the writer is configured with ON_ERROR_EXCEPTION
     */
    protected function ensureConnection()
    {
        if (!is_resource($this->socket) && $this->canConnect()) {
            $this->socket = $this->connect();
        }
    }

    /**
     * Check whether the back-off period following the last failed connection attempt has passed
     *
     * @return bool
     */
    protected function canConnect()
    {
        return (microtime(true) > $this->waitTill);
    }

    /**
     * Attempt to connect to a stream
     *
     * On failure the back-off period is extended and the configured ON_ERROR_* behaviour is applied.
     * On success the failure counter and back-off period are reset and the stream timeout is set.
     *
     * @return resource|false the open stream, or false if the connection failed
     *
     * @throws ConnectionException if the connection fails and the writer is configured with ON_ERROR_EXCEPTION
     */
    protected function connect()
    {
        $socket = @fsockopen('udp://' . $this->host, $this->port, $errno, $errstr, $this->timeout);
        if ($socket === false) {
            // delay the next attempt: RETRY_INTERVAL * 2 ^ (number of previous consecutive failures)
            $this->waitTill = microtime(true) + (static::RETRY_INTERVAL * (pow(2, $this->numFails++)));

            switch ($this->onError) {
                case static::ON_ERROR_ERROR:
                    trigger_error(
                        sprintf('StatsD server connection failed (udp://%s:%d)', $this->host, $this->port),
                        E_USER_WARNING
                    );
                    break;
                case static::ON_ERROR_EXCEPTION:
                    throw new ConnectionException($this->instance, '(' . $errno . ') ' . $errstr);
            }
        } else {
            $this->numFails = 0;
            $this->waitTill = 0.0;

            $sec = (int) $this->timeout;
            // stream_set_timeout() expects the fractional part in microseconds (not milliseconds)
            $usec = (int) round(($this->timeout - $sec) * 1000000);
            stream_set_timeout($socket, $sec, $usec);
        }
        return $socket;
    }
}
168