carno-php /
monitor
| 1 | <?php |
||||
| 2 | /** |
||||
| 3 | * Output to prometheus |
||||
| 4 | * User: moyo |
||||
| 5 | * Date: 08/01/2018 |
||||
| 6 | * Time: 4:34 PM |
||||
| 7 | */ |
||||
| 8 | |||||
| 9 | namespace Carno\Monitor\Output; |
||||
| 10 | |||||
| 11 | use function Carno\Coroutine\co; |
||||
| 12 | use Carno\HTTP\Client; |
||||
| 13 | use Carno\HTTP\Server; |
||||
| 14 | use Carno\HTTP\Server\Connection; |
||||
| 15 | use Carno\HTTP\Standard\Response; |
||||
| 16 | use Carno\Monitor\Collector\Aggregator; |
||||
| 17 | use Carno\Monitor\Contracts\Metrical; |
||||
| 18 | use Carno\Net\Address; |
||||
| 19 | use Carno\Net\Contracts\HTTP; |
||||
| 20 | use Carno\Timer\Timer; |
||||
| 21 | use Generator; |
||||
| 22 | use Throwable; |
||||
| 23 | |||||
/**
 * Publishes the metrics held by an Aggregator in the Prometheus text
 * exposition format.
 *
 * Two independent channels are supported and either may be omitted:
 *  - an HTTP listener answering "GET /metrics" (pull), and
 *  - a timer that POSTs the current metrics to a push gateway (push).
 */
class Prometheus
{
    // active push interval in milliseconds
    private const ACT_PUSH_INV = 5000;

    // significant digits kept when rendering sample values; sprintf's bare "%g"
    // keeps only 6, which turns 1234567 into "1.23457e+6"
    private const VALUE_DIGITS = 15;

    /**
     * Exporter server created by start(), null while not listening.
     *
     * @var HTTP
     */
    private $httpd = null;

    /**
     * Handle returned by Timer::loop() for the push job, null while not pushing.
     *
     * @var string
     */
    private $pushd = null;

    /**
     * Push gateway URL used by the most recent push, null until the first push ran.
     *
     * @var string
     */
    private $gate = null;

    /**
     * Value of the "host" label attached to every sample.
     *
     * @var string
     */
    private $host = 'localhost';

    /**
     * Value of the "app" label attached to every sample.
     *
     * @var string
     */
    private $app = 'app';

    /**
     * Source of the metrics being exported.
     *
     * @var Aggregator
     */
    private $agg = null;

    /**
     * Prometheus constructor.
     * @param string $host
     * @param string $app
     * @param Aggregator $agg
     */
    public function __construct(string $host, string $app, Aggregator $agg)
    {
        $this->host = $host;
        $this->app = $app;
        $this->agg = $agg;
    }

    /**
     * Starts the exporter and/or the push reporter.
     *
     * A failure to listen is logged and does not prevent the reporter from starting.
     *
     * @param Address $listen address the exporter listens on, null to skip it
     * @param Address $push address of the push gateway, null to skip pushing
     * @return static
     */
    public function start(?Address $listen = null, ?Address $push = null) : self
    {
        if ($listen !== null) {
            $this->listening($listen);
        }

        if ($push !== null) {
            $this->reporting($push, $listen);
        }

        return $this;
    }

    /**
     * Shuts the exporter down, cancels the push timer and, when at least one
     * push has run, asks the gateway to remove the pushed metrics.
     */
    public function stop() : void
    {
        if ($this->httpd) {
            $this->httpd->shutdown();
        }

        // the removal request is only issued when Timer::clear() reports success
        if ($this->pushd && Timer::clear($this->pushd)) {
            co(function () {
                if ($this->gate) {
                    yield $this->pushing(Client::delete($this->gate), 'removing');
                }
            })();
        }
    }

    /**
     * Brings up the HTTP exporter; "/metrics" returns the metrics, any other path 404.
     *
     * @param Address $listen
     */
    private function listening(Address $listen) : void
    {
        try {
            $this->httpd = Server::httpd(
                $listen,
                function (Connection $conn) {
                    if ($conn->request()->getUri()->getPath() === '/metrics') {
                        $conn->reply(new Response(
                            200,
                            ['Content-Type' => 'text/plain; version=0.0.4'],
                            $this->exporting()
                        ));
                    } else {
                        $conn->reply(new Response(404));
                    }
                },
                'prom-exporter'
            );
            $this->httpd->serve();

            logger('monitor')->info('Prometheus exporter started', ['app' => $this->app, 'listen' => $listen]);
        } catch (Throwable $e) {
            logger('monitor')->notice(
                'Prometheus exporter listening failed',
                ['error' => sprintf('%s::%s', get_class($e), $e->getMessage())]
            );
        }
    }

    /**
     * Schedules the periodic push of all metrics to the gateway.
     *
     * @param Address $push
     * @param Address $listen used as the "instance" path segment, 'default' when null
     */
    private function reporting(Address $push, ?Address $listen) : void
    {
        $this->pushd = Timer::loop(self::ACT_PUSH_INV, co(function () use ($listen, $push) {
            // the gateway URL is remembered so stop() can delete the same group later
            $this->gate = sprintf(
                'http://%s:%d/metrics/job/%s/instance/%s',
                $push->host(),
                $push->port(),
                $this->host,
                $listen ?? 'default'
            );

            yield $this->pushing(
                Client::post($this->gate, $this->exporting(true), ['Connection' => 'keep-alive']),
                'pushing'
            );
        }));

        logger('monitor')->info('Prometheus reporter started', ['app' => $this->app, 'gateway' => $push]);
    }

    /**
     * Awaits a gateway request, reads the response and logs any failure instead of raising it.
     *
     * NOTE(review): static analysis of the original reported that Client::delete()
     * is declared as returning Client\Responding rather than a Generator, so the
     * parameter is deliberately left untyped - confirm against Carno\HTTP\Client.
     *
     * @param Generator|mixed $caller pending request to be yielded
     * @param string $action verb used in the failure log message
     * @return Generator
     */
    private function pushing($caller, string $action) : Generator
    {
        try {
            /**
             * @var Client\Responding $resp
             */
            $resp = yield $caller;
            if ($resp) {
                $resp->data();
            }
        } catch (Throwable $e) {
            logger('monitor')->notice(
                sprintf('Prometheus metrics %s failed', $action),
                ['error' => sprintf('%s::%s', get_class($e), $e->getMessage())]
            );
        }
    }

    /**
     * Renders every aggregated metric in the text exposition format.
     *
     * Lines are grouped per metric name: one optional HELP line, one TYPE line,
     * then the samples. Counter and gauge samples carry a millisecond timestamp
     * when pulled and none when pushed.
     *
     * @param bool $push true when the payload is sent to a push gateway
     * @return string payload ending with a line feed, or '' when there is nothing to export
     */
    private function exporting(bool $push = false) : string
    {
        $stamp = $push ? '' : sprintf(' %d', (int)(microtime(true) * 1000));

        $families = [];

        foreach ($this->agg->metrics() as $typed => $metrics) {
            foreach ($metrics as $named => $groups) {
                $name = $this->format((string)$named);

                foreach ($groups as $stack) {
                    [$data, $labels, $description] = $stack;

                    if (!isset($families[$name])) {
                        $families[$name] = ['type' => $typed, 'help' => '', 'samples' => []];
                    }

                    $families[$name]['type'] = $typed;

                    // only one HELP line may exist per metric name, so keep the first description
                    if ($description && $families[$name]['help'] === '') {
                        $families[$name]['help'] = strtr((string)$description, ['\\' => '\\\\', "\n" => '\\n']);
                    }

                    foreach ($this->samples($typed, $name, $data, $labels, $stamp) as $sample) {
                        $families[$name]['samples'][] = $sample;
                    }
                }
            }
        }

        $lines = [];

        foreach ($families as $name => $family) {
            if ($family['help'] !== '') {
                $lines[] = sprintf('# HELP %s %s', $name, $family['help']);
            }
            $lines[] = sprintf('# TYPE %s %s', $name, $family['type']);
            foreach ($family['samples'] as $sample) {
                $lines[] = $sample;
            }
        }

        // the format requires the last line to end with a line feed
        return $lines ? implode("\n", $lines) . "\n" : '';
    }

    /**
     * Builds the sample lines of one label group of one metric.
     *
     * @param mixed $typed one of the Metrical type constants
     * @param string $name formatted metric name
     * @param array $data 'value' for counter/gauge; 'sum', 'count' and 'buckets' or 'quantiles' otherwise
     * @param array $labels labels of this group
     * @param string $stamp timestamp suffix for counter/gauge samples, may be empty
     * @return string[] empty when the type is not recognised
     */
    private function samples($typed, string $name, array $data, array $labels, string $stamp) : array
    {
        if (in_array($typed, [Metrical::COUNTER, Metrical::GAUGE], true)) {
            return [
                sprintf('%s{%s} %s', $name, $this->labeled($labels), $this->number($data['value'])) . $stamp,
            ];
        }

        if (!in_array($typed, [Metrical::HISTOGRAM, Metrical::SUMMARY], true)) {
            return [];
        }

        $samples = [
            sprintf('%s_sum{%s} %s', $name, $this->labeled($labels), $this->number($data['sum'])),
            sprintf('%s_count{%s} %d', $name, $this->labeled($labels), $data['count']),
        ];

        if ($typed === Metrical::HISTOGRAM) {
            foreach ($data['buckets'] as $bucket) {
                [$bound, $observed] = $bucket;
                $samples[] = sprintf(
                    '%s_bucket{%s} %d',
                    $name,
                    $this->labeled($labels, ['le' => $bound]),
                    $observed
                );
            }
        } else {
            foreach ($data['quantiles'] as $quantile) {
                [$position, $value] = $quantile;
                $samples[] = sprintf(
                    '%s{%s} %s',
                    $name,
                    $this->labeled($labels, ['quantile' => $position]),
                    $this->number($value)
                );
            }
        }

        return $samples;
    }

    /**
     * Renders a sample value without the 6-digit truncation of a bare "%g".
     *
     * @param int|float $value
     * @return string
     */
    private function number($value) : string
    {
        return sprintf('%.' . self::VALUE_DIGITS . 'g', $value);
    }

    /**
     * Builds the comma separated label list of a sample.
     *
     * The "host" and "app" labels come first; later arrays override earlier
     * ones on equal keys. Backslash, double quote and line feed in values are
     * escaped as the exposition format requires.
     *
     * @param array ...$stack
     * @return string
     */
    private function labeled(array ...$stack) : string
    {
        $merged = array_merge(['host' => $this->host, 'app' => $this->app], ...$stack);

        $labeling = [];

        foreach ($merged as $key => $value) {
            $labeling[] = sprintf(
                '%s="%s"',
                $key,
                strtr((string)$value, ['\\' => '\\\\', '"' => '\\"', "\n" => '\\n'])
            );
        }

        return implode(',', $labeling);
    }

    /**
     * Turns '-' and '.' in a metric name into '_'.
     *
     * @param string $name
     * @return string
     */
    private function format(string $name) : string
    {
        return str_replace(['-', '.'], '_', $name);
    }
}
||||
| 257 |