ZipkinHAV2::connect()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 17
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 10
c 0
b 0
f 0
nc 2
nop 2
dl 0
loc 17
rs 9.9332
1
<?php
2
/**
3
 * Zipkin http api v2
4
 * User: moyo
5
 * Date: 24/11/2017
6
 * Time: 11:21 AM
7
 */
8
9
namespace Carno\Traced\Transport;
10
11
use function Carno\Coroutine\co;
12
use Carno\HTTP\Client;
13
use Carno\HTTP\Exception\RequestException;
14
use Carno\HTTP\Options as HOptions;
15
use Carno\HTTP\Standard\Request;
16
use Carno\HTTP\Standard\Response;
17
use Carno\HTTP\Standard\Streams\Body;
18
use Carno\HTTP\Standard\Uri;
19
use Carno\Net\Address;
20
use Carno\Pool\Exception\SelectWaitException;
21
use Carno\Pool\Options as POptions;
22
use Carno\Promise\Promised;
23
use Carno\Timer\Timer;
24
use Carno\Traced\Chips\TransferQueued;
25
use Carno\Tracing\Contracts\Transport;
26
use Closure;
27
28
class ZipkinHAV2 implements Transport
29
{
30
    use TransferQueued;
31
32
    /**
33
     * batch operating
34
     */
35
    private const BATCH_INV = 1500;
36
    private const BATCH_PACK = 100;
37
    private const BATCH_STACK = 10000;
38
39
    /**
40
     * @var Address
41
     */
42
    private $endpoint = null;
43
44
    /**
45
     * @var string
46
     */
47
    private $path = '/api/v2/spans';
48
49
    /**
50
     * @var Client
51
     */
52
    private $client = null;
53
54
    /**
55
     * @var string
56
     */
57
    private $daemon = null;
58
59
    /**
60
     * @param Address $endpoint
61
     * @param string $identify
62
     */
63
    public function connect(Address $endpoint, string $identify = null) : void
64
    {
65
        $this->options(self::BATCH_PACK, self::BATCH_STACK);
66
67
        $this->endpoint = $endpoint->port() > 0 ? $endpoint : new Address($endpoint->host(), 80);
68
69
        $this->path = $identify ?? $this->path;
70
71
        $this->client = new Client(
72
            (new HOptions())
73
                ->setTimeouts(1000)
74
                ->keepalive(new POptions(1, 10, 1, 1, 90, 30, 0, 1000, 800), "zipkin:{$this->endpoint}"),
75
            $this->endpoint
76
        );
77
78
        $this->daemon = Timer::loop(self::BATCH_INV, function () {
79
            $this->submitting();
80
        });
81
    }
82
83
    /**
84
     * @return Promised
85
     */
86
    public function disconnect() : Promised
87
    {
88
        Timer::clear($this->daemon);
89
90
        $closing = function () {
91
            $this->client->closed()->pended() && $this->client->close();
92
        };
93
94
        $this->stashed() ? $this->submitting($closing) : $closing();
95
96
        return $this->client->closed();
97
    }
98
99
    /**
100
     * @param string $data
101
     */
102
    public function loading(string $data) : void
103
    {
104
        $this->stashing($data);
105
    }
106
107
    /**
108
     * flush queued data
109
     */
110
    public function flushing() : void
111
    {
112
        if ($this->stashed() >= self::BATCH_PACK) {
113
            $this->submitting();
114
        }
115
    }
116
117
    /**
118
     * really submit to remote
119
     * @param Closure $then
120
     */
121
    private function submitting(Closure $then = null) : void
122
    {
123
        $this->spouting(co(function (array $spans) use ($then) {
124
            $request = new Request(
125
                'POST',
126
                new Uri('http', $this->endpoint->host(), $this->endpoint->port(), $this->path),
127
                [
128
                    'Content-Type' => 'application/json',
129
                ],
130
                new Body(sprintf('[%s]', implode(',', $spans)))
131
            );
132
            try {
133
                /**
134
                 * @var Response $response
135
                 */
136
                $response = yield $this->client->perform($request);
137
                if ((int)($response->getStatusCode() / 100) !== 2) {
138
                    logger('traced')->notice(
139
                        'Server not accepting',
140
                        [
141
                            'endpoint' => (string)$request->getUri(),
142
                            'error' => sprintf('#%d->%s', $response->getStatusCode(), (string)$response->getBody()),
143
                            'payload' => debug() ? (string)$request->getBody() : '[IGNORED]',
0 ignored issues
show
Bug introduced by
The function debug was not found. Maybe you did not declare it correctly or list all dependencies? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

143
                            'payload' => /** @scrutinizer ignore-call */ debug() ? (string)$request->getBody() : '[IGNORED]',
Loading history...
144
                        ]
145
                    );
146
                }
147
            } catch (RequestException | SelectWaitException $e) {
148
                logger('traced')->notice(
149
                    'Posting failed',
150
                    [
151
                        'endpoint' => (string)$request->getUri(),
152
                        'error' => sprintf('%s::%s', get_class($e), $e->getMessage()),
153
                    ]
154
                );
155
            }
156
            $then && $then();
157
        }));
158
    }
159
}
160