Passed
Push — main ( c5069f...73772f )
by
unknown
08:51
created

getEventsToInvalidate()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 15
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
eloc 12
c 1
b 0
f 0
nc 1
nop 1
dl 0
loc 15
rs 9.8666
1
<?php
2
3
namespace Padosoft\SuperCacheInvalidate\Console;
4
5
use Illuminate\Console\Command;
6
use Illuminate\Support\Carbon;
7
use Illuminate\Support\Facades\Cache;
8
use Illuminate\Support\Facades\DB;
9
use Padosoft\SuperCacheInvalidate\Helpers\SuperCacheInvalidationHelper;
10
11
class ProcessCacheInvalidationEventsCommand extends Command
12
{
13
    protected $signature = 'supercache:process-invalidation
14
                            {--shard= : The shard number to process}
15
                            {--priority= : The priority level}
16
                            {--limit= : The maximum number of events to fetch per batch}
17
                            {--tag-batch-size= : The number of identifiers to process per invalidation batch}
18
                            {--connection_name= : The Redis connection name}
19
                            {--log_attivo= : Indica se il log delle operazioni è attivo (0=no, 1=si)}';
20
    protected $description = 'Process cache invalidation events for a given shard and priority';
21
    protected SuperCacheInvalidationHelper $helper;
22
    protected bool $log_attivo = false;
23
    protected string $connection_name = 'cache';
24
    protected int $tagBatchSize = 100;
25
    protected int $limit = 1000;
26
    protected int $priority = 0;
27
    protected int $shardId = 0;
28
    protected int $invalidation_window = 600;
29
30
    public function __construct(SuperCacheInvalidationHelper $helper)
31
    {
32
        parent::__construct();
33
        $this->helper = $helper;
34
    }
35
36
    private function getEventsToInvalidate(Carbon $processingStartTime): array
37
    {
38
        $partitionCache_invalidation_events = $this->helper->getCacheInvalidationEventsUnprocessedPartitionName($this->shardId, $this->priority);
39
        return DB::table(DB::raw("`cache_invalidation_events` PARTITION ({$partitionCache_invalidation_events})"))
40
            ->select(['id', 'type', 'identifier', 'connection_name', 'partition_key', 'event_time'])
41
            ->where('processed', '=', 0)
42
            ->where('shard', '=', $this->shardId)
43
            ->where('priority', '=', $this->priority)
44
            // Cerco tutte le chiavi/tag da invalidare per questo database redis
45
            ->where('connection_name', '=', $this->connection_name)
46
            ->where('event_time', '<', $processingStartTime)
47
            ->orderBy('event_time')
48
            ->limit($this->limit)
49
            ->get()
50
            ->toArray()
51
        ;
52
    }
53
54
    private function getStoreFromConnectionName(string $connection_name): ?string
55
    {
56
        // Cerca il nome dello store associato alla connessione Redis
57
        foreach (config('cache.stores') as $storeName => $storeConfig) {
58
            if (
59
                isset($storeConfig['driver'], $storeConfig['connection']) &&
60
                $storeConfig['driver'] === 'redis' &&
61
                $storeConfig['connection'] === $connection_name
62
            ) {
63
                return $storeName;
64
            }
65
        }
66
67
        return null;
68
    }
69
70
    private function logIf(string $message, string $level = 'info')
71
    {
72
        if (!$this->log_attivo && $level === 'info') {
73
            return;
74
        }
75
        $this->$level(now()->toDateTimeString() . ' Shard[' . $this->shardId . '] Priority[' . $this->priority . '] Connection[' . $this->connection_name . '] : ' . PHP_EOL . $message);
76
    }
77
78
    protected function processEvents(): void
79
    {
80
        $processingStartTime = now();
81
        // Recupero gli eventi da invalidare
82
        $events = $this->getEventsToInvalidate($processingStartTime);
83
        $this->logIf('Trovati ' . count($events) . ' Eventi da invalidare');
84
        if (count($events) === 0) {
85
            return;
86
        }
87
88
        // Creiamo un array per memorizzare il valore più vecchio per ogni coppia (type, identifier) così elimino i doppioni che si sono accumulati nel tempo di apertura della finestra
89
        $unique_events = [];
90
91
        foreach ($events as $event) {
92
            $key = $event->type . ':' . $event->identifier; // Chiave univoca per type + identifier
93
            $event->event_time = \Illuminate\Support\Carbon::parse($event->event_time);
94
            // Quando la chiave non esiste o il nuovo valore ha un event_time più vecchio, lo sostituisco così a parità di tag ho sempre quello più vecchio e mi baso su quello per verificare la finestra
95
            if (!isset($unique_events[$key]) || $event->event_time <= $unique_events[$key]->event_time) {
96
                $unique_events[$key] = $event;
97
            }
98
        }
99
100
        $unique_events = array_values($unique_events);
101
102
        $this->logIf('Eventi unici ' . count($unique_events));
103
        // Quando il numero di eventi unici è inferiore al batchSize e quello più vecchio aspetta da almeno due minuti, mando l'invalidazione.
104
        // Questo serve per i siti piccoli che hanno pochi eventi, altrimenti si rischia di attendere troppo per invalidare i tags
105
        // In questo caso invalido i tag/key "unici" e setto a processed = 1 tutti quelli recuperati
106
        if (count($unique_events) < $this->tagBatchSize && $processingStartTime->diffInSeconds($unique_events[0]->event_time) >= 120) {
107
            $this->logIf('Il numero di eventi unici è inferiore al batchSize ( ' . $this->tagBatchSize . ' ) e sono passati più di due minuti, procedo');
108
            $this->processBatch($events, $unique_events);
109
110
            return;
111
        }
112
113
        // Altrimenti ho raggiunto il tagbatchsize e devo prendere solo quelli che hanno la finestra di invalidazione attiva
114
        $eventsToUpdate = [];
115
        $eventsAll = [];
116
        foreach ($unique_events as $event) {
117
            $elapsed = $processingStartTime->diffInSeconds($event->event_time);
118
            $typeFilter = $event->type;
119
            $identifierFilter = $event->identifier;
120
            if ($elapsed < $this->invalidation_window) {
121
                // Se la richiesta (cmq del più vecchio) non è nella finestra di invalidazione salto l'evento
122
                continue;
123
            }
124
            // altrimenti aggiungo l'evento a quelli da processare
125
            $eventsToUpdate[] = $event;
126
            // e recupero tutti gli ID che hanno quel tag/key
127
            $eventsAll[] = array_filter($events, function ($event) use ($typeFilter, $identifierFilter) {
128
                return $event->type === $typeFilter && $event->identifier === $identifierFilter;
129
            });
130
        }
131
        if (count($eventsToUpdate) === 0) {
132
            return;
133
        }
134
        $this->processBatch(array_merge(...$eventsAll), $eventsToUpdate);
135
    }
136
137
    protected function processBatch(array $allEvents, array $eventsToInvalidate): void
138
    {
139
        // Separo le chiavi dai tags
140
        $keys = [];
141
        $tags = [];
142
143
        foreach ($eventsToInvalidate as $item) {
144
            switch ($item->type) {
145
                case 'key':
146
                    $keys[] = $item->identifier . '§' . $item->connection_name;
147
                    break;
148
                case 'tag':
149
                    $tags[] = $item->identifier . '§' . $item->connection_name;
150
                    break;
151
            }
152
        }
153
154
        $this->logIf('Invalido ' . count($keys) . ' chiavi e ' . count($tags) . ' tags' . ' per un totale di ' . count($allEvents) . ' events_ID');
155
156
        if (!empty($keys)) {
157
            $this->invalidateKeys($keys);
158
        }
159
160
        if (!empty($tags)) {
161
            $this->invalidateTags($tags);
162
        }
163
164
        // Disabilita i controlli delle chiavi esterne e dei vincoli univoci
165
        DB::statement('SET FOREIGN_KEY_CHECKS=0;');
166
        DB::statement('SET UNIQUE_CHECKS=0;');
167
168
        // Mark event as processed
169
        // QUI NON VA USATA PARTITION perchè la cross partition è più lenta! Però è necessario utilizzare la $partition_key per sfruttare l'indice della primary key (id+partition_key)
170
        DB::table('cache_invalidation_events')
171
            ->whereIn('id', array_map(fn ($event) => $event->id, $allEvents))
172
            ->whereIn('partition_key', array_map(fn ($event) => $event->partition_key, $allEvents))
173
            ->update(['processed' => 1, 'updated_at' => now()])
174
        ;
175
        // Riattiva i controlli
176
        DB::statement('SET FOREIGN_KEY_CHECKS=1;');
177
        DB::statement('SET UNIQUE_CHECKS=1;');
178
    }
179
180
    /**
181
     * Invalidate cache keys.
182
     *
183
     * @param array $keys Array of cache keys to invalidate
184
     */
185
    protected function invalidateKeys(array $keys): void
186
    {
187
        $callback = config('super_cache_invalidate.key_invalidation_callback');
188
189
        // Anche in questo caso va fatto un loop perchè le chiavi potrebbero stare in database diversi
190
        foreach ($keys as $keyAndConnectionName) {
191
            [$key, $connection_name] = explode('§', $keyAndConnectionName);
192
193
            // Metodo del progetto
194
            if (is_callable($callback)) {
195
                $callback($key, $connection_name);
196
                continue;
197
            }
198
            // oppure di default uso Laravel
199
            $storeName =  $this->getStoreFromConnectionName($connection_name);
200
201
            if ($storeName === null) {
202
                continue;
203
            }
204
            Cache::store($storeName)->forget($key);
205
        }
206
    }
207
208
    /**
209
     * Invalidate cache tags.
210
     *
211
     * @param array $tags Array of cache tags to invalidate
212
     */
213
    protected function invalidateTags(array $tags): void
214
    {
215
        $callback = config('super_cache_invalidate.tag_invalidation_callback');
216
217
        $groupByConnection = [];
218
219
        // Anche in questo caso va fatto un loop perchè i tags potrebbero stare in database diversi,
220
        // ma per ottimizzare possiamo raggruppare le operazioni per connessione
221
        foreach ($tags as $tagAndConnectionName) {
222
            // chiave e connessione
223
            [$key, $connection] = explode('§', $tagAndConnectionName);
224
225
            // Aggiungo la chiave alla connessione appropriata
226
            $groupByConnection[$connection][] = $key;
227
        }
228
        if (is_callable($callback)) {
229
            foreach ($groupByConnection as $connection_name => $arrTags) {
230
                $callback($arrTags, $connection_name);
231
            }
232
233
            return;
234
        }
235
        foreach ($groupByConnection as $connection_name => $arrTags) {
236
            $storeName =  $this->getStoreFromConnectionName($connection_name);
237
            if ($storeName === null) {
238
                continue;
239
            }
240
            Cache::store($storeName)->tags($arrTags)->flush();
241
        }
242
    }
243
244
    /**
245
     * Execute the console command.
246
     */
247
    public function handle(): void
248
    {
249
        // Recupero dei parametri impostati nel command
250
        $this->shardId = (int) $this->option('shard');
251
        $this->priority = (int) $this->option('priority');
252
        $limit = $this->option('limit') ?? config('super_cache_invalidate.processing_limit');
253
        $this->limit = (int) $limit;
254
        $tagBatchSize = $this->option('tag-batch-size') ?? config('super_cache_invalidate.tag_batch_size');
255
        $this->tagBatchSize = (int) $tagBatchSize;
256
        $this->connection_name = $this->option('connection_name') ?? config('super_cache_invalidate.default_connection_name');
257
        $this->log_attivo = $this->option('log_attivo') && (int)$this->option('log_attivo') === 1;
258
        $this->invalidation_window = (int) config('super_cache_invalidate.invalidation_window');
259
        $lockTimeout = (int) config('super_cache_invalidate.lock_timeout');
260
261
        // Acquisisco il lock in modo da essere sicura che le esecuzioni non si accavallino
262
        $lockValue = $this->helper->acquireShardLock($this->shardId, $this->priority, $lockTimeout, $this->connection_name);
263
        $this->logIf('Starting Elaborazione ...' . $this->invalidation_window);
264
        if (!$lockValue) {
265
            return;
266
        }
267
        $startTime = microtime(true);
268
        try {
269
            $this->processEvents();
270
        } catch (\Throwable $e) {
271
            $this->logIf(sprintf(
272
                "Eccezione: %s in %s on line %d\nStack trace:\n%s",
273
                $e->getMessage(),
274
                $e->getFile(),
275
                $e->getLine(),
276
                $e->getTraceAsString()
277
            ), 'error');
278
        } finally {
279
            $this->helper->releaseShardLock($this->shardId, $this->priority, $lockValue, $this->connection_name);
280
        }
281
        $executionTime = (microtime(true) - $startTime) * 1000;
282
        $this->logIf('Fine Elaborazione - Tempo di esecuzione: ' . $executionTime . ' millisec.');
283
    }
284
}
285