Completed
Push — master ( 70b352...622f18 )
by Harry
02:46
created

StreamWriter::write()   B

Complexity

Conditions 5
Paths 5

Size

Total Lines 21
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 16
CRAP Score 5.005

Importance

Changes 0
Metric Value
dl 0
loc 21
ccs 16
cts 17
cp 0.9412
rs 8.7624
c 0
b 0
f 0
cc 5
eloc 15
nc 5
nop 1
crap 5.005
1
<?php
2
/**
3
 * This file is part of graze/dog-statsd
4
 *
5
 * Copyright (c) 2017 Nature Delivered Ltd. <https://www.graze.com>
6
 *
7
 * For the full copyright and license information, please view the LICENSE
8
 * file that was distributed with this source code.
9
 *
10
 * @license https://github.com/graze/dog-statsd/blob/master/LICENSE.md
11
 * @link    https://github.com/graze/dog-statsd
12
 */
13
14
namespace Graze\DogStatsD\Stream;
15
16
use Graze\DogStatsD\Exception\ConnectionException;
17
18
/**
19
 * StreamWriter will attempt to write a message to a udp socket.
20
 *
21
 * If a connection attempt fails, reconnection is delayed with an exponential back-off to prevent application blocking
22
 */
23
class StreamWriter implements WriterInterface
24
{
25
    /**
26
     * Seconds to wait (as a base) for exponential back-off on connection
27
     *
28
     * minDelay = RETRY_INTERVAL * (2 ^ num_failed_attempts)
29
     *
30
     * e.g.
31
     * 0, 0.1 0.2 0.4 0.8 1.6 3.2 6.4 12.8 25.6 51.2 102.4 etc...
32
     */
33
    const RETRY_INTERVAL = 0.1;
34
35
    /**
36
     * Maximum length of a string to send
37
     */
38
    const MAX_SEND_LENGTH = 1024;
39
40
    const ON_ERROR_ERROR     = 'error';
41
    const ON_ERROR_EXCEPTION = 'exception';
42
    const ON_ERROR_IGNORE    = 'ignore';
43
44
    /** @var resource|null */
45
    protected $socket;
46
    /** @var string */
47
    private $host;
48
    /** @var int */
49
    private $port;
50
    /** @var string */
51
    private $onError;
52
    /** @var float|null */
53
    private $timeout;
54
    /** @var string */
55
    private $instance;
56
    /** @var int */
57
    private $numFails = 0;
58
    /** @var float */
59
    private $waitTill = 0.0;
60
61
    /**
     * Store the connection settings. The constructor itself does not open a socket.
     *
     * @param string     $instance Name of this writer, handed to ConnectionException when connecting fails
     * @param string     $host     Host of the StatsD server
     * @param int        $port     Port of the StatsD server
     * @param string     $onError  What to do on connection error (one of the ON_ERROR_* constants)
     * @param float|null $timeout  Timeout in seconds used when connecting and as the stream timeout
     */
    public function __construct(
        $instance = 'writer',
        $host = '127.0.0.1',
        $port = 8125,
        $onError = self::ON_ERROR_EXCEPTION,
        $timeout = null
    ) {
        // where to connect to
        $this->host = $host;
        $this->port = $port;
        $this->timeout = $timeout;

        // how to report a failed connection
        $this->instance = $instance;
        $this->onError = $onError;
    }
81
82 4
    /**
     * Close the socket, if it is still open, when the writer is destroyed.
     */
    public function __destruct()
    {
        // is_resource() is false for a stream that has already been closed. Checking it up front matters because
        // on PHP 8 fclose() throws a TypeError for a closed stream, and the `@` operator cannot suppress that.
        if (is_resource($this->socket)) {
            // best effort: there is nothing useful to do with a failed close during destruction
            @fclose($this->socket);
        }

        // never keep a handle to a closed stream around
        $this->socket = null;
    }
90
91
    /**
     * Write a message to the socket, connecting first if required.
     *
     * The message is sent in chunks of at most MAX_SEND_LENGTH bytes. If a chunk cannot be written the socket is
     * re-opened once and the chunk is tried again; a second failure aborts the write.
     *
     * @param string $message
     *
     * @return bool true if the whole message was written, false otherwise
     *
     * @throws ConnectionException if a connection attempt fails and onError is ON_ERROR_EXCEPTION
     */
    public function write($message)
    {
        $this->ensureConnection();
        if (!$this->socket) {
            // not connected, and not allowed (or not able) to connect right now
            return false;
        }

        $totalLength = strlen($message);
        $retries = 1;
        $written = 0;

        while ($written < $totalLength) {
            $response = @fwrite($this->socket, substr($message, $written), static::MAX_SEND_LENGTH);

            // A write that makes no progress is a failure as well: PHP versions before 7.4 report a failed write
            // as 0 instead of false, and looping on a 0 would never terminate.
            if ($response === false || $response === 0) {
                if ($retries-- <= 0) {
                    return false;
                }

                $this->socket = $this->connect();
                if (!$this->socket) {
                    // the reconnect failed without throwing; do not hand a non-resource back to fwrite()
                    return false;
                }

                continue;
            }

            $written += $response;
        }

        return ($written === $totalLength);
    }
117
118
    /**
     * Open the socket if there is none yet and a connection attempt is currently permitted.
     */
    protected function ensureConnection()
    {
        if ($this->socket) {
            // already have a socket, nothing to do
            return;
        }

        if (!$this->canConnect()) {
            // still inside the back-off window of an earlier failure
            return;
        }

        $this->socket = $this->connect();
    }
127
128
    /**
     * Tell whether the current time is past the point until which connection attempts are held back.
     *
     * @return bool true if a connection attempt is allowed right now
     */
    protected function canConnect()
    {
        $now = microtime(true);

        return ($this->waitTill < $now);
    }
135
136
    /**
     * Attempt to open a udp stream to the configured host and port.
     *
     * On failure the next permitted attempt is pushed back exponentially (see RETRY_INTERVAL) and the failure is
     * reported according to onError. On success the back-off state is reset and the stream timeout is applied.
     *
     * @return resource|false the open stream, or false if the connection failed and onError did not throw
     *
     * @throws ConnectionException if the connection fails and onError is ON_ERROR_EXCEPTION
     */
    protected function connect()
    {
        $socket = @fsockopen('udp://' . $this->host, $this->port, $errno, $errstr, $this->timeout);

        if ($socket === false) {
            // the delay doubles with every consecutive failure
            $this->waitTill = microtime(true) + (static::RETRY_INTERVAL * (pow(2, $this->numFails++)));

            switch ($this->onError) {
                case static::ON_ERROR_ERROR:
                    trigger_error(
                        sprintf('StatsD server connection failed (udp://%s:%d)', $this->host, $this->port),
                        E_USER_WARNING
                    );
                    break;
                case static::ON_ERROR_EXCEPTION:
                    throw new ConnectionException($this->instance, '(' . $errno . ') ' . $errstr);
            }

            return $socket;
        }

        // connected: clear the back-off state
        $this->numFails = 0;
        $this->waitTill = 0.0;

        // stream_set_timeout() takes whole seconds plus the fractional part in MICROseconds (not milliseconds)
        $seconds = (int) $this->timeout;
        $microseconds = (int) round(($this->timeout - $seconds) * 1000000);
        stream_set_timeout($socket, $seconds, $microseconds);

        return $socket;
    }
167
}
168