1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Bavix\Entry\Commands; |
4
|
|
|
|
5
|
|
|
use Bavix\LaravelClickHouse\Database\Eloquent\Model as Entry; |
6
|
|
|
use Bavix\Entry\Services\BulkService; |
7
|
|
|
use Bavix\Entry\Jobs\BulkWriter; |
8
|
|
|
use Illuminate\Console\Command; |
9
|
|
|
use Illuminate\Contracts\Cache\LockTimeoutException; |
10
|
|
|
use Illuminate\Support\Facades\Cache; |
11
|
|
|
|
12
|
|
|
class BulkWrite extends Command
{

    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'entry:bulk';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Pull out from redis and throws in the queue for recording';

    /**
     * Reads the buffered entries in batches and dispatches one BulkWriter job per batch.
     *
     * The whole run is guarded by a cache lock named after this class. If the
     * lock cannot be obtained within one second the command does nothing.
     * Every key returned by BulkService::keys() is split on the first ':' and
     * the part after it is used as the class name of the model to instantiate.
     *
     * @return void
     */
    public function handle(): void
    {
        $lock = Cache::lock(__CLASS__, 120);

        // Tracks whether this run actually owns the lock, so that a timed-out
        // attempt never releases a lock held by another process.
        $acquired = false;

        try {
            // Wait at most one second for the lock; throws LockTimeoutException otherwise.
            $lock->block(1);
            $acquired = true;

            $batchSize = \config('entry.batchSize', 10000);
            $queueName = \config('entry.queueName', 'default');
            $keys = app(BulkService::class)->keys();

            foreach ($keys as $key) {
                // Only the part after the first ':' (the model class) is needed here.
                [, $class] = \explode(':', $key, 2);
                $chunkIterator = app(BulkService::class)
                    ->chunkIterator($batchSize, $key);

                foreach ($chunkIterator as $bulkData) {
                    // Each buffered item is a JSON string; decode it to an associative array.
                    foreach ($bulkData as $itemKey => $itemValue) {
                        $bulkData[$itemKey] = \json_decode($itemValue, true);
                    }

                    $modelEntry = new $class;
                    $bulkData = $this->bulkHandling($modelEntry, $bulkData);

                    // Skip batches that ended up empty after handling.
                    if (!empty($bulkData)) {
                        $job = new BulkWriter($modelEntry, $bulkData);
                        $job->onQueue($queueName);
                        \dispatch($job);
                    }
                }
            }
        } catch (LockTimeoutException $timeoutException) {
            // Unable to acquire lock...
        } finally {
            if ($acquired) {
                $lock->release();
            }
        }
    }

    /**
     * The process of processing data before sending it to the queue.
     * Here we analyze the data and supplement it with information from the heap.
     *
     * This implementation returns the batch unchanged.
     *
     * @param Entry $entry Model instance created for the batch.
     * @param array $bulk  Decoded items of the batch.
     * @return array The items to hand to the queue; an empty array means nothing is dispatched.
     */
    protected function bulkHandling(Entry $entry, array $bulk): array
    {
        return $bulk;
    }

}
82
|
|
|
|
This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.
Consider making the comparison explicit by using `empty(...)` or `!empty(...)` instead.