1 | <?php |
||
2 | |||
3 | /** |
||
4 | * This file is part of graze/dog-statsd |
||
5 | * |
||
6 | * Copyright (c) 2017 Nature Delivered Ltd. <https://www.graze.com> |
||
7 | * |
||
8 | * For the full copyright and license information, please view the LICENSE |
||
9 | * file that was distributed with this source code. |
||
10 | * |
||
11 | * @license https://github.com/graze/dog-statsd/blob/master/LICENSE.md |
||
12 | * @link https://github.com/graze/dog-statsd |
||
13 | */ |
||
14 | |||
15 | namespace Graze\DogStatsD\Stream; |
||
16 | |||
17 | use Graze\DogStatsD\Exception\ConnectionException; |
||
18 | |||
/**
 * StreamWriter writes messages to a udp socket.
 *
 * The socket is opened lazily on the first write. When a connection attempt fails, further attempts are
 * skipped until an exponentially growing back-off delay has elapsed, so that an unreachable server does not
 * slow down every single write.
 */
class StreamWriter implements WriterInterface
{
    /**
     * Seconds to wait (as a base) for exponential back-off on connection
     *
     * minDelay = RETRY_INTERVAL * (2 ^ num_failed_attempts)
     *
     * e.g.
     * 0.1 0.2 0.4 0.8 1.6 3.2 6.4 12.8 25.6 51.2 102.4 etc...
     */
    const RETRY_INTERVAL = 0.1;

    /**
     * Maximum number of bytes handed to a single fwrite() call
     */
    const MAX_SEND_LENGTH = 1024;

    /** Raise an E_USER_WARNING when the connection fails */
    const ON_ERROR_ERROR = 'error';
    /** Throw a ConnectionException when the connection fails */
    const ON_ERROR_EXCEPTION = 'exception';
    /** Silently carry on when the connection fails */
    const ON_ERROR_IGNORE = 'ignore';

    /** @var resource|null */
    protected $socket;
    /** @var string */
    private $host;
    /** @var int */
    private $port;
    /** @var string One of the ON_ERROR_* constants */
    private $onError;
    /** @var float|null Timeout in seconds, fractions allowed */
    private $timeout;
    /** @var string Name passed to ConnectionException to identify this writer */
    private $instance;
    /** @var int Number of consecutive failed connection attempts */
    public $numFails = 0;
    /** @var float Timestamp (as returned by microtime(true)) before which no new connection is attempted */
    public $waitTill = 0.0;

    /**
     * @param string     $instance
     * @param string     $host
     * @param int        $port
     * @param string     $onError What to do on connection error (one of the ON_ERROR_* constants)
     * @param float|null $timeout Seconds, used for both opening the socket and the stream timeout
     */
    public function __construct(
        $instance = 'writer',
        $host = '127.0.0.1',
        $port = 8125,
        $onError = self::ON_ERROR_EXCEPTION,
        $timeout = null
    ) {
        $this->instance = $instance;
        $this->host = $host;
        $this->port = $port;
        $this->onError = $onError;
        $this->timeout = $timeout;
    }

    /**
     * Close the socket if it is still open.
     */
    public function __destruct()
    {
        if ($this->hasOpenSocket()) {
            // the reason for this failing is that it is already closed, so ignore the result and not messing with
            // parent classes
            @fclose($this->socket);
        }
    }

    /**
     * Write a message to the socket, in chunks of at most MAX_SEND_LENGTH bytes.
     *
     * A failed chunk triggers a single reconnect followed by one more attempt for that chunk.
     *
     * @param string $message
     *
     * @return bool true when the whole message was written, false when there is no usable socket or a chunk
     *              could not be written even after reconnecting
     *
     * @throws ConnectionException when (re)connecting fails and the writer is configured with ON_ERROR_EXCEPTION
     */
    public function write($message)
    {
        $this->ensureConnection();
        if (!$this->hasOpenSocket()) {
            return false;
        }

        $totalLength = strlen($message);
        $retries = 1;
        $written = 0;

        while ($written < $totalLength) {
            $response = @fwrite($this->socket, substr($message, $written), static::MAX_SEND_LENGTH);

            // PHP < 7.4 reports a failed write as 0 rather than false; treating both as a failure keeps this
            // loop from spinning forever without making progress.
            if ($response === false || $response === 0) {
                if ($retries-- <= 0) {
                    return false;
                }

                $this->socket = $this->connect();
                if (!$this->hasOpenSocket()) {
                    // reconnect failed without throwing (ON_ERROR_ERROR / ON_ERROR_IGNORE): writing to a
                    // non-resource would raise a TypeError on PHP 8, so give up here instead
                    return false;
                }
                continue;
            }

            $retries = 1;
            $written += $response;
        }

        return ($written === $totalLength);
    }

    /**
     * Ensure that we are currently connected to the socket
     *
     * Does nothing while the back-off delay of a previously failed attempt has not yet elapsed.
     */
    protected function ensureConnection()
    {
        if (!$this->hasOpenSocket() && $this->canConnect()) {
            $this->socket = $this->connect();
        }
    }

    /**
     * Whether the back-off delay has elapsed and a new connection attempt is allowed.
     *
     * @return bool
     */
    protected function canConnect()
    {
        return (microtime(true) > $this->waitTill);
    }

    /**
     * Attempt to connect to a stream
     *
     * On failure the back-off delay is extended and the configured error behaviour is applied; on success the
     * failure counter and the back-off delay are reset.
     *
     * @return null|resource the opened socket, or null when the connection failed
     *
     * @throws ConnectionException when the connection fails and the writer is configured with ON_ERROR_EXCEPTION
     */
    protected function connect()
    {
        $errno = 0;
        $errstr = '';
        $socket = @fsockopen('udp://' . $this->host, $this->port, $errno, $errstr, $this->timeout);

        if ($socket === false) {
            $this->waitTill = microtime(true) + (static::RETRY_INTERVAL * (pow(2, $this->numFails++)));

            switch ($this->onError) {
                case static::ON_ERROR_ERROR:
                    trigger_error(
                        sprintf('StatsD server connection failed (udp://%s:%d)', $this->host, $this->port),
                        E_USER_WARNING
                    );
                    break;
                case static::ON_ERROR_EXCEPTION:
                    throw new ConnectionException($this->instance, '(' . $errno . ') ' . $errstr);
            }

            return null;
        }

        $this->numFails = 0;
        $this->waitTill = 0.0;

        // stream_set_timeout() takes whole seconds plus MICROseconds, so the fractional part of the timeout
        // has to be scaled by 1,000,000 (a null timeout results in 0 seconds / 0 microseconds)
        $seconds = (int) $this->timeout;
        $microseconds = min(999999, (int) round(($this->timeout - $seconds) * 1000000));
        stream_set_timeout($socket, $seconds, $microseconds);

        return $socket;
    }

    /**
     * Whether $this->socket currently holds an open stream resource.
     *
     * @return bool
     */
    private function hasOpenSocket()
    {
        return $this->socket && is_resource($this->socket);
    }
}
||
172 |
If you suppress an error, we recommend checking for the error condition explicitly: