Completed
Branch long-message (4d2a8b)
by Harry
12:26 queued 10s
created

StreamWriter::write()   B

Complexity

Conditions 5
Paths 5

Size

Total Lines 21
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 21
rs 8.7624
c 0
b 0
f 0
cc 5
eloc 15
nc 5
nop 1
1
<?php
2
/**
3
 * This file is part of graze/dog-statsd
4
 *
5
 * Copyright (c) 2017 Nature Delivered Ltd. <https://www.graze.com>
6
 *
7
 * For the full copyright and license information, please view the LICENSE
8
 * file that was distributed with this source code.
9
 *
10
 * @license https://github.com/graze/dog-statsd/blob/master/LICENSE.md
11
 * @link    https://github.com/graze/dog-statsd
12
 */
13
14
namespace Graze\DogStatsD\Stream;
15
16
use Graze\DogStatsD\Exception\ConnectionException;
17
18
/**
19
 * StreamWriter will attempt to write a message to a udp socket.
20
 *
21
 * If the connection fails, it will never try and reconnect to prevent application blocking
22
 */
23
class StreamWriter implements WriterInterface
24
{
25
    /**
26
     * Seconds to wait (as a base) for exponential back-off on connection
27
     *
28
     * minDelay = RETRY_INTERVAL * (2 ^ num_failed_attempts)
29
     *
30
     * e.g.
31
     * 0, 0.1 0.2 0.4 0.8 1.6 3.2 6.4 12.8 25.6 51.2 102.4 etc...
32
     */
33
    const RETRY_INTERVAL = 0.1;
34
35
    /**
36
     * Maximum length of a string to send
37
     */
38
    const MAX_SEND_LENGTH = 4096;
39
40
    const ON_ERROR_ERROR     = 'error';
41
    const ON_ERROR_EXCEPTION = 'exception';
42
    const ON_ERROR_IGNORE    = 'ignore';
43
44
    /** @var resource|null */
45
    protected $socket;
46
    /** @var string */
47
    private $host;
48
    /** @var int */
49
    private $port;
50
    /** @var string */
51
    private $onError;
52
    /** @var float|null */
53
    private $timeout;
54
    /** @var string */
55
    private $instance;
56
    /** @var int */
57
    private $numFails = 0;
58
    /** @var float */
59
    private $waitTill = 0.0;
60
61
    /**
62
     * @param string     $instance
63
     * @param string     $host
64
     * @param int        $port
65
     * @param string     $onError What to do on connection error
66
     * @param float|null $timeout
67
     */
68
    public function __construct(
69
        $instance = 'writer',
70
        $host = '127.0.0.1',
71
        $port = 8125,
72
        $onError = self::ON_ERROR_EXCEPTION,
73
        $timeout = null
74
    ) {
75
        $this->instance = $instance;
76
        $this->host = $host;
77
        $this->port = $port;
78
        $this->onError = $onError;
79
        $this->timeout = $timeout;
80
    }
81
82
    public function __destruct()
83
    {
84
        if ($this->socket) {
85
            // the reason for this failing is that it is already closed, so ignore the result and not messing with
86
            // parent classes
87
            @fclose($this->socket);
0 ignored issues
show
Security Best Practice introduced by
It seems like you do not handle an error condition here. This can introduce security issues, and is generally not recommended.

If you suppress an error, we recommend checking for the error condition explicitly:

// For example instead of
@mkdir($dir);

// Better use
if (@mkdir($dir) === false) {
    throw new \RuntimeException('The directory '.$dir.' could not be created.');
}
Loading history...
88
        }
89
    }
90
91
    /**
92
     * @param string $message
93
     *
94
     * @return bool
95
     */
96
    public function write($message)
97
    {
98
        $this->ensureConnection();
99
        if ($this->socket) {
100
            $totalLength = strlen($message);
101
            $retries = 1;
102
            for ($written = 0; $written < $totalLength; $written += $response) {
103
                $response = @fwrite($this->socket, substr($message, $written), static::MAX_SEND_LENGTH);
104
                if ($response === false) {
105
                    if ($retries-- > 0) {
106
                        $this->socket = $this->connect();
107
                        $response = 0;
108
                    } else {
109
                        return false;
110
                    }
111
                }
112
            }
113
            return ($written === $totalLength);
114
        }
115
        return false;
116
    }
117
118
    /**
119
     * Ensure that we are currently connected to the socket
120
     */
121
    protected function ensureConnection()
122
    {
123
        if ((!$this->socket) && ($this->canConnect())) {
124
            $this->socket = $this->connect();
125
        }
126
    }
127
128
    /**
129
     * @return bool
130
     */
131
    protected function canConnect()
132
    {
133
        return (microtime(true) > $this->waitTill);
134
    }
135
136
    /**
137
     * Attempt to connect to a stream
138
     *
139
     * @return null|resource
140
     */
141
    protected function connect()
142
    {
143
        $socket = @fsockopen('udp://' . $this->host, $this->port, $errno, $errstr, $this->timeout);
144
        if ($socket === false) {
145
            $this->waitTill = microtime(true) + (static::RETRY_INTERVAL * (pow(2, $this->numFails++)));
146
147
            switch ($this->onError) {
148
                case static::ON_ERROR_ERROR:
149
                    trigger_error(
150
                        sprintf('StatsD server connection failed (udp://%s:%d)', $this->host, $this->port),
151
                        E_USER_WARNING
152
                    );
153
                    break;
154
                case static::ON_ERROR_EXCEPTION:
155
                    throw new ConnectionException($this->instance, '(' . $errno . ') ' . $errstr);
156
            }
157
        } else {
158
            $this->numFails = 0;
159
            $this->waitTill = 0.0;
160
161
            $sec = (int) $this->timeout;
162
            $ms = (int) (($this->timeout - $sec) * 1000);
163
            stream_set_timeout($socket, $sec, $ms);
164
        }
165
        return $socket;
166
    }
167
}
168