Passed
Push — master ( 8c3de8...fb82f9 )
by Tolga
01:29 queued 14s
created

gc.*GC.Run   A

Complexity

Conditions 5

Size

Total Lines 32
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 18
nop 0
dl 0
loc 32
rs 9.0333
c 0
b 0
f 0
1
package gc
2
3
import (
4
	"context"
5
	"database/sql"
6
	"errors"
7
	"fmt"
8
	"log/slog"
9
	"time"
10
11
	"github.com/Masterminds/squirrel"
12
13
	"github.com/Permify/permify/internal/storage/postgres"
14
	"github.com/Permify/permify/internal/storage/postgres/utils"
15
	db "github.com/Permify/permify/pkg/database/postgres"
16
)
17
18
// GC represents a Garbage Collector configuration for database cleanup.
19
type GC struct {
20
	// database is the database instance used for garbage collection.
21
	database *db.Postgres
22
	// interval is the duration between garbage collection runs.
23
	interval time.Duration
24
	// window is the time window for data considered for cleanup.
25
	window time.Duration
26
	// timeout is the maximum time allowed for a single GC run.
27
	timeout time.Duration
28
}
29
30
// NewGC creates a new GC instance with the provided configuration.
31
func NewGC(db *db.Postgres, opts ...Option) *GC {
32
	gc := &GC{
33
		interval: _defaultInterval,
34
		window:   _defaultWindow,
35
		timeout:  _defaultTimeout,
36
		database: db,
37
	}
38
39
	// Custom options
40
	for _, opt := range opts {
41
		opt(gc)
42
	}
43
44
	return gc
45
}
46
47
// Start runs a garbage collection pass every gc.interval until ctx is done,
// then returns ctx.Err().
//
// A failed pass is logged and the loop keeps going. Each pass derives its
// deadline from ctx. Cancelling ctx therefore also aborts a pass that is in
// flight, instead of letting it run until gc.timeout expires.
// gc.interval must be positive, because time.NewTicker panics otherwise.
func (gc *GC) Start(ctx context.Context) error {
	ticker := time.NewTicker(gc.interval)
	defer ticker.Stop() // Ensure the ticker is stopped when the function exits.

	for {
		select {
		case <-ticker.C: // Periodically trigger garbage collection.
			if err := gc.runWithContext(ctx); err != nil {
				if ctx.Err() != nil {
					// Shutdown interrupted the pass: report cancellation, not a GC failure.
					return ctx.Err()
				}
				slog.Error("Garbage collection failed:", slog.Any("error", err))
				continue
			}
			slog.Info("Garbage collection completed successfully")
		case <-ctx.Done():
			return ctx.Err() // Return context error if cancellation is requested.
		}
	}
}

// Run performs a single garbage collection pass, bounded by gc.timeout.
//
// The pass is not tied to any caller context. Use Start for periodic,
// cancellable collection.
func (gc *GC) Run() error {
	return gc.runWithContext(context.Background())
}

// runWithContext performs one garbage collection pass with a deadline of
// gc.timeout. The deadline is derived from parent, so cancelling parent
// aborts the pass.
//
// Steps:
//   - Read the current time from the database.
//   - Compute the cutoff as that time minus gc.window.
//   - Collect each tenant independently; a failure for one tenant does not
//     stop the others.
//
// Every per-tenant failure is returned, combined with errors.Join, so an
// incomplete pass is never reported as success. Once the deadline has passed
// or parent is cancelled, the remaining tenants are skipped. Errors are
// wrapped and returned rather than logged here; the caller reports them once.
func (gc *GC) runWithContext(parent context.Context) error {
	ctx, cancel := context.WithTimeout(parent, gc.timeout)
	defer cancel()

	// Take "now" from the database (in UTC) rather than from this process's clock.
	var dbNow time.Time
	if err := gc.database.WritePool.QueryRow(ctx, "SELECT NOW() AT TIME ZONE 'UTC'").Scan(&dbNow); err != nil {
		return fmt.Errorf("get current time from the database: %w", err)
	}

	// Everything recorded before this instant is old enough to be collected.
	cutoffTime := dbNow.Add(-gc.window)

	tenants, err := gc.getAllTenants(ctx)
	if err != nil {
		return fmt.Errorf("retrieve tenants: %w", err)
	}

	var errs []error
	for _, tenantID := range tenants {
		// After the deadline or a cancellation, every further query would fail
		// with the same context error, so stop instead of trying each tenant.
		if ctxErr := ctx.Err(); ctxErr != nil {
			errs = append(errs, fmt.Errorf("stopped before tenant %q: %w", tenantID, ctxErr))
			break
		}
		if err := gc.runForTenant(ctx, tenantID, cutoffTime); err != nil {
			// Record the failure and continue with the other tenants.
			errs = append(errs, fmt.Errorf("tenant %q: %w", tenantID, err))
		}
	}

	// errors.Join returns nil when errs is empty.
	return errors.Join(errs...)
}
101
102
// getAllTenants returns the id column of every row in the tenants table,
// sorted by id. The result is a nil slice when the table has no rows. Any
// build, query, scan or iteration error is returned with a nil slice.
func (gc *GC) getAllTenants(ctx context.Context) ([]string, error) {
	query, args, err := gc.database.Builder.Select("id").From("tenants").OrderBy("id").ToSql()
	if err != nil {
		return nil, err
	}

	rows, err := gc.database.WritePool.Query(ctx, query, args...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var ids []string
	for rows.Next() {
		var id string
		if scanErr := rows.Scan(&id); scanErr != nil {
			return nil, scanErr
		}
		ids = append(ids, id)
	}

	// rows.Next returning false can mean an error, not just exhaustion.
	if iterErr := rows.Err(); iterErr != nil {
		return nil, iterErr
	}
	return ids, nil
}
135
136
// runForTenant garbage-collects a single tenant relative to cutoffTime.
//
// It looks up the tenant's newest transaction recorded before cutoffTime and
// uses that transaction ID as the boundary:
//   - The relation tuples and attributes tables are cleaned with the query
//     built by utils.GenerateGCQueryForTenant.
//   - The tenant's transactions with a smaller ID are then deleted.
//
// A zero boundary (no transaction before cutoffTime) means there is nothing to
// collect. Failures are wrapped with the step that failed and returned without
// being logged here. Reporting, together with the tenant ID, is left to the
// caller, so each failure is logged only once.
func (gc *GC) runForTenant(ctx context.Context, tenantID string, cutoffTime time.Time) error {
	lastTransactionID, err := gc.getLastTransactionIDForTenant(ctx, tenantID, cutoffTime)
	if err != nil {
		return fmt.Errorf("retrieve last transaction ID before %s: %w", cutoffTime.Format(time.RFC3339), err)
	}
	if lastTransactionID == 0 {
		return nil // No transactions to clean up for this tenant.
	}

	// Same order as before: the record tables first, the transactions table last.
	for _, table := range []string{postgres.RelationTuplesTable, postgres.AttributesTable} {
		if err := gc.deleteRecordsForTenant(ctx, table, tenantID, lastTransactionID); err != nil {
			return fmt.Errorf("delete records in %s: %w", table, err)
		}
	}
	if err := gc.deleteTransactionsForTenant(ctx, tenantID, lastTransactionID); err != nil {
		return fmt.Errorf("delete transactions: %w", err)
	}

	slog.Debug("Garbage collection completed for tenant", slog.String("tenant_id", tenantID), slog.Uint64("last_transaction_id", lastTransactionID))
	return nil
}
167
168
// getLastTransactionIDForTenant returns the highest transaction id belonging
// to tenantID whose timestamp is strictly earlier than before.
//
// When the lookup fails with an error matching sql.ErrNoRows, it returns
// (0, nil), which callers treat as "nothing to collect". Any other query or
// scan error is returned alongside 0.
func (gc *GC) getLastTransactionIDForTenant(ctx context.Context, tenantID string, before time.Time) (uint64, error) {
	query, args, err := gc.database.Builder.
		Select("id").
		From(postgres.TransactionsTable).
		Where(squirrel.Eq{"tenant_id": tenantID}).
		Where(squirrel.Lt{"timestamp": before}).
		OrderBy("id DESC").
		Limit(1).
		ToSql()
	if err != nil {
		return 0, err
	}

	var id uint64
	switch scanErr := gc.database.WritePool.QueryRow(ctx, query, args...).Scan(&id); {
	case scanErr == nil:
		return id, nil
	case errors.Is(scanErr, sql.ErrNoRows):
		return 0, nil
	default:
		return 0, scanErr
	}
}
195
196
// deleteRecordsForTenant builds the garbage-collection DELETE for table via
// utils.GenerateGCQueryForTenant (scoped to tenantID and bounded by
// lastTransactionID) and executes it on the write pool. It returns the
// query-building error or the execution error, whichever occurs.
func (gc *GC) deleteRecordsForTenant(ctx context.Context, table string, tenantID string, lastTransactionID uint64) error {
	query, args, err := utils.GenerateGCQueryForTenant(table, tenantID, lastTransactionID).ToSql()
	if err == nil {
		_, err = gc.database.WritePool.Exec(ctx, query, args...)
	}
	return err
}
207
208
// deleteTransactionsForTenant removes tenantID's rows from the transactions
// table whose id is strictly below lastTransactionID. The boundary row itself
// is kept.
func (gc *GC) deleteTransactionsForTenant(ctx context.Context, tenantID string, lastTransactionID uint64) error {
	// The bound is written into the SQL as a literal cast to xid8 rather than
	// passed as a placeholder argument. Since lastTransactionID is a uint64,
	// the formatted text can only contain digits.
	idBound := squirrel.Expr(fmt.Sprintf("id < '%v'::xid8", lastTransactionID))

	query, args, err := gc.database.Builder.
		Delete(postgres.TransactionsTable).
		Where(squirrel.Eq{"tenant_id": tenantID}).
		Where(idBound).
		ToSql()
	if err != nil {
		return err
	}

	_, err = gc.database.WritePool.Exec(ctx, query, args...)
	return err
}
232
233
// Legacy methods for backward compatibility - these are now deprecated and will be removed in future versions
234
235
// getLastTransactionID returns the highest id in the transactions table,
// across all tenants, whose timestamp is strictly earlier than before.
// When the lookup fails with an error matching sql.ErrNoRows, it returns
// (0, nil). Any other error is returned alongside 0.
//
// Deprecated: use getLastTransactionIDForTenant, which restricts the lookup
// to a single tenant for tenant-aware garbage collection.
func (gc *GC) getLastTransactionID(ctx context.Context, before time.Time) (uint64, error) {
	sb := gc.database.Builder.Select("id").From(postgres.TransactionsTable)
	sb = sb.Where(squirrel.Lt{"timestamp": before}).OrderBy("id DESC").Limit(1)

	query, args, err := sb.ToSql()
	if err != nil {
		return 0, err
	}

	var id uint64
	if scanErr := gc.database.WritePool.QueryRow(ctx, query, args...).Scan(&id); scanErr != nil {
		if !errors.Is(scanErr, sql.ErrNoRows) {
			return 0, scanErr
		}
		return 0, nil
	}
	return id, nil
}
262