1 | <?php |
||||
2 | /** |
||||
3 | * Output to prometheus |
||||
4 | * User: moyo |
||||
5 | * Date: 08/01/2018 |
||||
6 | * Time: 4:34 PM |
||||
7 | */ |
||||
8 | |||||
9 | namespace Carno\Monitor\Output; |
||||
10 | |||||
11 | use function Carno\Coroutine\co; |
||||
12 | use Carno\HTTP\Client; |
||||
13 | use Carno\HTTP\Server; |
||||
14 | use Carno\HTTP\Server\Connection; |
||||
15 | use Carno\HTTP\Standard\Response; |
||||
16 | use Carno\Monitor\Collector\Aggregator; |
||||
17 | use Carno\Monitor\Contracts\Metrical; |
||||
18 | use Carno\Net\Address; |
||||
19 | use Carno\Net\Contracts\HTTP; |
||||
20 | use Carno\Timer\Timer; |
||||
21 | use Generator; |
||||
22 | use Throwable; |
||||
23 | |||||
/**
 * Publishes the metrics held by an Aggregator in the Prometheus text format.
 *
 * Two delivery modes can be enabled independently through start():
 *  - an HTTP exporter that answers requests whose path is "/metrics" (pull)
 *  - a timer that POSTs the metrics to a push gateway every ACT_PUSH_INV ms (push)
 */
class Prometheus
{
    // active push interval in milliseconds
    private const ACT_PUSH_INV = 5000;

    /**
     * Exporter server created by start(), null while not listening
     * @var HTTP
     */
    private $httpd = null;

    /**
     * Handle returned by Timer::loop() for the push timer, null while not pushing
     * @var string
     */
    private $pushd = null;

    /**
     * Push gateway URL used by the most recent push, null until the first push ran
     * @var string
     */
    private $gate = null;

    /**
     * Value of the "host" label (also used as the gateway "job" path segment)
     * @var string
     */
    private $host = 'localhost';

    /**
     * Value of the "app" label
     * @var string
     */
    private $app = 'app';

    /**
     * Source of the metrics being exported
     * @var Aggregator
     */
    private $agg = null;

    /**
     * Prometheus constructor.
     * @param string $host
     * @param string $app
     * @param Aggregator $agg
     */
    public function __construct(string $host, string $app, Aggregator $agg)
    {
        $this->host = $host;
        $this->app = $app;
        $this->agg = $agg;
    }

    /**
     * Starts the exporter and/or the gateway reporter.
     * Each mode is skipped when its address is null; a failure to start the
     * exporter is logged and does not prevent the reporter from starting.
     *
     * @param Address $listen address the "/metrics" exporter should listen on
     * @param Address $push address of the push gateway
     * @return static
     */
    public function start(?Address $listen = null, ?Address $push = null) : self
    {
        if ($listen !== null) {
            $this->listening($listen);
        }

        if ($push !== null) {
            $this->reporting($push, $listen);
        }

        return $this;
    }

    /**
     * Stops whatever start() has started.
     * The gateway entry is only removed when the push timer existed, Timer::clear()
     * returned a truthy value and at least one push has set the gateway URL.
     */
    public function stop() : void
    {
        if ($this->httpd) {
            $this->httpd->shutdown();
        }

        if ($this->pushd && Timer::clear($this->pushd)) {
            co(function () {
                if ($this->gate) {
                    yield $this->pushing(Client::delete($this->gate), 'removing');
                }
            })();
        }
    }

    /**
     * Creates the exporter server and starts serving; any error is logged, not thrown.
     *
     * @param Address $listen
     */
    private function listening(Address $listen) : void
    {
        try {
            $this->httpd = Server::httpd(
                $listen,
                function (Connection $conn) {
                    if ($conn->request()->getUri()->getPath() === '/metrics') {
                        $conn->reply(new Response(
                            200,
                            ['Content-Type' => 'text/plain; version=0.0.4'],
                            $this->exporting()
                        ));
                    } else {
                        $conn->reply(new Response(404));
                    }
                },
                'prom-exporter'
            );
            $this->httpd->serve();
            logger('monitor')->info('Prometheus exporter started', ['app' => $this->app, 'listen' => $listen]);
        } catch (Throwable $e) {
            logger('monitor')->notice(
                'Prometheus exporter listening failed',
                ['error' => sprintf('%s::%s', get_class($e), $e->getMessage())]
            );
        }
    }

    /**
     * Registers the timer that pushes the metrics to the gateway.
     *
     * @param Address $push gateway address
     * @param Address|null $listen exporter address, used as the "instance" path segment
     */
    private function reporting(Address $push, ?Address $listen) : void
    {
        $this->pushd = Timer::loop(self::ACT_PUSH_INV, co(function () use ($listen, $push) {
            // remembered so that stop() can delete the same group from the gateway
            $this->gate = sprintf(
                'http://%s:%d/metrics/job/%s/instance/%s',
                $push->host(),
                $push->port(),
                $this->host,
                $listen ?? 'default'
            );

            yield $this->pushing(
                Client::post($this->gate, $this->exporting(true), ['Connection' => 'keep-alive']),
                'pushing'
            );
        }));

        logger('monitor')->info('Prometheus reporter started', ['app' => $this->app, 'gateway' => $push]);
    }

    /**
     * Runs a gateway request and logs (instead of throwing) any failure.
     * The body contains "yield", so calling this method returns a Generator
     * that the caller is expected to yield to the coroutine runner.
     *
     * @param Generator $caller pending request
     * @param string $action verb used in the failure log message
     * @return Generator
     */
    private function pushing(Generator $caller, string $action) : Generator
    {
        try {
            /**
             * @var Client\Responding $resp
             */
            $resp = yield $caller;
            if ($resp) {
                // NOTE(review): presumably consumes the response payload - confirm against Responding
                $resp->data();
            }
        } catch (Throwable $e) {
            logger('monitor')->notice(
                sprintf('Prometheus metrics %s failed', $action),
                ['error' => sprintf('%s::%s', get_class($e), $e->getMessage())]
            );
        }
    }

    /**
     * Renders all aggregated metrics as Prometheus text exposition lines.
     *
     * @param bool $push true when rendering for the gateway: samples carry no
     *                   timestamp and the output ends with a line feed
     * @return string
     */
    private function exporting(bool $push = false) : string
    {
        // 0 means "no timestamp"; otherwise current time in milliseconds
        $now = $push ? 0 : (int)(microtime(true) * 1000);

        $lines = $types = $helped = [];

        foreach ($this->agg->metrics() as $typed => $metrics) {
            $plain = in_array($typed, [Metrical::COUNTER, Metrical::GAUGE], true);
            $complex = in_array($typed, [Metrical::HISTOGRAM, Metrical::SUMMARY], true);

            foreach ($metrics as $raw => $groups) {
                // the metric name is the same for every label group
                $named = $this->format((string)$raw);

                foreach ($groups as $stack) {
                    [$data, $labels, $description] = $stack;

                    $types[$named] = $typed;

                    // the exposition format allows a single HELP line per metric name
                    if ($description && !isset($helped[$named])) {
                        $helped[$named] = true;
                        $lines[] = sprintf(
                            '# HELP %s %s',
                            $named,
                            strtr((string)$description, ['\\' => '\\\\', "\n" => '\\n'])
                        );
                    }

                    if ($plain) {
                        // only counter/gauge samples get the timestamp suffix
                        $lines[] = sprintf('%s{%s} %g', $named, $this->labeled($labels), $data['value'])
                            . ($now ? sprintf(' %d', $now) : '');
                        continue;
                    }

                    if (!$complex) {
                        continue;
                    }

                    $lines[] = sprintf('%s_sum{%s} %g', $named, $this->labeled($labels), $data['sum']);
                    $lines[] = sprintf('%s_count{%s} %d', $named, $this->labeled($labels), $data['count']);

                    if ($typed === Metrical::HISTOGRAM) {
                        foreach ($data['buckets'] as [$bound, $observed]) {
                            $lines[] = sprintf(
                                '%s_bucket{%s} %d',
                                $named,
                                $this->labeled($labels, ['le' => $bound]),
                                $observed
                            );
                        }
                    } else {
                        foreach ($data['quantiles'] as [$position, $value]) {
                            $lines[] = sprintf(
                                '%s{%s} %g',
                                $named,
                                $this->labeled($labels, ['quantile' => $position]),
                                $value
                            );
                        }
                    }
                }
            }
        }

        // TYPE lines go in front of every sample
        foreach ($types as $named => $typed) {
            array_unshift($lines, sprintf('# TYPE %s %s', $named, $typed));
        }

        // an empty last element makes implode() terminate the payload with "\n"
        if ($push) {
            $lines[] = '';
        }

        return implode("\n", $lines);
    }

    /**
     * Builds the comma separated label list placed between "{" and "}".
     * "host" and "app" come first and may be overridden by the given label sets;
     * later sets win over earlier ones.
     *
     * @param array ...$stack label sets (name => value)
     * @return string
     */
    private function labeled(array ...$stack) : string
    {
        $merged = array_merge(['host' => $this->host, 'app' => $this->app], ...$stack);

        $labeling = [];

        foreach ($merged as $name => $value) {
            // backslash, double quote and line feed must be escaped inside a label value
            $labeling[] = sprintf(
                '%s="%s"',
                $name,
                strtr((string)$value, ['\\' => '\\\\', '"' => '\\"', "\n" => '\\n'])
            );
        }

        return implode(',', $labeling);
    }

    /**
     * Turns a metric name into its exported form by replacing "-" and "." with "_".
     *
     * @param string $name
     * @return string
     */
    private function format(string $name) : string
    {
        return str_replace(['-', '.'], '_', $name);
    }
}
||||
257 |