Completed
Push — master ( ac84ef...df650f )
by Stanislav
02:18
created

InfluxDbClient::sendTorrentPoints()   A

Complexity

Conditions 3
Paths 2

Size

Total Lines 10
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 10
rs 9.4285
cc 3
eloc 7
nc 2
nop 2
1
<?php
2
3
namespace Popstas\Transmission\Console;
4
5
use GuzzleHttp\Exception\ConnectException;
6
use InfluxDB;
7
use Martial\Transmission\API\Argument\Torrent;
8
use Popstas\Transmission\Console\Helpers\TorrentUtils;
9
use Psr\Log\LoggerInterface;
10
11
class InfluxDbClient
12
{
13
    /**
14
     * @var InfluxDB\Client $influxDb
15
     */
16
    private $influxDb;
17
18
    /**
19
     * @var InfluxDB\Database
20
     */
21
    private $database;
22
23
    /**
24
     * @var LoggerInterface
25
     */
26
    private $logger;
27
28
    private $databaseName;
29
30
    public function __construct(InfluxDB\Client $influxDb, $databaseName)
31
    {
32
33
        $this->influxDb = $influxDb;
34
        $this->databaseName = $databaseName;
35
    }
36
37
    /**
38
     * @return InfluxDB\Database $database
39
     */
40
    private function getDatabase()
41
    {
42
        if (!isset($this->database)) {
43
            $this->database = $this->connectDatabase();
44
        }
45
        return $this->database;
46
    }
47
48
    /**
49
     * @param InfluxDB\Database $database
50
     */
51
    public function setDatabase($database)
52
    {
53
        $this->database = $database;
54
    }
55
56
    /**
57
     * Injects a logger.
58
     *
59
     * @param LoggerInterface $logger
60
     */
61
    public function setLogger(LoggerInterface $logger)
62
    {
63
        $this->logger = $logger;
64
    }
65
66
    private function log($level, $message, $context = [])
67
    {
68
        if (!is_null($this->logger)) {
69
            $this->logger->log($level, $message, $context);
70
        }
71
    }
72
73
    /**
74
     * @return InfluxDB\Database
75
     * @throws InfluxDB\Database\Exception
76
     */
77
    public function connectDatabase()
78
    {
79
        if (isset($this->database)) {
80
            $this->database;
81
        }
82
83
        $database = $this->influxDb->selectDB($this->databaseName);
84
85
        try {
86
            $databaseExists = $database->exists();
87
        } catch (ConnectException $e) {
88
            throw new \RuntimeException('InfluxDb connection error: ' . $e->getMessage());
89
        }
90
        if (!$databaseExists) {
91
            $this->log('info', 'Database ' . $this->databaseName . ' not exists, creating');
92
            $database->create();
93
        }
94
95
        return $database;
96
    }
97
98
    /**
99
     * @param array $torrent
100
     * @param string $transmissionHost
101
     * @return InfluxDB\Point
102
     */
103
    public function buildPoint(array $torrent, $transmissionHost)
104
    {
105
        $age = TorrentUtils::getTorrentAge($torrent);
106
        $lastPoint = $this->getLastPoint($torrent, $transmissionHost);
107
108
        $tagsData = [
109
            'host'             => $transmissionHost,
110
            'torrent_name'     => $torrent[Torrent\Get::NAME],
111
        ];
112
113
        $uploadedDerivative = count($lastPoint) && $torrent[Torrent\Get::UPLOAD_EVER] - $lastPoint['last'] >= 0 ?
114
            $torrent[Torrent\Get::UPLOAD_EVER] - $lastPoint['last'] : $torrent[Torrent\Get::UPLOAD_EVER];
115
116
        $fieldsData = [
117
            'uploaded_last'       => $uploadedDerivative,
118
            'downloaded'          => $torrent[Torrent\Get::TOTAL_SIZE],
119
            'age'                 => $age,
120
            'uploaded_per_day'    => $age ? intval($torrent[Torrent\Get::UPLOAD_EVER] / $age * 86400) : 0,
121
        ];
122
123
        return new InfluxDB\Point(
124
            'uploaded',
125
            $torrent[Torrent\Get::UPLOAD_EVER],
126
            $tagsData,
127
            $fieldsData,
128
            time()
129
        );
130
    }
131
132
    public function getLastPoint(array $torrent, $transmissionHost)
133
    {
134
        $torrentName = $torrent[Torrent\Get::NAME];
135
        $queryBuilder = $this->getDatabase()->getQueryBuilder();
136
        $results = $queryBuilder
137
            ->last('value')
138
            ->from('uploaded')
139
            ->where([
140
                "host='$transmissionHost'",
141
                "torrent_name='$torrentName'"
142
            ])
143
            ->getResultSet()
144
            ->getPoints();
145
146
        return count($results) ? $results[0] : [];
147
    }
148
    
149
    public function writePoints($points, $precision = InfluxDB\Database::PRECISION_SECONDS)
150
    {
151
        foreach ($points as $point) {
152
            $this->log('debug', 'Send point: {point}', ['point' => $point]);
153
        }
154
        return $this->getDatabase()->writePoints($points, $precision);
155
    }
156
157
    public function sendTorrentPoints(array $torrentList, $transmissionHost)
158
    {
159
        $points = [];
160
        foreach ($torrentList as $torrent) {
161
            $points[] = $this->buildPoint($torrent, $transmissionHost);
162
        }
163
        $isSuccess = $this->writePoints($points);
164
        $this->log('info', 'InfluxDB write ' . ($isSuccess ? 'success' : 'failed'));
165
        return $isSuccess;
166
    }
167
168
    /**
169
     * @param array $torrent
170
     * @param string $fieldName
171
     * @param string $transmissionHost
172
     * @param int $lastDays
173
     * @return int
174
     */
175
    public function getTorrentSum(array $torrent, $fieldName, $transmissionHost = '', $lastDays = 0)
176
    {
177
        $where = [];
178
179
        if (isset($torrent[Torrent\Get::NAME])) {
180
            $where[] = "torrent_name = '" . $torrent[Torrent\Get::NAME] . "'";
181
        }
182
183
        if ($transmissionHost) {
184
            $where[] = "host = '" . $transmissionHost . "'";
185
        }
186
187
        if ($lastDays) {
188
            $fromTimestamp = strtotime('-' . $lastDays . ' days');
189
            $fromDate = date('c', $fromTimestamp);
190
            $where[] = "time >= '$fromDate'";
191
        }
192
193
        $results = $this->getDatabase()->getQueryBuilder()
194
            ->from('uploaded')
195
            ->select("sum($fieldName) as $fieldName")
196
            ->where($where)
197
            ->getResultSet()
198
            ->getPoints();
199
        ;
200
201
        $this->log('debug', $this->influxDb->getLastQuery());
202
203
        if (!empty($results)) {
204
            return $results[0][$fieldName];
205
        }
206
        return 0;
207
    }
208
}
209