Passed
Push — main ( b85551...acdca4 )
by
unknown
03:16
created

ListenerCommand::handle()   A

Complexity

Conditions 5
Paths 8

Size

Total Lines 32
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 5
eloc 17
c 3
b 0
f 0
nc 8
nop 0
dl 0
loc 32
rs 9.3888
1
<?php
2
3
namespace Padosoft\SuperCache\Console;
4
5
use Illuminate\Console\Command;
6
use Illuminate\Support\Str;
7
use Padosoft\SuperCache\RedisConnector;
8
use Illuminate\Support\Facades\Log;
9
use Padosoft\SuperCache\SuperCacheManager;
10
11
class ListenerCommand extends Command
12
{
13
    protected $signature = 'supercache:listener
14
                                {--connection_name= : (opzionale) nome della connessione redis }
15
                                {--namespace_id= : (opzionale) id del namespace da usare per suddividere i processi e da impostare se supercache.use_namespace = true }
16
                                {--checkEvent= : (opzionale) se 1 si esegue controllo su attivazione evento expired di Redis }
17
                                {--host= : (opzionale) host del nodo del cluster (da impostare solo in caso di Redis in cluster) }
18
                                {--port= : (opzionale) porta del nodo del cluster (da impostare solo in caso di Redis in cluster) }
19
                                ';
20
    protected $description = 'Listener per eventi di scadenza chiavi Redis';
21
    protected RedisConnector $redis;
22
    protected array $batch = []; // Accumula chiavi scadute
23
    protected int $batchSizeThreshold; // Numero di chiavi per batch
24
    protected int $timeThreshold; // Tempo massimo prima di processare il batch
25
    protected bool $useNamespace;
26
    protected SuperCacheManager $superCache;
27
28
    public function __construct(RedisConnector $redis, SuperCacheManager $superCache)
29
    {
30
        parent::__construct();
31
        $this->redis = $redis;
32
        // Parametri di batch processing da config
33
        $this->batchSizeThreshold = config('supercache.batch_size');
34
        $this->timeThreshold = config('supercache.time_threshold'); // secondi
35
        $this->useNamespace = (bool) config('supercache.use_namespace', false);
36
        $this->superCache = $superCache;
37
    }
38
39
    /**
40
     * Verifica se Redis è configurato per generare notifiche di scadenza.
41
     */
42
    protected function checkRedisNotifications(): bool
43
    {
44
        $checkEvent = $this->option('checkEvent');
45
        if ($checkEvent === null) {
0 ignored issues
show
introduced by
The condition $checkEvent === null is always false.
Loading history...
46
            return true;
47
        }
48
        if ((int) $checkEvent === 0) {
49
            return true;
50
        }
51
        $config = $this->redis->getRedisConnection($this->option('connection_name'))->config('GET', 'notify-keyspace-events');
52
53
        return str_contains($config['notify-keyspace-events'], 'Ex') || str_contains($config['notify-keyspace-events'], 'xE');
54
    }
55
56
    protected function onExpireEvent(string $key): void
57
    {
58
        $debug = 'EXPIRED $key: ' . $key . PHP_EOL .
0 ignored issues
show
Unused Code introduced by
The assignment to $debug is dead and can be removed.
Loading history...
59
            'Host: ' . $this->option('host') . PHP_EOL .
60
            'Port: ' . $this->option('port') . PHP_EOL .
61
            'Connection Name: ' . $this->option('connection_name') . PHP_EOL .
62
            'Namespace ID: ' . $this->option('namespace_id') . PHP_EOL;
63
        // Filtro le chiavi di competenza di questo listener, ovvero quelle che iniziano con gescat_laravel_database_supercache: e che eventualemnte terminano con ns<namespace_id> se c'è il namespace attivo
64
        // Attenzione la chiave arriva completa con il prefisso da conf redis.oprion.prefix + il prefisso della supercache
65
        // del tipo 'gescat_laravel_database_supercache:'
66
        $prefix = config('database.redis.options')['prefix'] . config('supercache.prefix');
67
        $cleanedKey = str_replace(['{', '}'], '', $key);
68
        if (!Str::startsWith($cleanedKey, $prefix)) {
69
            return;
70
        }
71
72
        if ($this->useNamespace && $this->option('namespace_id') !== null && !Str::endsWith($cleanedKey, 'ns' . $this->option('namespace_id'))) {
73
            return;
74
        }
75
76
        $original_key = str_replace(config('database.redis.options')['prefix'], '', $key);
77
        //$original_key = $this->superCache->getOriginalKey($key);
78
        $hash_key = crc32($original_key); // questo hash mi serve poi nello script LUA in quanto Redis non ha nativa la funzione crc32, ma solo il crc16 che però non è nativo in php
79
        $this->batch[] = $original_key . '|' . $hash_key; // faccio la concatenzazione con il '|' come separatore in quanto Lua non supporta array multidimensionali
80
    }
81
82
    /**
83
     * Verifica se è passato abbastanza tempo da processare il batch.
84
     */
85
    protected function shouldProcessBatchByTime(): bool
86
    {
87
        static $lastBatchTime = null;
88
        if (!$lastBatchTime) {
89
            $lastBatchTime = time();
90
91
            return false;
92
        }
93
94
        if ((time() - $lastBatchTime) >= $this->timeThreshold) {
95
            $lastBatchTime = time();
96
97
            return true;
98
        }
99
100
        return false;
101
    }
102
103
    protected function processBatchOnCluster(): void
104
    {
105
        foreach ($this->batch as $key) {
106
            $explodeKey = explode('|', $key);
107
            $cleanedKey = str_replace(['{', '}'], '', $explodeKey[0]);
108
            $this->superCache->forget($cleanedKey, $this->option('connection_name'), true, true, true);
109
        }
110
111
        $this->batch = [];
112
    }
113
114
    /**
115
     * Processa le chiavi accumulate in batch tramite uno script Lua.
116
     */
117
    protected function processBatchOnStandalone(): void
118
    {
119
        $debug = 'Processo batch: ' . implode(', ', $this->batch) . PHP_EOL .
0 ignored issues
show
Unused Code introduced by
The assignment to $debug is dead and can be removed.
Loading history...
120
            'Host: ' . $this->option('host') . PHP_EOL .
121
            'Port: ' . $this->option('port') . PHP_EOL;
122
123
        $luaScript = <<<'LUA'
124
125
        local success, result = pcall(function()
126
            local keys = ARGV
127
            local prefix = ARGV[1]
128
            local database_prefix = ARGV[3]
129
            local shard_count = ARGV[2]
130
            -- redis.log(redis.LOG_NOTICE, 'prefix: ' .. prefix);
131
            -- redis.log(redis.LOG_NOTICE, 'database_prefix: ' .. database_prefix);
132
            -- redis.log(redis.LOG_NOTICE, 'shard_count: ' .. shard_count);
133
            for i, key in ipairs(keys) do
134
                -- salto le prime 3 chiavi che ho usato come settings
135
                if i > 3 then
136
                    local row = {}
137
                    for value in string.gmatch(key, "[^|]+") do
138
                        table.insert(row, value)
139
                    end
140
                    local fullKey = database_prefix .. row[1]
141
                    -- redis.log(redis.LOG_NOTICE, 'Chiave Redis Expired: ' .. fullKey)
142
                    -- Controlla se la chiave è effettivamente scaduta
143
                    if redis.call('EXISTS', fullKey) == 0 then
144
                        -- local tagsKey = prefix .. 'tags:' .. row[1]
145
                        local tagsKey = fullKey .. ':tags'
146
                        -- redis.log(redis.LOG_NOTICE, 'Tag: ' .. tagsKey);
147
                        local tags = redis.call("SMEMBERS", tagsKey)
148
                        -- redis.log(redis.LOG_NOTICE, 'Tags associati: ' .. table.concat(tags, ", "));
149
                        -- Rimuove la chiave dai set di tag associati
150
                        for j, tag in ipairs(tags) do
151
                            local shardIndex = tonumber(row[2]) % tonumber(shard_count)
152
                            local shardKey = database_prefix .. prefix .. "tag:" .. tag .. ":shard:" .. shardIndex
153
                            -- redis.log(redis.LOG_NOTICE, 'Rimuovo la chiave dallo shard: ' .. row[1]);
154
                            redis.call("SREM", shardKey, row[1])
155
                            -- redis.log(redis.LOG_NOTICE, 'Rimossa chiave tag: ' .. shardKey);
156
                        end
157
                        -- Rimuove l'associazione della chiave con i tag
158
                        redis.call("DEL", tagsKey)
159
                        -- redis.log(redis.LOG_NOTICE, 'Rimossa chiave tags: ' .. tagsKey);
160
                    else
161
                        redis.log(redis.LOG_WARNING, 'la chiave ' .. fullKey .. ' è ancora attiva');
162
                    end
163
                end
164
            end
165
        end)
166
        if not success then
167
            redis.log(redis.LOG_WARNING, "Errore durante l'esecuzione del batch: " .. result)
168
            return result;
169
        end
170
        return "OK"
171
        LUA;
172
173
174
        try {
175
            // Esegue lo script Lua passando le chiavi in batch
176
            $connection = $this->redis->getNativeRedisConnection($this->option('connection_name'), $this->option('host'), $this->option('port'));
177
178
            $return = $connection['connection']->eval($luaScript, [config('supercache.prefix'), config('supercache.num_shards'), config('database.redis.options')['prefix'], ...$this->batch], 0);
179
            if ($return !== 'OK') {
180
                Log::error('Errore durante l\'esecuzione dello script Lua: ' . $return);
181
            }
182
            // Pulisce il batch dopo il successo
183
            $this->batch = [];
184
            // Essendo una connessione nativa va chiusa
185
            $connection['connection']->close();
186
        } catch (\Throwable $e) {
187
            Log::error('Errore durante l\'esecuzione dello script Lua: ' . $e->getMessage());
188
        }
189
    }
190
191
    public function handle(): void
192
    {
193
        if (!$this->checkRedisNotifications()) {
194
            $this->error('Le notifiche di scadenza di Redis non sono abilitate. Abilitale per usare il listener.');
195
        }
196
197
        try {
198
            $async_connection = $this->redis->getNativeRedisConnection($this->option('connection_name'), $this->option('host'), $this->option('port'));
199
            $pattern = '__keyevent@' . $async_connection['database'] . '__:expired';
200
            // La psubscribe è BLOCCANTE, il command resta attivo finchè non cade la connessione
201
            $async_connection['connection']->psubscribe([$pattern], function ($redis, $channel, $message, $key) {
0 ignored issues
show
Bug introduced by
function(...) { /* ... */ } of type callable is incompatible with the type array|string expected by parameter $callback of Redis::psubscribe(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

201
            $async_connection['connection']->psubscribe([$pattern], /** @scrutinizer ignore-type */ function ($redis, $channel, $message, $key) {
Loading history...
202
                $this->onExpireEvent($key);
203
204
                // Verifica se è necessario processare il batch
205
                // In caso di un cluster Redis il primo che arriva al count impostato fa scattare la pulizia.
206
                // Possono andare in conflitto? No, perchè ogni nodo ha i suoi eventi, per cui non può esserci lo stesso evento expire su più nodi
207
                if (count($this->batch) >= $this->batchSizeThreshold || $this->shouldProcessBatchByTime()) {
208
                    //if (config('database.redis.clusters.' . ($this->option('connection_name') ?? 'default')) !== null) {
209
                    $this->processBatchOnCluster();
210
                    //} else {
211
                    //    $this->processBatchOnStandalone();
212
                    //}
213
                }
214
            });
215
        } catch (\Throwable $e) {
216
            $error = 'Errore durante la sottoscrizione agli eventi EXPIRED:' . PHP_EOL .
217
                'Host: ' . $this->option('host') . PHP_EOL .
218
                'Port: ' . $this->option('port') . PHP_EOL .
219
                'Connection Name: ' . $this->option('connection_name') . PHP_EOL .
220
                'Namespace ID: ' . $this->option('namespace_id') . PHP_EOL .
221
                'Error: ' . $e->getMessage();
222
            Log::error($error);
223
        }
224
    }
225
}
226