NNTmux /
newznab-tmux
| 1 | <?php |
||||
| 2 | |||||
| 3 | declare(strict_types=1); |
||||
| 4 | |||||
| 5 | namespace App\Services\Binaries; |
||||
| 6 | |||||
| 7 | use Illuminate\Support\Facades\DB; |
||||
| 8 | use Illuminate\Support\Facades\Log; |
||||
| 9 | |||||
/**
 * Handles binary record creation and updates during header storage.
 *
 * The handler keeps per-batch state:
 *  - which article keys have already been resolved to a binary ID,
 *  - the size/part increments that still have to be written to the database,
 *  - the IDs of the binaries that were created while the batch ran.
 *
 * Call reset() before starting a new batch.
 */
final class BinaryHandler
{
    /** @var array<int, array{Size: int, Parts: int}> Pending binary updates */
    private array $binariesUpdate = [];

    /** @var array<int, true> IDs of binaries created in this batch */
    private array $insertedBinaryIds = [];

    /** @var array<string, array{CollectionID: int, BinaryID: int}> Processed articles */
    private array $articles = [];

    public function __construct() {}

    /**
     * Reset state for a new batch.
     */
    public function reset(): void
    {
        $this->binariesUpdate = [];
        $this->insertedBinaryIds = [];
        $this->articles = [];
    }

    /**
     * Get or create a binary for the given header.
     *
     * The first header seen for an article key goes to the database (insert or
     * lookup). Every further header with the same key is only accumulated in
     * memory and written later by flushUpdates().
     *
     * @param  array  $header  Parsed header; reads 'matches'[1], 'matches'[3], 'From' and 'Bytes'
     * @param  int  $collectionId  Collection the binary belongs to
     * @param  int  $groupId  Group ID, mixed into the binary hash
     * @param  int  $fileNumber  File number stored with a newly created binary
     * @return int|null Binary ID or null on failure
     */
    public function getOrCreateBinary(
        array $header,
        int $collectionId,
        int $groupId,
        int $fileNumber
    ): ?int {
        $articleKey = $header['matches'][1];

        // Return cached if already processed
        if (isset($this->articles[$articleKey])) {
            $binaryId = $this->articles[$articleKey]['BinaryID'];

            $this->binariesUpdate[$binaryId] ??= ['Size' => 0, 'Parts' => 0];
            // Cast so the counter stays an int, matching the cast used for the insert path.
            $this->binariesUpdate[$binaryId]['Size'] += (int) $header['Bytes'];
            $this->binariesUpdate[$binaryId]['Parts']++;

            return $binaryId;
        }

        $hash = md5($header['matches'][1].$header['From'].$groupId);
        $driver = DB::getDriverName();

        try {
            $binaryId = $this->insertOrGetBinary(
                $driver,
                $hash,
                $header,
                $collectionId,
                $fileNumber
            );

            if ($binaryId > 0) {
                // Never overwrite counters that are already pending for this binary.
                $this->binariesUpdate[$binaryId] ??= ['Size' => 0, 'Parts' => 0];
                $this->articles[$articleKey] = [
                    'CollectionID' => $collectionId,
                    'BinaryID' => $binaryId,
                ];

                return $binaryId;
            }
        } catch (\Throwable $e) {
            if (config('app.debug') === true) {
                Log::error('Binary insert failed: '.$e->getMessage());
            }
        }

        return null;
    }

    /**
     * Normalise the header values and dispatch to the driver specific insert.
     *
     * @return int Binary ID, or 0 when no row could be created or found
     */
    private function insertOrGetBinary(
        string $driver,
        string $hash,
        array $header,
        int $collectionId,
        int $fileNumber
    ): int {
        $rawName = $header['matches'][1];
        $name = mb_convert_encoding($rawName, 'UTF-8', mb_list_encodings());
        if (! is_string($name)) {
            // Conversion did not yield a string; fall back to the original text
            // with invalid UTF-8 sequences replaced so the typed insert methods
            // still receive a string.
            $name = mb_scrub((string) $rawName, 'UTF-8');
        }

        $totalParts = (int) $header['matches'][3];
        $partSize = (int) $header['Bytes'];

        if ($driver === 'sqlite') {
            return $this->insertBinarySqlite($hash, $name, $collectionId, $totalParts, $fileNumber, $partSize);
        }

        return $this->insertBinaryMysql($hash, $name, $collectionId, $totalParts, $fileNumber, $partSize);
    }

    /**
     * Insert a binary on SQLite, or account for one more part on the existing row.
     *
     * @return int Binary ID, or 0 when the row could neither be created nor found
     */
    private function insertBinarySqlite(
        string $hash,
        string $name,
        int $collectionId,
        int $totalParts,
        int $fileNumber,
        int $partSize
    ): int {
        // Use the affected row count to tell "inserted" from "ignored": SQLite
        // keeps the previous last-insert rowid when INSERT OR IGNORE skips the
        // row, so lastInsertId() alone could return the ID of an unrelated row.
        $insertedRows = DB::affectingStatement(
            'INSERT OR IGNORE INTO binaries (binaryhash, name, collections_id, totalparts, currentparts, filenumber, partsize) VALUES (?, ?, ?, ?, 1, ?, ?)',
            [$hash, $name, $collectionId, $totalParts, $fileNumber, $partSize]
        );

        if ($insertedRows > 0) {
            $lastId = (int) DB::connection()->getPdo()->lastInsertId();
            if ($lastId > 0) {
                $this->insertedBinaryIds[$lastId] = true;

                return $lastId;
            }
        }

        $bin = DB::selectOne(
            'SELECT id FROM binaries WHERE binaryhash = ? AND collections_id = ? LIMIT 1',
            [$hash, $collectionId]
        );
        $binaryId = (int) ($bin->id ?? 0);

        if ($insertedRows === 0 && $binaryId > 0) {
            // Mirror the MySQL "ON DUPLICATE KEY UPDATE" branch: the row already
            // existed, so count this part and its size on it.
            DB::statement(
                'UPDATE binaries SET currentparts = currentparts + 1, partsize = partsize + ? WHERE id = ?',
                [$partSize, $binaryId]
            );
        }

        return $binaryId;
    }

    /**
     * Insert a binary on MySQL, or add one part to the existing row via upsert.
     *
     * NOTE(review): after an upsert that updated an existing row the driver may
     * still report that row's ID through lastInsertId(); if so it is recorded in
     * $insertedBinaryIds although it was not created in this batch - confirm
     * against the MySQL version in use.
     *
     * @return int Binary ID, or 0 when the row could neither be created nor found
     */
    private function insertBinaryMysql(
        string $hash,
        string $name,
        int $collectionId,
        int $totalParts,
        int $fileNumber,
        int $partSize
    ): int {
        $sql = 'INSERT INTO binaries '
            .'(binaryhash, name, collections_id, totalparts, currentparts, filenumber, partsize) '
            .'VALUES (UNHEX(?), ?, ?, ?, 1, ?, ?) '
            .'ON DUPLICATE KEY UPDATE currentparts = currentparts + 1, partsize = partsize + VALUES(partsize)';

        DB::statement($sql, [$hash, $name, $collectionId, $totalParts, $fileNumber, $partSize]);

        $lastId = (int) DB::connection()->getPdo()->lastInsertId();
        if ($lastId > 0) {
            $this->insertedBinaryIds[$lastId] = true;

            return $lastId;
        }

        $bin = DB::selectOne(
            'SELECT id FROM binaries WHERE binaryhash = UNHEX(?) AND collections_id = ? LIMIT 1',
            [$hash, $collectionId]
        );

        return (int) ($bin->id ?? 0);
    }

    /**
     * Flush accumulated size/parts updates to the database.
     *
     * After a successful flush the written counters are reset to zero, so a
     * second call without new headers does not apply the same increments again.
     * On failure the counters are left untouched.
     *
     * @param  int  $chunkSize  Rows per bulk statement on the non-SQLite path; values below 1 are treated as 1
     * @return bool True when nothing was pending or every statement succeeded
     */
    public function flushUpdates(int $chunkSize = 1000): bool
    {
        $updates = $this->getPendingUpdates();
        if ($updates === []) {
            return true;
        }

        $driver = DB::getDriverName();

        try {
            $flushed = $driver === 'sqlite'
                ? $this->flushUpdatesSqlite($updates)
                : $this->flushUpdatesMysql($updates, $chunkSize);
        } catch (\Throwable $e) {
            if (config('app.debug') === true) {
                Log::error('Binaries aggregate update failed: '.$e->getMessage());
            }

            return false;
        }

        if ($flushed) {
            // Keep the keys (the cached path in getOrCreateBinary() adds to them)
            // but drop the amounts that are now stored in the database.
            foreach ($updates as $row) {
                $this->binariesUpdate[$row['id']] = ['Size' => 0, 'Parts' => 0];
            }
        }

        return $flushed;
    }

    /**
     * Apply the pending increments one row at a time.
     *
     * @param  array<int, array{id: int, partsize: int, currentparts: int}>  $updates
     */
    private function flushUpdatesSqlite(array $updates): bool
    {
        foreach ($updates as $row) {
            DB::statement(
                'UPDATE binaries SET partsize = partsize + ?, currentparts = currentparts + ? WHERE id = ?',
                [$row['partsize'], $row['currentparts'], $row['id']]
            );
        }

        return true;
    }

    /**
     * Apply the pending increments with chunked multi-row upsert statements.
     *
     * @param  array<int, array{id: int, partsize: int, currentparts: int}>  $updates
     */
    private function flushUpdatesMysql(array $updates, int $chunkSize): bool
    {
        // array_chunk() rejects a length below 1.
        foreach (array_chunk($updates, max(1, $chunkSize)) as $chunk) {
            $placeholders = [];
            $bindings = [];

            foreach ($chunk as $row) {
                $placeholders[] = '(?,?,?)';
                $bindings[] = $row['id'];
                $bindings[] = $row['partsize'];
                $bindings[] = $row['currentparts'];
            }

            $sql = 'INSERT INTO binaries (id, partsize, currentparts) VALUES '.implode(',', $placeholders)
                .' ON DUPLICATE KEY UPDATE partsize = partsize + VALUES(partsize), currentparts = currentparts + VALUES(currentparts)';

            DB::statement($sql, $bindings);
        }

        return true;
    }

    /**
     * Check if article is already processed.
     */
    public function hasArticle(string $articleKey): bool
    {
        return isset($this->articles[$articleKey]);
    }

    /**
     * Get IDs created in this batch.
     *
     * @return array<int, int>
     */
    public function getInsertedIds(): array
    {
        return array_keys($this->insertedBinaryIds);
    }

    /**
     * Get pending binary updates that haven't been flushed.
     *
     * Binaries whose counters are both zero are skipped.
     *
     * @return array<int, array{id: int, partsize: int, currentparts: int}>
     */
    private function getPendingUpdates(): array
    {
        $rows = [];
        foreach ($this->binariesUpdate as $binaryId => $binary) {
            $size = $binary['Size'] ?? 0;
            $parts = $binary['Parts'] ?? 0;

            if ($size > 0 || $parts > 0) {
                $rows[] = [
                    'id' => $binaryId,
                    'partsize' => $size,
                    'currentparts' => $parts,
                ];
            }
        }

        return $rows;
    }
}
||||
| 260 | |||||
| 261 |